From 1a9e6177d7a528292fe0a47c49cb502adbc16829 Mon Sep 17 00:00:00 2001
From: Richard Stiskalek
Date: Tue, 2 May 2023 13:57:13 +0100
Subject: [PATCH] Switch to h5py format (#52)

* Edit the particle paths
* Remove script
* Add h5py to dumping
* Minor adjustments
* add h5py support
* remove split path
* h5py support
* Type
* Edit initmatch paths
* Shorten func
* dist_centmass to work with 2D arrays
* Forgot to return the centre of mass
* Fixed code
* Fix halo bug
* Start MPI broadcasting
* Mini bug
* Remove commenting
* Remove test statement
* Fix index
* Printing from rank 0 only
* Move where clump index stored
* Add dtype options
* Add dtype options
---
 csiborgtools/fits/halo.py   |  21 +---
 csiborgtools/match/match.py |  16 +--
 csiborgtools/read/paths.py  |  52 ++++-----
 scripts/fit_halos.py        |  34 +++---
 scripts/fit_profiles.py     |  33 +++---
 scripts/pre_dumppart.py     |  75 ++++++++++---
 scripts/pre_initmatch.py    | 210 ++++++++++++++++++++----------------
 scripts/pre_splithalos.py   | 118 --------------------
 8 files changed, 236 insertions(+), 323 deletions(-)
 delete mode 100644 scripts/pre_splithalos.py

diff --git a/csiborgtools/fits/halo.py b/csiborgtools/fits/halo.py
index fef2e40..1f32ad8 100644
--- a/csiborgtools/fits/halo.py
+++ b/csiborgtools/fits/halo.py
@@ -40,8 +40,7 @@ class BaseStructure(ABC):
 
     @particles.setter
     def particles(self, particles):
-        pars = ["x", "y", "z", "M"]
-        assert all(p in particles.dtype.names for p in pars)
+        assert particles.ndim == 2 and particles.shape[1] == 7
        self._particles = particles
 
     @property
@@ -256,24 +255,14 @@ class BaseStructure(ABC):
             return numpy.nan, numpy.nan
         return rs[k], cmass[k]
 
-    @property
-    def keys(self):
-        """
-        Particle array keys.
-
-        Returns
-        -------
-        key : list of str
-        """
-        return self.particles.dtype.names
-
     def __getitem__(self, key):
-        if key not in self.keys:
-            raise RuntimeError("Invalid key `{}`!".format(key))
-        return self.particles[key]
+        keys = ['x', 'y', 'z', 'vx', 'vy', 'vz', 'M']
+        if key not in keys:
+            raise RuntimeError(f"Invalid key `{key}`!")
+        return self.particles[:, keys.index(key)]
 
     def __len__(self):
-        return self.particles.size
+        return self.particles.shape[0]
 
 
 class Clump(BaseStructure):
diff --git a/csiborgtools/match/match.py b/csiborgtools/match/match.py
index 126ac5f..0e07418 100644
--- a/csiborgtools/match/match.py
+++ b/csiborgtools/match/match.py
@@ -827,8 +827,8 @@ def dist_centmass(clump):
 
     Parameters
     ----------
-    clump : structurered arrays
-        Clump structured array. Keyes must include `x`, `y`, `z` and `M`.
+    clump : 2-dimensional array of shape (n_particles, 7)
+        Particle array. The first four columns must be `x`, `y`, `z` and `M`.
 
     Returns
     -------
@@ -838,16 +838,8 @@ def dist_centmass(clump):
         Center of mass coordinates.
     """
     # CM along each dimension
-    cmx, cmy, cmz = [numpy.average(clump[p], weights=clump["M"])
-                     for p in ("x", "y", "z")]
-    # Particle distance from the CM
-    dist = numpy.sqrt(
-        numpy.square(clump["x"] - cmx)
-        + numpy.square(clump["y"] - cmy)
-        + numpy.square(clump["z"] - cmz)
-    )
-
-    return dist, numpy.asarray([cmx, cmy, cmz])
+    cm = numpy.average(clump[:, :3], weights=clump[:, 3], axis=0)
+    return numpy.linalg.norm(clump[:, :3] - cm, axis=1), cm
 
 
 def dist_percentile(dist, qs, distmax=0.075):
diff --git a/csiborgtools/read/paths.py b/csiborgtools/read/paths.py
index 121a5e7..dec330d 100644
--- a/csiborgtools/read/paths.py
+++ b/csiborgtools/read/paths.py
@@ -132,40 +132,19 @@ class CSiBORGPaths:
         nsim : int
             IC realisation index.
         kind : str
-            Type of match. Can be either `fit` or `particles`.
+            Type of match. Must be one of `["particles", "fit", "halomap"]`.
 
         Returns
         -------
         path : str
         """
-        assert kind in ["fit", "particles"]
+        assert kind in ["particles", "fit", "halomap"]
+        ftype = "npy" if kind == "fit" else "h5"
         fdir = join(self.postdir, "initmatch")
         if not isdir(fdir):
             mkdir(fdir)
             warn(f"Created directory `{fdir}`.", UserWarning, stacklevel=1)
-        return join(fdir, f"{kind}_{str(nsim).zfill(5)}.npy")
-
-    def split_path(self, nsnap, nsim):
-        """
-        Path to the `split` files from `pre_splithalos`.
-
-        Parameters
-        ----------
-        nsnap : int
-            Snapshot index.
-        nsim : int
-            IC realisation index.
-
-        Returns
-        -------
-        path : str
-        """
-        fdir = join(self.postdir, "split")
-        if not isdir(fdir):
-            mkdir(fdir)
-            warn(f"Created directory `{fdir}`.", UserWarning, stacklevel=1)
-        return join(
-            fdir, f"clumps_{str(nsim).zfill(5)}_{str(nsnap).zfill(5)}.npz")
+        return join(fdir, f"{kind}_{str(nsim).zfill(5)}.{ftype}")
 
     def get_ics(self, tonew):
         """
@@ -326,30 +305,37 @@ class CSiBORGPaths:
         fname = f"radpos_{str(nsim).zfill(5)}_{str(nsnap).zfill(5)}.npz"
         return join(fdir, fname)
 
-    def particle_h5py_path(self, nsim, with_vel):
+    def particle_h5py_path(self, nsim, kind=None, dtype="float32"):
         """
-        Path to the files containing all particles in a `.hdf5` file. Used for
-        the SPH calculation.
+        Path to the `.h5` file containing all particles.
 
         Parameters
         ----------
         nsim : int
             IC realisation index.
-        with_vel : bool
-            Whether velocities are included.
+        kind : str, optional
+            Type of output. Must be one of `[None, 'pos', 'clumpmap']`.
+        dtype : str, optional
+            Data type. Must be one of `['float32', 'float64']`.
 
         Returns
         -------
         path : str
         """
-        fdir = join(self.postdir, "environment")
+        assert kind in [None, "pos", "clumpmap"]
+        assert dtype in ["float32", "float64"]
+        fdir = join(self.postdir, "particles")
         if not isdir(fdir):
             makedirs(fdir)
             warn(f"Created directory `{fdir}`.", UserWarning, stacklevel=1)
-        if with_vel:
+        if kind is None:
             fname = f"parts_{str(nsim).zfill(5)}.h5"
         else:
-            fname = f"parts_pos_{str(nsim).zfill(5)}.h5"
+            fname = f"parts_{kind}_{str(nsim).zfill(5)}.h5"
+
+        if dtype == "float64":
+            fname = fname.replace(".h5", "_f64.h5")
+
         return join(fdir, fname)
 
     def density_field_path(self, mas, nsim):
diff --git a/scripts/fit_halos.py b/scripts/fit_halos.py
index 9077510..75d619d 100644
--- a/scripts/fit_halos.py
+++ b/scripts/fit_halos.py
@@ -20,6 +20,7 @@ from argparse import ArgumentParser
 from datetime import datetime
 from os.path import join
 
+import h5py
 import numpy
 from mpi4py import MPI
 from tqdm import tqdm
@@ -94,19 +95,18 @@ def fit_clump(particles, clump_info, box):
     return out
 
 
-def load_clump_particles(clumpid, particle_archive):
+def load_clump_particles(clumpid, particles, clump_map):
     """
-    Load a clump's particles from the particle archive. If it is not there, i.e
-    clump has no associated particles, return `None`.
+    Load a clump's particles. If the clump has no associated particles,
+    return `None`.
     """
     try:
-        part = particle_archive[str(clumpid)]
+        return particles[clump_map[str(clumpid)], :]
     except KeyError:
-        part = None
-    return part
+        return None
 
 
-def load_parent_particles(clumpid, particle_archive, clumps_cat):
+def load_parent_particles(clumpid, particles, clump_map, clumps_cat):
     """
     Load a parent halo's particles.
     """
     indxs = clumps_cat["index"][clumps_cat["parent"] == clumpid]
     # We first load the particles of each clump belonging to this parent
     # and then concatenate them for further analysis.
     clumps = []
     for ind in indxs:
-        parts = load_clump_particles(ind, particle_archive)
+        parts = load_clump_particles(ind, particles, clump_map)
         if parts is not None:
-            clumps.append([parts, None])
+            clumps.append(parts)
 
     if len(clumps) == 0:
         return None
-    return csiborgtools.match.concatenate_parts(clumps,
-                                                include_velocities=True)
+    return numpy.concatenate(clumps)
 
 
 # We now start looping over all simulations
@@ -133,10 +132,10 @@ for i, nsim in enumerate(paths.get_ics(tonew=False)):
     nsnap = max(paths.get_snapshots(nsim))
     box = csiborgtools.read.BoxUnits(nsnap, nsim, paths)
 
-    # Archive of clumps, keywords are their clump IDs
-    particle_archive = numpy.load(paths.split_path(nsnap, nsim))
-    clumps_cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, maxdist=None,
-                                                   minmass=None, rawdata=True,
+    # Particle array and the clump mapping
+    particles = h5py.File(paths.particle_h5py_path(nsim), 'r')["particles"]
+    clump_map = h5py.File(paths.particle_h5py_path(nsim, "clumpmap"), 'r')
+    clumps_cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, rawdata=True,
                                                    load_fitted=False)
     # We check whether we fit halos or clumps, will be indexing over different
     # iterators.
@@ -159,9 +158,10 @@ for i, nsim in enumerate(paths.get_ics(tonew=False)):
             continue
 
         if args.kind == "halos":
-            part = load_parent_particles(clumpid, particle_archive, clumps_cat)
+            part = load_parent_particles(clumpid, particles, clump_map,
+                                         clumps_cat)
         else:
-            part = load_clump_particles(clumpid, particle_archive)
+            part = load_clump_particles(clumpid, particles, clump_map)
 
         # We fit the particles if there are any. If not we assign the index,
         # otherwise it would be NaN converted to integers (-2147483648) and
diff --git a/scripts/fit_profiles.py b/scripts/fit_profiles.py
index b1aa68f..d31ddc7 100644
--- a/scripts/fit_profiles.py
+++ b/scripts/fit_profiles.py
@@ -20,6 +20,7 @@ from argparse import ArgumentParser
 from datetime import datetime
 from gc import collect
 
+import h5py
 import numpy
 from mpi4py import MPI
 from tqdm import trange
@@ -46,7 +47,6 @@ if nproc > 1:
-    raise NotImplementedError("MPI is not implemented implemented yet.")
+    raise NotImplementedError("MPI is not implemented yet.")
 
 paths = csiborgtools.read.CSiBORGPaths(**csiborgtools.paths_glamdring)
-partreader = csiborgtools.read.ParticleReader(paths)
 cols_collect = [("r", numpy.float32), ("M", numpy.float32)]
 if args.ics is None or args.ics == -1:
     nsims = paths.get_ics(tonew=False)
@@ -54,37 +54,36 @@ else:
     nsims = args.ics
 
 
-def load_clump_particles(clumpid, particle_archive):
+def load_clump_particles(clumpid, particles, clump_map):
     """
-    Load a clump's particles from the particle archive. If it is not there, i.e
-    clump has no associated particles, return `None`.
+    Load a clump's particles. If the clump has no associated particles,
+    return `None`.
     """
     try:
-        part = particle_archive[str(clumpid)]
+        return particles[clump_map[str(clumpid)], :]
     except KeyError:
-        part = None
-    return part
+        return None
 
 
-def load_parent_particles(clumpid, particle_archive, clumps_cat):
+def load_parent_particles(clumpid, particles, clump_map, clumps_cat):
     """
     Load a parent halo's particles.
     """
     indxs = clumps_cat["index"][clumps_cat["parent"] == clumpid]
-    # We first load the particles of each clump belonging to this
-    # parent and then concatenate them for further analysis.
+    # We first load the particles of each clump belonging to this parent
+    # and then concatenate them for further analysis.
     clumps = []
     for ind in indxs:
-        parts = load_clump_particles(ind, particle_archive)
+        parts = load_clump_particles(ind, particles, clump_map)
         if parts is not None:
             clumps.append(parts)
 
     if len(clumps) == 0:
         return None
-    return csiborgtools.match.concatenate_parts(clumps)
+    return numpy.concatenate(clumps)
 
 
-# We loop over simulations. Here later optionlaly add MPI.
+# We loop over simulations. MPI may optionally be added here later.
 for i, nsim in enumerate(nsims):
     if rank == 0:
         now = datetime.now()
@@ -92,8 +91,8 @@ for i, nsim in enumerate(nsims):
     nsnap = max(paths.get_snapshots(nsim))
     box = csiborgtools.read.BoxUnits(nsnap, nsim, paths)
 
-    # Archive of clumps, keywords are their clump IDs
-    particle_archive = numpy.load(paths.split_path(nsnap, nsim))
+    particles = h5py.File(paths.particle_h5py_path(nsim), 'r')["particles"]
+    clump_map = h5py.File(paths.particle_h5py_path(nsim, "clumpmap"), 'r')
     clumps_cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, maxdist=None,
                                                    minmass=None, rawdata=True,
                                                    load_fitted=False)
@@ -109,8 +108,8 @@ for i, nsim in enumerate(nsims):
             continue
 
         clumpid = clumps_cat["index"][j]
-
-        parts = load_parent_particles(clumpid, particle_archive, clumps_cat)
+        parts = load_parent_particles(clumpid, particles, clump_map,
+                                      clumps_cat)
         # If we have no particles, then do not save anything.
         if parts is None:
             continue
diff --git a/scripts/pre_dumppart.py b/scripts/pre_dumppart.py
index 9993fef..a93d314 100644
--- a/scripts/pre_dumppart.py
+++ b/scripts/pre_dumppart.py
@@ -12,16 +12,18 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 """
-Script to load in the simulation particles and dump them to a HDF5 file for the
-SPH density field calculation.
+Script to load in the simulation particles and dump them to an HDF5 file.
+Creates a mapping for directly accessing the particles of a single clump.
 """
 from datetime import datetime
-from gc import collect
 from distutils.util import strtobool
+from gc import collect
 
 import h5py
+import numpy
 from mpi4py import MPI
+from tqdm import tqdm
 
 try:
     import csiborgtools
@@ -41,17 +43,23 @@ nproc = comm.Get_size()
 # And next parse all the arguments and set up CSiBORG objects
 parser = ArgumentParser()
 parser.add_argument("--ics", type=int, nargs="+", default=None,
-                    help="IC realisatiosn. If `-1` processes all simulations.")
-parser.add_argument("--with_vel", type=lambda x: bool(strtobool(x)),
-                    help="Whether to include velocities in the particle file.")
+                    help="IC realisations. If `-1` processes all simulations.")
If `-1` processes all simulations.") +parser.add_argument("--pos_only", type=lambda x: bool(strtobool(x)), + help="Do we only dump positions?") +parser.add_argument("--dtype", type=str, choices=["float32", "float64"], + default="float32",) args = parser.parse_args() + +verbose = nproc == 1 paths = csiborgtools.read.CSiBORGPaths(**csiborgtools.paths_glamdring) partreader = csiborgtools.read.ParticleReader(paths) -if args.with_vel: - pars_extract = ['x', 'y', 'z', 'vx', 'vy', 'vz', 'M'] -else: + +if args.pos_only: pars_extract = ['x', 'y', 'z', 'M'] -if args.ics is None or args.ics == -1: +else: + pars_extract = ['x', 'y', 'z', 'vx', 'vy', 'vz', 'M'] + +if args.ics is None or args.ics[0] == -1: ics = paths.get_ics(tonew=False) else: ics = args.ics @@ -62,14 +70,49 @@ jobs = csiborgtools.fits.split_jobs(len(ics), nproc)[rank] for i in jobs: nsim = ics[i] nsnap = max(paths.get_snapshots(nsim)) - print(f"{datetime.now()}: Rank {rank} completing simulation {nsim}.", + print(f"{datetime.now()}: Rank {rank} loading particles {nsim}.", flush=True) - out = partreader.read_particle( - nsnap, nsim, pars_extract, return_structured=False, verbose=nproc == 1) + parts = partreader.read_particle(nsnap, nsim, pars_extract, + return_structured=False, verbose=verbose) + if args.dtype == "float64": + parts = parts.astype(numpy.float64) - with h5py.File(paths.particle_h5py_path(nsim), "w") as f: - dset = f.create_dataset("particles", data=out) + kind = "pos" if args.pos_only else None - del out + print(f"{datetime.now()}: Rank {rank} dumping particles from {nsim}.", + flush=True) + + with h5py.File(paths.particle_h5py_path(nsim, kind, args.dtype), "w") as f: + f.create_dataset("particles", data=parts) + del parts + collect() + print(f"{datetime.now()}: Rank {rank} finished dumping of {nsim}.", + flush=True) + # If we are dumping only particle positions, then we are done. + if args.pos_only: + continue + + print(f"{datetime.now()}: Rank {rank} mapping particles from {nsim}.", + flush=True) + # If not, then load the clump IDs and prepare the memory mapping. We find + # which array positions correspond to which clump IDs and save it. With + # this we can then lazily load into memory the particles for each clump. + part_cids = partreader.read_clumpid(nsnap, nsim, verbose=verbose) + cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, load_fitted=False, + rawdata=True) + clumpinds = cat["index"] + # Some of the clumps have no particles, so we do not loop over them + clumpinds = clumpinds[numpy.isin(clumpinds, part_cids)] + + out = {} + for i, cid in enumerate(tqdm(clumpinds) if verbose else clumpinds): + out.update({str(cid): numpy.where(part_cids == cid)[0]}) + + # We save the mapping to a HDF5 file + with h5py.File(paths.particle_h5py_path(nsim, "clumpmap"), "w") as f: + for cid, indxs in out.items(): + f.create_dataset(cid, data=indxs) + + del part_cids, cat, clumpinds, out collect() diff --git a/scripts/pre_initmatch.py b/scripts/pre_initmatch.py index d5ad171..cbc3e74 100644 --- a/scripts/pre_initmatch.py +++ b/scripts/pre_initmatch.py @@ -13,25 +13,20 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. """ -Script to calculate the particle centre of mass and Lagrangian patch size in -the initial snapshot. Optinally dumps the particle files, however this requires -a lot of memory. - -TODO: - - stop saving the particle IDs. Unnecessary. - - Switch to h5py files. This way can save the positions in the particle - array only. 
+Script to calculate the particle centre of mass and Lagrangian patch size in
+the initial snapshot, and the mapping of particles to parent halos.
 """
 from argparse import ArgumentParser
+from os.path import join
 from datetime import datetime
-from distutils.util import strtobool
 from gc import collect
+import joblib
 from os import remove
-from os.path import isfile, join
+import h5py
 import numpy
 from mpi4py import MPI
-from tqdm import tqdm
+from tqdm import trange
 
 try:
     import csiborgtools
@@ -50,48 +45,80 @@ verbose = nproc == 1
 
 # Argument parser
 parser = ArgumentParser()
-parser.add_argument("--dump", type=lambda x: bool(strtobool(x)))
+parser.add_argument("--ics", type=int, nargs="+", default=None,
+                    help="IC realisations. If `-1` processes all simulations.")
 args = parser.parse_args()
 paths = csiborgtools.read.CSiBORGPaths(**csiborgtools.paths_glamdring)
 partreader = csiborgtools.read.ParticleReader(paths)
-ftemp = join(paths.temp_dumpdir, "initmatch_{}_{}_{}.npy")
+ftemp = lambda kind, nsim, rank: join(paths.temp_dumpdir, f"{kind}_{nsim}_{rank}.p")  # noqa
 
-# We loop over all particles and then use MPI when matching halos to the
-# initial snapshot and dumping them.
-for i, nsim in enumerate(paths.get_ics(tonew=True)):
+if args.ics is None or args.ics[0] == -1:
+    ics = paths.get_ics(tonew=True)
+else:
+    ics = args.ics
+
+# We loop over simulations. Each simulation is then processed with MPI; rank 0
+# loads the data and broadcasts it to the other ranks.
+for nsim in ics:
+    nsnap = max(paths.get_snapshots(nsim))
     if rank == 0:
         print(f"{datetime.now()}: reading simulation {nsim}.", flush=True)
-        nsnap = max(paths.get_snapshots(nsim))
 
-    # We first load particles in the initial and final snapshots and sort them
-    # by their particle IDs so that we can match them by array position.
-    # `clump_ids` are the clump IDs of particles.
-    part0 = partreader.read_particle(1, nsim, ["x", "y", "z", "M", "ID"],
-                                     verbose=verbose)
-    part0 = part0[numpy.argsort(part0["ID"])]
+        # We first load particles in the initial and final snapshots and sort
+        # them by their particle IDs so that we can match them by array
+        # position. `clump_ids` are the clump IDs of particles.
+        part0 = partreader.read_particle(1, nsim, ["x", "y", "z", "M", "ID"],
+                                         verbose=True,
+                                         return_structured=False)
+        part0 = part0[numpy.argsort(part0[:, -1])]
+        part0 = part0[:, :-1]  # Now we no longer need the particle IDs
 
-    pid = partreader.read_particle(nsnap, nsim, ["ID"], verbose=verbose)["ID"]
-    clump_ids = partreader.read_clumpid(nsnap, nsim, verbose=verbose)
-    clump_ids = clump_ids[numpy.argsort(pid)]
-    # Release the particle IDs, we will not need them anymore now that both
-    # particle arrays are matched in ordering.
-    del pid
-    collect()
+        pid = partreader.read_particle(nsnap, nsim, ["ID"], verbose=True,
+                                       return_structured=False).reshape(-1, )
+        clump_ids = partreader.read_clumpid(nsnap, nsim, verbose=True)
+        clump_ids = clump_ids[numpy.argsort(pid)]
+        # Release the particle IDs, we will not need them anymore now that
+        # both particle arrays are matched in ordering.
+        del pid
+        collect()
 
-    # Particles whose clump ID is 0 are unassigned to a clump, so we can get
-    # rid of them to speed up subsequent operations. Again we release the mask.
-    mask = clump_ids > 0
-    clump_ids = clump_ids[mask]
-    part0 = part0[mask]
-    del mask
-    collect()
+        # Particles whose clump ID is 0 are unassigned to a clump, so we can
+        # get rid of them to speed up subsequent operations. We will not need
+        # these. Again we release the mask.
+        mask = clump_ids > 0
+        clump_ids = clump_ids[mask]
+        part0 = part0[mask, :]
+        del mask
+        collect()
+
+        print(f"{datetime.now()}: dumping particles for {nsim}.", flush=True)
+        with h5py.File(paths.initmatch_path(nsim, "particles"), "w") as f:
+            f.create_dataset("particles", data=part0)
+
+    print(f"{datetime.now()}: broadcasting simulation {nsim}.", flush=True)
+    # Stop all ranks and figure out the array shapes from the 0th rank
+    comm.Barrier()
+    if rank == 0:
+        shape = numpy.array([*part0.shape], dtype=numpy.int32)
+    else:
+        shape = numpy.empty(2, dtype=numpy.int32)
+    comm.Bcast(shape, root=0)
+
+    # Now broadcast the particle arrays to all ranks
+    if rank > 0:
+        part0 = numpy.empty(shape, dtype=numpy.float32)
+        clump_ids = numpy.empty(shape[0], dtype=numpy.int32)
+
+    comm.Bcast(part0, root=0)
+    comm.Bcast(clump_ids, root=0)
+    if rank == 0:
+        print(f"{datetime.now()}: simulation {nsim} broadcasted.", flush=True)
 
     # Calculate the centre of mass of each parent halo, the Lagrangian patch
-    # size and optionally the initial snapshot particles belonging to this
-    # parent halo. Dumping the particles will take majority of time.
+    # size and the mapping of particles to each parent halo.
     if rank == 0:
-        print(f"{datetime.now()}: calculating {i}th simulation {nsim}.",
-              flush=True)
+        print(f"{datetime.now()}: calculating simulation {nsim}.", flush=True)
     # We load up the clump catalogue which contains information about the
     # ultimate parent halos of each clump. We will loop only over the clump
     # IDs of ultimate parent halos and add their substructure particles and at
@@ -99,13 +126,22 @@ for i, nsim in enumerate(paths.get_ics(tonew=True)):
     # the end save these.
     cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, load_fitted=False,
                                             rawdata=True)
     parent_ids = cat["index"][cat.ismain]
+    hid2arrpos = {indx: j for j, indx in enumerate(parent_ids)}
+    # And we pre-allocate the output array for this simulation.
+    dtype = {"names": ["index", "x", "y", "z", "lagpatch"],
+             "formats": [numpy.int32] + [numpy.float32] * 4}
+
+    # We MPI loop over the individual halos
     jobs = csiborgtools.fits.split_jobs(parent_ids.size, nproc)[rank]
-    for i in tqdm(jobs) if verbose else jobs:
-        clid = parent_ids[i]
+    _out_fits = numpy.full(len(jobs), numpy.nan, dtype=dtype)
+    _out_map = {}
+    for i in trange(len(jobs)) if verbose else range(len(jobs)):
+        clid = parent_ids[jobs[i]]
+        _out_fits["index"][i] = clid
         mmain_indxs = cat["index"][cat["parent"] == clid]
         mmain_mask = numpy.isin(clump_ids, mmain_indxs, assume_unique=True)
-        mmain_particles = part0[mmain_mask]
+        mmain_particles = part0[mmain_mask, :]
         # If the number of particles is too small, we skip this halo.
-        if mmain_particles.size < 100:
+        if mmain_particles.shape[0] < 100:
             continue
@@ -113,65 +149,51 @@ for i, nsim in enumerate(paths.get_ics(tonew=True)):
         raddist, cmpos = csiborgtools.match.dist_centmass(mmain_particles)
         patchsize = csiborgtools.match.dist_percentile(raddist, [99],
                                                        distmax=0.075)
-        with open(ftemp.format(nsim, clid, "fit"), "wb") as f:
-            numpy.savez(f, cmpos=cmpos, patchsize=patchsize)
+        # Write the temporary results
+        _out_fits["x"][i], _out_fits["y"][i], _out_fits["z"][i] = cmpos
+        _out_fits["lagpatch"][i] = patchsize
+        _out_map[str(clid)] = numpy.where(mmain_mask)[0]
 
-        if args.dump:
-            with open(ftemp.format(nsim, clid, "particles"), "wb") as f:
-                numpy.save(f, mmain_particles)
+    # Dump the results of this rank to a temporary file.
+    joblib.dump(_out_fits, ftemp("fits", nsim, rank))
+    joblib.dump(_out_map, ftemp("map", nsim, rank))
 
-    # We force clean up the memory before continuing.
     del part0, clump_ids
     collect()
 
-    # We now wait for all processes and then use the 0th process to collect
-    # the results. We first collect just the Lagrangian patch size information.
+    # Now we wait for all ranks, then collect the results and save them.
     comm.Barrier()
     if rank == 0:
-        print(f"{datetime.now()}: collecting fits...", flush=True)
-        dtype = {"names": ["index", "x", "y", "z", "lagpatch"],
-                 "formats": [numpy.int32] + [numpy.float32] * 4}
-        out = numpy.full(parent_ids.size, numpy.nan, dtype=dtype)
-        for i, clid in enumerate(parent_ids):
-            fpath = ftemp.format(nsim, clid, "fit")
-            # There is no file if the halo was skipped due to too few
-            # particles.
-            if not isfile(fpath):
-                continue
-            with open(fpath, "rb") as f:
-                inp = numpy.load(f)
-            out["index"][i] = clid
-            out["x"][i] = inp["cmpos"][0]
-            out["y"][i] = inp["cmpos"][1]
-            out["z"][i] = inp["cmpos"][2]
-            out["lagpatch"][i] = inp["patchsize"]
-            remove(fpath)
+        print(f"{datetime.now()}: collecting results for {nsim}.", flush=True)
+        out_fits = numpy.full(parent_ids.size, numpy.nan, dtype=dtype)
+        out_map = {}
+        for i in range(nproc):
+            # Merge the map dictionaries
+            out_map = out_map | joblib.load(ftemp("map", nsim, i))
+            # Now merge the structured arrays
+            _out_fits = joblib.load(ftemp("fits", nsim, i))
+            for j in range(_out_fits.size):
+                k = hid2arrpos[_out_fits["index"][j]]
+                for par in dtype["names"]:
+                    out_fits[par][k] = _out_fits[par][j]
 
-        fout = paths.initmatch_path(nsim, "fit")
-        print(f"{datetime.now()}: dumping fits to .. `{fout}`.", flush=True)
-        with open(fout, "wb") as f:
-            numpy.save(f, out)
+            remove(ftemp("fits", nsim, i))
+            remove(ftemp("map", nsim, i))
 
-        # We now optionally collect the individual clumps and store them in an
-        # archive, which has the benefit of being a single file that can be
-        # easily read in.
-        if args.dump:
-            print(f"{datetime.now()}: collecting particles...", flush=True)
-            out = {}
-            for clid in parent_ids:
-                fpath = ftemp.format(nsim, clid, "particles")
-                if not isfile(fpath):
-                    continue
-                with open(fpath, "rb") as f:
-                    out.update({str(clid): numpy.load(f)})
-                remove(fpath)
+        # Now save the results
+        fout_fit = paths.initmatch_path(nsim, "fit")
+        print(f"{datetime.now()}: dumping fits to .. `{fout_fit}`.",
+              flush=True)
+        with open(fout_fit, "wb") as f:
+            numpy.save(f, out_fits)
 
-            fout = paths.initmatch_path(nsim, "particles")
-            print(f"{datetime.now()}: dumping particles to .. `{fout}`.",
-                  flush=True)
-            with open(fout, "wb") as f:
-                numpy.savez(f, **out)
+        fout_map = paths.initmatch_path(nsim, "halomap")
+        print(f"{datetime.now()}: dumping mapping to .. `{fout_map}`.",
+              flush=True)
+        with h5py.File(fout_map, "w") as f:
+            for hid, indxs in out_map.items():
+                f.create_dataset(hid, data=indxs)
 
-        # Again we force clean up the memory before continuing.
-        del out
-        collect()
+        # We force clean up the memory before continuing.
+        del out_map, out_fits
+        collect()
diff --git a/scripts/pre_splithalos.py b/scripts/pre_splithalos.py
deleted file mode 100644
index 3691a1b..0000000
--- a/scripts/pre_splithalos.py
+++ /dev/null
@@ -1,118 +0,0 @@
-# Copyright (C) 2022 Richard Stiskalek
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the
-# Free Software Foundation; either version 3 of the License, or (at your
-# option) any later version.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
-# Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-"""
-Script to split particles to individual files according to their clump. This is
-useful for calculating the halo properties directly from the particles.
-"""
-from datetime import datetime
-from gc import collect
-from glob import glob
-from os import remove
-from os.path import join
-
-import numpy
-from mpi4py import MPI
-from taskmaster import master_process, worker_process
-from tqdm import tqdm
-
-try:
-    import csiborgtools
-except ModuleNotFoundError:
-    import sys
-
-    sys.path.append("../")
-    import csiborgtools
-
-# Get MPI things
-comm = MPI.COMM_WORLD
-rank = comm.Get_rank()
-nproc = comm.Get_size()
-
-paths = csiborgtools.read.CSiBORGPaths(**csiborgtools.paths_glamdring)
-verbose = nproc == 1
-partcols = ["x", "y", "z", "vx", "vy", "vz", "M"]
-
-
-def do_split(nsim):
-    nsnap = max(paths.get_snapshots(nsim))
-    reader = csiborgtools.read.ParticleReader(paths)
-    ftemp_base = join(
-        paths.temp_dumpdir,
-        "split_{}_{}".format(str(nsim).zfill(5), str(nsnap).zfill(5)),
-    )
-    ftemp = ftemp_base + "_{}.npz"
-
-    # Load the particles and their clump IDs
-    particles = reader.read_particle(nsnap, nsim, partcols, verbose=verbose)
-    particle_clumps = reader.read_clumpid(nsnap, nsim, verbose=verbose)
-    # Drop all particles whose clump index is 0 (not assigned to any clump)
-    assigned_mask = particle_clumps != 0
-    particle_clumps = particle_clumps[assigned_mask]
-    particles = particles[assigned_mask]
-    del assigned_mask
-    collect()
-
-    # Load the clump indices
-    clumpinds = reader.read_clumps(nsnap, nsim, cols="index")["index"]
-    # Some of the clumps have no particles, so we do not loop over them
-    clumpinds = clumpinds[numpy.isin(clumpinds, particle_clumps)]
-
-    # Loop over the clump indices and save the particles to a temporary file
-    # every 10000 clumps. We will later read this back and combine into a
-    # single file.
-    out = {}
-    for i, clind in enumerate(tqdm(clumpinds) if verbose else clumpinds):
-        key = str(clind)
-        out.update({str(clind): particles[particle_clumps == clind]})
-
-        # REMOVE bump this back up
-        if i % 10000 == 0 or i == clumpinds.size - 1:
-            numpy.savez(ftemp.format(i), **out)
-            out = {}
-
-    # Clear up memory because we will be loading everything back
-    del particles, particle_clumps, clumpinds
-    collect()
-
-    # Now load back in every temporary file, combine them into a single
-    # dictionary and save as a single .npz file.
-    out = {}
-    for file in glob(ftemp_base + "*"):
-        inp = numpy.load(file)
-        for key in inp.files:
-            out.update({key: inp[key]})
-        remove(file)
-
-    numpy.savez(paths.split_path(nsnap, nsim), **out)
-
-
-###############################################################################
-#                          MPI task delegation                                #
-###############################################################################
-
-
-if nproc > 1:
-    if rank == 0:
-        tasks = list(paths.get_ics(tonew=False))
-        master_process(tasks, comm, verbose=True)
-    else:
-        worker_process(do_split, comm, verbose=False)
-else:
-    tasks = paths.get_ics(tonew=False)
-    for task in tasks:
-        print("{}: completing task `{}`.".format(datetime.now(), task))
-        do_split(task)
-
-comm.Barrier()
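
A few usage sketches follow. First, reading the new format: the two `.h5` files written by `pre_dumppart.py` are meant to be consumed the way `load_clump_particles` in `fit_halos.py` does it, lazily pulling a single clump's rows out of the monolithic particle array. A minimal sketch, with hard-coded file names standing in for what `CSiBORGPaths.particle_h5py_path` returns:

    import h5py

    # Hypothetical file names; in the scripts these come from
    # CSiBORGPaths.particle_h5py_path(nsim) and (nsim, "clumpmap").
    PARTS_PATH = "parts_00001.h5"
    CLUMPMAP_PATH = "parts_clumpmap_00001.h5"


    def load_clump(clumpid):
        """Lazily read one clump's particles from the monolithic file."""
        with h5py.File(PARTS_PATH, "r") as fparts, \
                h5py.File(CLUMPMAP_PATH, "r") as fmap:
            try:
                indxs = fmap[str(clumpid)][:]  # Row indices of this clump
            except KeyError:
                return None  # Clump has no associated particles
            # h5py fancy indexing requires increasing indices; `numpy.where`
            # in `pre_dumppart.py` already produces them sorted.
            return fparts["particles"][indxs, :]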
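The broadcast added to `pre_initmatch.py` follows the standard mpi4py buffer pattern: rank 0 owns the array, every other rank first learns its shape, pre-allocates, and only then receives the payload via the fast uppercase `Bcast`. A self-contained sketch with stand-in random data in place of the CSiBORG reader; run with e.g. `mpirun -n 4 python sketch.py`:

    import numpy
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()

    if rank == 0:
        # Stand-in for the particle array loaded only by rank 0.
        part0 = numpy.random.rand(1000, 4).astype(numpy.float32)
        shape = numpy.array(part0.shape, dtype=numpy.int32)
    else:
        shape = numpy.empty(2, dtype=numpy.int32)

    comm.Bcast(shape, root=0)  # Everyone now knows the dimensions
    if rank > 0:
        part0 = numpy.empty(shape, dtype=numpy.float32)  # Pre-allocate buffer
    comm.Bcast(part0, root=0)  # Buffer-based broadcast, no pickling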
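The vectorised `dist_centmass` is equivalent to the removed per-column version; a toy check of the new form, using only the documented first four columns (`x`, `y`, `z`, `M`):

    import numpy


    def dist_centmass(clump):
        # As in csiborgtools/match/match.py after this patch.
        cm = numpy.average(clump[:, :3], weights=clump[:, 3], axis=0)
        return numpy.linalg.norm(clump[:, :3] - cm, axis=1), cm


    # Two equal-mass particles placed symmetrically about the origin.
    clump = numpy.array([[1., 0., 0., 5.],
                         [-1., 0., 0., 5.]])
    dist, cm = dist_centmass(clump)
    assert numpy.allclose(cm, 0.) and numpy.allclose(dist, 1.)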
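Finally, a possible follow-up rather than part of this patch: `pre_dumppart.py` builds the clump map with one `numpy.where` scan per clump, i.e. O(N * K) for N particles and K clumps. An argsort-based grouping yields the identical mapping in a single O(N log N) pass; a sketch assuming, as in the script, that unassigned particles (clump ID 0) were already filtered out:

    import numpy


    def build_clump_map(part_cids):
        """Group particle array positions by clump ID in one pass."""
        order = numpy.argsort(part_cids)
        sorted_cids = part_cids[order]
        # Unique clump IDs and where each run starts in the sorted array.
        uniq, starts = numpy.unique(sorted_cids, return_index=True)
        bounds = numpy.append(starts, sorted_cids.size)
        return {str(cid): numpy.sort(order[bounds[i]:bounds[i + 1]])
                for i, cid in enumerate(uniq)}


    # Toy check against the per-clump scan used in `pre_dumppart.py`.
    cids = numpy.array([2, 1, 2, 3, 1, 2])
    cmap = build_clump_map(cids)
    assert numpy.array_equal(cmap["2"], numpy.where(cids == 2)[0])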