Switch to h5py format (#52)

* Edit the particle paths

* Remove script

* Add h5py to dumping

* Minor adjustments

* add h5py support

* remove split path

* h5py support

* Type

* Edit initmatch paths

* Shorten func

* dist_centmass to work with 2D arrays

* Forgot to return the centre of mass

* Fixed code

* Fix halo bug

* Start MPI broadcasting

* Mini bug

* Remove commenting

* Remove test statement

* Fix index

* Printing from rank 0 only

* Move where clump index stored

* Add dtype options

* Add dtype options
This commit is contained in:
Richard Stiskalek 2023-05-02 13:57:13 +01:00 committed by GitHub
parent 553eec8228
commit 1a9e6177d7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 236 additions and 323 deletions

View file

@ -40,8 +40,7 @@ class BaseStructure(ABC):
@particles.setter @particles.setter
def particles(self, particles): def particles(self, particles):
pars = ["x", "y", "z", "M"] assert particles.ndim == 2 and particles.shape[1] == 7
assert all(p in particles.dtype.names for p in pars)
self._particles = particles self._particles = particles
@property @property
@ -256,24 +255,14 @@ class BaseStructure(ABC):
return numpy.nan, numpy.nan return numpy.nan, numpy.nan
return rs[k], cmass[k] return rs[k], cmass[k]
@property
def keys(self):
"""
Particle array keys.
Returns
-------
key : list of str
"""
return self.particles.dtype.names
def __getitem__(self, key): def __getitem__(self, key):
keys = ['x', 'y', 'z', 'vx', 'vy', 'vz', 'M']
if key not in self.keys: if key not in self.keys:
raise RuntimeError("Invalid key `{}`!".format(key)) raise RuntimeError(f"Invalid key `{key}`!")
return self.particles[key] return self.particles[:, keys.index(key)]
def __len__(self): def __len__(self):
return self.particles.size return self.particles.shape[0]
class Clump(BaseStructure): class Clump(BaseStructure):

View file

@ -827,8 +827,8 @@ def dist_centmass(clump):
Parameters Parameters
---------- ----------
clump : structurered arrays clump : 2-dimensional array of shape (n_particles, 7)
Clump structured array. Keyes must include `x`, `y`, `z` and `M`. Particle array. The first four columns must be `x`, `y`, `z` and `M`.
Returns Returns
------- -------
@ -838,16 +838,8 @@ def dist_centmass(clump):
Center of mass coordinates. Center of mass coordinates.
""" """
# CM along each dimension # CM along each dimension
cmx, cmy, cmz = [numpy.average(clump[p], weights=clump["M"]) cm = numpy.average(clump[:, :3], weights=clump[:, 3], axis=0)
for p in ("x", "y", "z")] return numpy.linalg.norm(clump[:, :3] - cm, axis=1), cm
# Particle distance from the CM
dist = numpy.sqrt(
numpy.square(clump["x"] - cmx)
+ numpy.square(clump["y"] - cmy)
+ numpy.square(clump["z"] - cmz)
)
return dist, numpy.asarray([cmx, cmy, cmz])
def dist_percentile(dist, qs, distmax=0.075): def dist_percentile(dist, qs, distmax=0.075):

View file

@ -132,40 +132,19 @@ class CSiBORGPaths:
nsim : int nsim : int
IC realisation index. IC realisation index.
kind : str kind : str
Type of match. Can be either `fit` or `particles`. Type of match. Must be one of `["particles", "fit", "halomap"]`.
Returns Returns
------- -------
path : str path : str
""" """
assert kind in ["fit", "particles"] assert kind in ["particles", "fit", "halomap"]
ftype = "npy" if kind == "fit" else "h5"
fdir = join(self.postdir, "initmatch") fdir = join(self.postdir, "initmatch")
if not isdir(fdir): if not isdir(fdir):
mkdir(fdir) mkdir(fdir)
warn(f"Created directory `{fdir}`.", UserWarning, stacklevel=1) warn(f"Created directory `{fdir}`.", UserWarning, stacklevel=1)
return join(fdir, f"{kind}_{str(nsim).zfill(5)}.npy") return join(fdir, f"{kind}_{str(nsim).zfill(5)}.{ftype}")
def split_path(self, nsnap, nsim):
"""
Path to the `split` files from `pre_splithalos`.
Parameters
----------
nsnap : int
Snapshot index.
nsim : int
IC realisation index.
Returns
-------
path : str
"""
fdir = join(self.postdir, "split")
if not isdir(fdir):
mkdir(fdir)
warn(f"Created directory `{fdir}`.", UserWarning, stacklevel=1)
return join(
fdir, f"clumps_{str(nsim).zfill(5)}_{str(nsnap).zfill(5)}.npz")
def get_ics(self, tonew): def get_ics(self, tonew):
""" """
@ -326,30 +305,37 @@ class CSiBORGPaths:
fname = f"radpos_{str(nsim).zfill(5)}_{str(nsnap).zfill(5)}.npz" fname = f"radpos_{str(nsim).zfill(5)}_{str(nsnap).zfill(5)}.npz"
return join(fdir, fname) return join(fdir, fname)
def particle_h5py_path(self, nsim, with_vel): def particle_h5py_path(self, nsim, kind=None, dtype="float32"):
""" """
Path to the files containing all particles in a `.hdf5` file. Used for Path to the file containing all particles in a `.h5` file.
the SPH calculation.
Parameters Parameters
---------- ----------
nsim : int nsim : int
IC realisation index. IC realisation index.
with_vel : bool kind : str
Whether velocities are included. Type of output. Must be one of `[None, 'pos', 'clumpmap']`.
dtype : str
Data type. Must be one of `['float32', 'float64']`.
Returns Returns
------- -------
path : str path : str
""" """
fdir = join(self.postdir, "environment") assert kind in [None, "pos", "clumpmap"]
assert dtype in ["float32", "float64"]
fdir = join(self.postdir, "particles")
if not isdir(fdir): if not isdir(fdir):
makedirs(fdir) makedirs(fdir)
warn(f"Created directory `{fdir}`.", UserWarning, stacklevel=1) warn(f"Created directory `{fdir}`.", UserWarning, stacklevel=1)
if with_vel: if kind is None:
fname = f"parts_{str(nsim).zfill(5)}.h5" fname = f"parts_{str(nsim).zfill(5)}.h5"
else: else:
fname = f"parts_pos_{str(nsim).zfill(5)}.h5" fname = f"parts_{kind}_{str(nsim).zfill(5)}.h5"
if dtype == "float64":
fname = fname.replace(".h5", "_f64.h5")
return join(fdir, fname) return join(fdir, fname)
def density_field_path(self, mas, nsim): def density_field_path(self, mas, nsim):

View file

@ -20,6 +20,7 @@ from argparse import ArgumentParser
from datetime import datetime from datetime import datetime
from os.path import join from os.path import join
import h5py
import numpy import numpy
from mpi4py import MPI from mpi4py import MPI
from tqdm import tqdm from tqdm import tqdm
@ -94,19 +95,18 @@ def fit_clump(particles, clump_info, box):
return out return out
def load_clump_particles(clumpid, particle_archive): def load_clump_particles(clumpid, particles, clump_map):
""" """
Load a clump's particles from the particle archive. If it is not there, i.e Load a clump's particles. If it is not there, i.e clump has no associated
clump has no associated particles, return `None`. particles, return `None`.
""" """
try: try:
part = particle_archive[str(clumpid)] return particles[clump_map[clumpid], :]
except KeyError: except KeyError:
part = None return None
return part
def load_parent_particles(clumpid, particle_archive, clumps_cat): def load_parent_particles(clumpid, particles, clump_map, clumps_cat):
""" """
Load a parent halo's particles. Load a parent halo's particles.
""" """
@ -115,14 +115,13 @@ def load_parent_particles(clumpid, particle_archive, clumps_cat):
# and then concatenate them for further analysis. # and then concatenate them for further analysis.
clumps = [] clumps = []
for ind in indxs: for ind in indxs:
parts = load_clump_particles(ind, particle_archive) parts = load_clump_particles(ind, particles, clump_map)
if parts is not None: if parts is not None:
clumps.append([parts, None]) clumps.append(parts)
if len(clumps) == 0: if len(clumps) == 0:
return None return None
return csiborgtools.match.concatenate_parts(clumps, return numpy.concatenate(clumps)
include_velocities=True)
# We now start looping over all simulations # We now start looping over all simulations
@ -133,10 +132,10 @@ for i, nsim in enumerate(paths.get_ics(tonew=False)):
nsnap = max(paths.get_snapshots(nsim)) nsnap = max(paths.get_snapshots(nsim))
box = csiborgtools.read.BoxUnits(nsnap, nsim, paths) box = csiborgtools.read.BoxUnits(nsnap, nsim, paths)
# Archive of clumps, keywords are their clump IDs # Particle archive
particle_archive = numpy.load(paths.split_path(nsnap, nsim)) particles = h5py.File(paths.particle_h5py_path(nsim), 'r')["particles"]
clumps_cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, maxdist=None, clump_map = h5py.File(paths.particle_h5py_path(nsim, "clumpmap"), 'r')
minmass=None, rawdata=True, clumps_cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, rawdata=True,
load_fitted=False) load_fitted=False)
# We check whether we fit halos or clumps, will be indexing over different # We check whether we fit halos or clumps, will be indexing over different
# iterators. # iterators.
@ -159,9 +158,10 @@ for i, nsim in enumerate(paths.get_ics(tonew=False)):
continue continue
if args.kind == "halos": if args.kind == "halos":
part = load_parent_particles(clumpid, particle_archive, clumps_cat) part = load_parent_particles(clumpid, particles, clump_map,
clumps_cat)
else: else:
part = load_clump_particles(clumpid, particle_archive) part = load_clump_particles(clumpid, particles, clump_map)
# We fit the particles if there are any. If not we assign the index, # We fit the particles if there are any. If not we assign the index,
# otherwise it would be NaN converted to integers (-2147483648) and # otherwise it would be NaN converted to integers (-2147483648) and

View file

@ -20,6 +20,7 @@ from argparse import ArgumentParser
from datetime import datetime from datetime import datetime
from gc import collect from gc import collect
import h5py
import numpy import numpy
from mpi4py import MPI from mpi4py import MPI
from tqdm import trange from tqdm import trange
@ -46,7 +47,6 @@ if nproc > 1:
raise NotImplementedError("MPI is not implemented implemented yet.") raise NotImplementedError("MPI is not implemented implemented yet.")
paths = csiborgtools.read.CSiBORGPaths(**csiborgtools.paths_glamdring) paths = csiborgtools.read.CSiBORGPaths(**csiborgtools.paths_glamdring)
partreader = csiborgtools.read.ParticleReader(paths)
cols_collect = [("r", numpy.float32), ("M", numpy.float32)] cols_collect = [("r", numpy.float32), ("M", numpy.float32)]
if args.ics is None or args.ics == -1: if args.ics is None or args.ics == -1:
nsims = paths.get_ics(tonew=False) nsims = paths.get_ics(tonew=False)
@ -54,37 +54,36 @@ else:
nsims = args.ics nsims = args.ics
def load_clump_particles(clumpid, particle_archive): def load_clump_particles(clumpid, particles, clump_map):
""" """
Load a clump's particles from the particle archive. If it is not there, i.e Load a clump's particles. If it is not there, i.e clump has no associated
clump has no associated particles, return `None`. particles, return `None`.
""" """
try: try:
part = particle_archive[str(clumpid)] return particles[clump_map[clumpid], :]
except KeyError: except KeyError:
part = None return None
return part
def load_parent_particles(clumpid, particle_archive, clumps_cat): def load_parent_particles(clumpid, particles, clump_map, clumps_cat):
""" """
Load a parent halo's particles. Load a parent halo's particles.
""" """
indxs = clumps_cat["index"][clumps_cat["parent"] == clumpid] indxs = clumps_cat["index"][clumps_cat["parent"] == clumpid]
# We first load the particles of each clump belonging to this # We first load the particles of each clump belonging to this parent
# parent and then concatenate them for further analysis. # and then concatenate them for further analysis.
clumps = [] clumps = []
for ind in indxs: for ind in indxs:
parts = load_clump_particles(ind, particle_archive) parts = load_clump_particles(ind, particles, clump_map)
if parts is not None: if parts is not None:
clumps.append(parts) clumps.append(parts)
if len(clumps) == 0: if len(clumps) == 0:
return None return None
return csiborgtools.match.concatenate_parts(clumps) return numpy.concatenate(clumps)
# We loop over simulations. Here later optionlaly add MPI. # We loop over simulations. Here later optionally add MPI.
for i, nsim in enumerate(nsims): for i, nsim in enumerate(nsims):
if rank == 0: if rank == 0:
now = datetime.now() now = datetime.now()
@ -92,8 +91,8 @@ for i, nsim in enumerate(nsims):
nsnap = max(paths.get_snapshots(nsim)) nsnap = max(paths.get_snapshots(nsim))
box = csiborgtools.read.BoxUnits(nsnap, nsim, paths) box = csiborgtools.read.BoxUnits(nsnap, nsim, paths)
# Archive of clumps, keywords are their clump IDs particles = h5py.File(paths.particle_h5py_path(nsim), 'r')["particles"]
particle_archive = numpy.load(paths.split_path(nsnap, nsim)) clump_map = h5py.File(paths.particle_h5py_path(nsim, "clumpmap"), 'r')
clumps_cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, maxdist=None, clumps_cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, maxdist=None,
minmass=None, rawdata=True, minmass=None, rawdata=True,
load_fitted=False) load_fitted=False)
@ -109,8 +108,8 @@ for i, nsim in enumerate(nsims):
continue continue
clumpid = clumps_cat["index"][j] clumpid = clumps_cat["index"][j]
parts = load_parent_particles(clumpid, particles, clump_map,
parts = load_parent_particles(clumpid, particle_archive, clumps_cat) clumps_cat)
# If we have no particles, then do not save anything. # If we have no particles, then do not save anything.
if parts is None: if parts is None:
continue continue

View file

@ -12,16 +12,18 @@
# with this program; if not, write to the Free Software Foundation, Inc., # with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
""" """
Script to load in the simulation particles and dump them to a HDF5 file for the Script to load in the simulation particles and dump them to a HDF5 file.
SPH density field calculation. Creates a mapping to access directly particles of a single clump.
""" """
from datetime import datetime from datetime import datetime
from gc import collect
from distutils.util import strtobool from distutils.util import strtobool
from gc import collect
import h5py import h5py
import numpy
from mpi4py import MPI from mpi4py import MPI
from tqdm import tqdm
try: try:
import csiborgtools import csiborgtools
@ -41,17 +43,23 @@ nproc = comm.Get_size()
# And next parse all the arguments and set up CSiBORG objects # And next parse all the arguments and set up CSiBORG objects
parser = ArgumentParser() parser = ArgumentParser()
parser.add_argument("--ics", type=int, nargs="+", default=None, parser.add_argument("--ics", type=int, nargs="+", default=None,
help="IC realisatiosn. If `-1` processes all simulations.") help="IC realisations. If `-1` processes all simulations.")
parser.add_argument("--with_vel", type=lambda x: bool(strtobool(x)), parser.add_argument("--pos_only", type=lambda x: bool(strtobool(x)),
help="Whether to include velocities in the particle file.") help="Do we only dump positions?")
parser.add_argument("--dtype", type=str, choices=["float32", "float64"],
default="float32",)
args = parser.parse_args() args = parser.parse_args()
verbose = nproc == 1
paths = csiborgtools.read.CSiBORGPaths(**csiborgtools.paths_glamdring) paths = csiborgtools.read.CSiBORGPaths(**csiborgtools.paths_glamdring)
partreader = csiborgtools.read.ParticleReader(paths) partreader = csiborgtools.read.ParticleReader(paths)
if args.with_vel:
pars_extract = ['x', 'y', 'z', 'vx', 'vy', 'vz', 'M'] if args.pos_only:
else:
pars_extract = ['x', 'y', 'z', 'M'] pars_extract = ['x', 'y', 'z', 'M']
if args.ics is None or args.ics == -1: else:
pars_extract = ['x', 'y', 'z', 'vx', 'vy', 'vz', 'M']
if args.ics is None or args.ics[0] == -1:
ics = paths.get_ics(tonew=False) ics = paths.get_ics(tonew=False)
else: else:
ics = args.ics ics = args.ics
@ -62,14 +70,49 @@ jobs = csiborgtools.fits.split_jobs(len(ics), nproc)[rank]
for i in jobs: for i in jobs:
nsim = ics[i] nsim = ics[i]
nsnap = max(paths.get_snapshots(nsim)) nsnap = max(paths.get_snapshots(nsim))
print(f"{datetime.now()}: Rank {rank} completing simulation {nsim}.", print(f"{datetime.now()}: Rank {rank} loading particles {nsim}.",
flush=True) flush=True)
out = partreader.read_particle( parts = partreader.read_particle(nsnap, nsim, pars_extract,
nsnap, nsim, pars_extract, return_structured=False, verbose=nproc == 1) return_structured=False, verbose=verbose)
if args.dtype == "float64":
parts = parts.astype(numpy.float64)
with h5py.File(paths.particle_h5py_path(nsim), "w") as f: kind = "pos" if args.pos_only else None
dset = f.create_dataset("particles", data=out)
del out print(f"{datetime.now()}: Rank {rank} dumping particles from {nsim}.",
flush=True)
with h5py.File(paths.particle_h5py_path(nsim, kind, args.dtype), "w") as f:
f.create_dataset("particles", data=parts)
del parts
collect()
print(f"{datetime.now()}: Rank {rank} finished dumping of {nsim}.",
flush=True)
# If we are dumping only particle positions, then we are done.
if args.pos_only:
continue
print(f"{datetime.now()}: Rank {rank} mapping particles from {nsim}.",
flush=True)
# If not, then load the clump IDs and prepare the memory mapping. We find
# which array positions correspond to which clump IDs and save it. With
# this we can then lazily load into memory the particles for each clump.
part_cids = partreader.read_clumpid(nsnap, nsim, verbose=verbose)
cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, load_fitted=False,
rawdata=True)
clumpinds = cat["index"]
# Some of the clumps have no particles, so we do not loop over them
clumpinds = clumpinds[numpy.isin(clumpinds, part_cids)]
out = {}
for i, cid in enumerate(tqdm(clumpinds) if verbose else clumpinds):
out.update({str(cid): numpy.where(part_cids == cid)[0]})
# We save the mapping to a HDF5 file
with h5py.File(paths.particle_h5py_path(nsim, "clumpmap"), "w") as f:
for cid, indxs in out.items():
f.create_dataset(cid, data=indxs)
del part_cids, cat, clumpinds, out
collect() collect()

View file

@ -13,25 +13,20 @@
# with this program; if not, write to the Free Software Foundation, Inc., # with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
""" """
Script to calculate the particle centre of mass and Lagrangian patch size in Script to calculate the particle centre of mass, Lagrangian patch size in the
the initial snapshot. Optinally dumps the particle files, however this requires initial snapshot and the particle mapping.
a lot of memory.
TODO:
- stop saving the particle IDs. Unnecessary.
- Switch to h5py files. This way can save the positions in the particle
array only.
""" """
from argparse import ArgumentParser from argparse import ArgumentParser
from os.path import join
from datetime import datetime from datetime import datetime
from distutils.util import strtobool
from gc import collect from gc import collect
import joblib
from os import remove from os import remove
from os.path import isfile, join
import h5py
import numpy import numpy
from mpi4py import MPI from mpi4py import MPI
from tqdm import tqdm from tqdm import trange
try: try:
import csiborgtools import csiborgtools
@ -50,48 +45,80 @@ verbose = nproc == 1
# Argument parser # Argument parser
parser = ArgumentParser() parser = ArgumentParser()
parser.add_argument("--dump", type=lambda x: bool(strtobool(x))) parser.add_argument("--ics", type=int, nargs="+", default=None,
help="IC realisations. If `-1` processes all simulations.")
args = parser.parse_args() args = parser.parse_args()
paths = csiborgtools.read.CSiBORGPaths(**csiborgtools.paths_glamdring) paths = csiborgtools.read.CSiBORGPaths(**csiborgtools.paths_glamdring)
partreader = csiborgtools.read.ParticleReader(paths) partreader = csiborgtools.read.ParticleReader(paths)
ftemp = join(paths.temp_dumpdir, "initmatch_{}_{}_{}.npy") ftemp = lambda kind, nsim, rank: join(paths.temp_dumpdir, f"{kind}_{nsim}_{rank}.p") # noqa
# We loop over all particles and then use MPI when matching halos to the if args.ics is None or args.ics[0] == -1:
# initial snapshot and dumping them. ics = paths.get_ics(tonew=True)
for i, nsim in enumerate(paths.get_ics(tonew=True)): else:
ics = args.ics
# We loop over simulations. Each simulation is then processed with MPI, rank 0
# loads the data and broadcasts it to other ranks.
for nsim in ics:
nsnap = max(paths.get_snapshots(nsim))
if rank == 0: if rank == 0:
print(f"{datetime.now()}: reading simulation {nsim}.", flush=True) print(f"{datetime.now()}: reading simulation {nsim}.", flush=True)
nsnap = max(paths.get_snapshots(nsim))
# We first load particles in the initial and final snapshots and sort them # We first load particles in the initial and final snapshots and sort
# by their particle IDs so that we can match them by array position. # them by their particle IDs so that we can match them by array
# `clump_ids` are the clump IDs of particles. # position. `clump_ids` are the clump IDs of particles.
part0 = partreader.read_particle(1, nsim, ["x", "y", "z", "M", "ID"], part0 = partreader.read_particle(1, nsim, ["x", "y", "z", "M", "ID"],
verbose=verbose) verbose=True,
part0 = part0[numpy.argsort(part0["ID"])] return_structured=False)
part0 = part0[numpy.argsort(part0[:, -1])]
part0 = part0[:, :-1] # Now we no longer need the particle IDs
pid = partreader.read_particle(nsnap, nsim, ["ID"], verbose=verbose)["ID"] pid = partreader.read_particle(nsnap, nsim, ["ID"], verbose=True,
clump_ids = partreader.read_clumpid(nsnap, nsim, verbose=verbose) return_structured=False).reshape(-1, )
clump_ids = clump_ids[numpy.argsort(pid)] clump_ids = partreader.read_clumpid(nsnap, nsim, verbose=True)
# Release the particle IDs, we will not need them anymore now that both clump_ids = clump_ids[numpy.argsort(pid)]
# particle arrays are matched in ordering. # Release the particle IDs, we will not need them anymore now that both
del pid # particle arrays are matched in ordering.
collect() del pid
collect()
# Particles whose clump ID is 0 are unassigned to a clump, so we can get # Particles whose clump ID is 0 are unassigned to a clump, so we can
# rid of them to speed up subsequent operations. Again we release the mask. # get rid of them to speed up subsequent operations. We will not need
mask = clump_ids > 0 # these. Again we release the mask.
clump_ids = clump_ids[mask] mask = clump_ids > 0
part0 = part0[mask] clump_ids = clump_ids[mask]
del mask part0 = part0[mask, :]
collect() del mask
collect()
print(f"{datetime.now()}: dumping particles for {nsim}.", flush=True)
with h5py.File(paths.initmatch_path(nsim, "particles"), "w") as f:
f.create_dataset("particles", data=part0)
print(f"{datetime.now()}: broadcasting simulation {nsim}.", flush=True)
# Stop all ranks and figure out array shapes from the 0th rank
comm.Barrier()
if rank == 0:
shape = numpy.array([*part0.shape], dtype=numpy.int32)
else:
shape = numpy.empty(2, dtype=numpy.int32)
comm.Bcast(shape, root=0)
# Now broadcast the particle arrays to all ranks
if rank > 0:
part0 = numpy.empty(shape, dtype=numpy.float32)
clump_ids = numpy.empty(shape[0], dtype=numpy.int32)
comm.Bcast(part0, root=0)
comm.Bcast(clump_ids, root=0)
if rank == 0:
print(f"{datetime.now()}: simulation {nsim} broadcasted.", flush=True)
# Calculate the centre of mass of each parent halo, the Lagrangian patch # Calculate the centre of mass of each parent halo, the Lagrangian patch
# size and optionally the initial snapshot particles belonging to this # size and optionally the initial snapshot particles belonging to this
# parent halo. Dumping the particles will take majority of time. # parent halo. Dumping the particles will take majority of time.
if rank == 0: if rank == 0:
print(f"{datetime.now()}: calculating {i}th simulation {nsim}.", print(f"{datetime.now()}: calculating simulation {nsim}.", flush=True)
flush=True)
# We load up the clump catalogue which contains information about the # We load up the clump catalogue which contains information about the
# ultimate parent halos of each clump. We will loop only over the clump # ultimate parent halos of each clump. We will loop only over the clump
# IDs of ultimate parent halos and add their substructure particles and at # IDs of ultimate parent halos and add their substructure particles and at
@ -99,13 +126,22 @@ for i, nsim in enumerate(paths.get_ics(tonew=True)):
cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, load_fitted=False, cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, load_fitted=False,
rawdata=True) rawdata=True)
parent_ids = cat["index"][cat.ismain] parent_ids = cat["index"][cat.ismain]
parent_ids = parent_ids
hid2arrpos = {indx: j for j, indx in enumerate(parent_ids)}
# And we pre-allocate the output array for this simulation.
dtype = {"names": ["index", "x", "y", "z", "lagpatch"],
"formats": [numpy.int32] + [numpy.float32] * 4}
# We MPI loop over the individual halos
jobs = csiborgtools.fits.split_jobs(parent_ids.size, nproc)[rank] jobs = csiborgtools.fits.split_jobs(parent_ids.size, nproc)[rank]
for i in tqdm(jobs) if verbose else jobs: _out_fits = numpy.full(len(jobs), numpy.nan, dtype=dtype)
clid = parent_ids[i] _out_map = {}
for i in trange(len(jobs)) if verbose else range(len(jobs)):
clid = parent_ids[jobs[i]]
_out_fits["index"][i] = clid
mmain_indxs = cat["index"][cat["parent"] == clid] mmain_indxs = cat["index"][cat["parent"] == clid]
mmain_mask = numpy.isin(clump_ids, mmain_indxs, assume_unique=True) mmain_mask = numpy.isin(clump_ids, mmain_indxs, assume_unique=True)
mmain_particles = part0[mmain_mask] mmain_particles = part0[mmain_mask, :]
# If the number of particles is too small, we skip this halo. # If the number of particles is too small, we skip this halo.
if mmain_particles.size < 100: if mmain_particles.size < 100:
continue continue
@ -113,65 +149,51 @@ for i, nsim in enumerate(paths.get_ics(tonew=True)):
raddist, cmpos = csiborgtools.match.dist_centmass(mmain_particles) raddist, cmpos = csiborgtools.match.dist_centmass(mmain_particles)
patchsize = csiborgtools.match.dist_percentile(raddist, [99], patchsize = csiborgtools.match.dist_percentile(raddist, [99],
distmax=0.075) distmax=0.075)
with open(ftemp.format(nsim, clid, "fit"), "wb") as f: # Write the temporary results
numpy.savez(f, cmpos=cmpos, patchsize=patchsize) _out_fits["x"][i], _out_fits["y"][i], _out_fits["z"][i] = cmpos
_out_fits["lagpatch"][i] = patchsize
_out_map.update({str(clid): numpy.where(mmain_mask)[0]})
if args.dump: # Dump the results of this rank to a temporary file.
with open(ftemp.format(nsim, clid, "particles"), "wb") as f: joblib.dump(_out_fits, ftemp("fits", nsim, rank))
numpy.save(f, mmain_particles) joblib.dump(_out_map, ftemp("map", nsim, rank))
# We force clean up the memory before continuing. del part0, clump_ids,
del part0, clump_ids
collect() collect()
# We now wait for all processes and then use the 0th process to collect # Now we wait for all ranks, then collect the results and save it.
# the results. We first collect just the Lagrangian patch size information.
comm.Barrier() comm.Barrier()
if rank == 0: if rank == 0:
print(f"{datetime.now()}: collecting fits...", flush=True) print(f"{datetime.now()}: collecting results for {nsim}.", flush=True)
dtype = {"names": ["index", "x", "y", "z", "lagpatch"], out_fits = numpy.full(parent_ids.size, numpy.nan, dtype=dtype)
"formats": [numpy.int32] + [numpy.float32] * 4} out_map = {}
out = numpy.full(parent_ids.size, numpy.nan, dtype=dtype) for i in range(nproc):
for i, clid in enumerate(parent_ids): # Merge the map dictionaries
fpath = ftemp.format(nsim, clid, "fit") out_map = out_map | joblib.load(ftemp("map", nsim, i))
# There is no file if the halo was skipped due to too few # Now merge the structured arrays
# particles. _out_fits = joblib.load(ftemp("fits", nsim, i))
if not isfile(fpath): for j in range(_out_fits.size):
continue k = hid2arrpos[_out_fits["index"][j]]
with open(fpath, "rb") as f: for par in dtype["names"]:
inp = numpy.load(f) out_fits[par][k] = _out_fits[par][j]
out["index"][i] = clid
out["x"][i] = inp["cmpos"][0]
out["y"][i] = inp["cmpos"][1]
out["z"][i] = inp["cmpos"][2]
out["lagpatch"][i] = inp["patchsize"]
remove(fpath)
fout = paths.initmatch_path(nsim, "fit") remove(ftemp("fits", nsim, i))
print(f"{datetime.now()}: dumping fits to .. `{fout}`.", flush=True) remove(ftemp("map", nsim, i))
with open(fout, "wb") as f:
numpy.save(f, out)
# We now optionally collect the individual clumps and store them in an # Now save it
# archive, which has the benefit of being a single file that can be fout_fit = paths.initmatch_path(nsim, "fit")
# easily read in. print(f"{datetime.now()}: dumping fits to .. `{fout_fit}`.",
if args.dump: flush=True)
print(f"{datetime.now()}: collecting particles...", flush=True) with open(fout_fit, "wb") as f:
out = {} numpy.save(f, out_fits)
for clid in parent_ids:
fpath = ftemp.format(nsim, clid, "particles")
if not isfile(fpath):
continue
with open(fpath, "rb") as f:
out.update({str(clid): numpy.load(f)})
remove(fpath)
fout = paths.initmatch_path(nsim, "particles") fout_map = paths.initmatch_path(nsim, "halomap")
print(f"{datetime.now()}: dumping particles to .. `{fout}`.", print(f"{datetime.now()}: dumping mapping to .. `{fout_map}`.",
flush=True) flush=True)
with open(fout, "wb") as f: with h5py.File(fout_map, "w") as f:
numpy.savez(f, **out) for hid, indxs in out_map.items():
f.create_dataset(hid, data=indxs)
# Again we force clean up the memory before continuing. # We force clean up the memory before continuing.
del out del out_map, out_fits
collect() collect()

View file

@ -1,118 +0,0 @@
# Copyright (C) 2022 Richard Stiskalek
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
Script to split particles to individual files according to their clump. This is
useful for calculating the halo properties directly from the particles.
"""
from datetime import datetime
from gc import collect
from glob import glob
from os import remove
from os.path import join
import numpy
from mpi4py import MPI
from taskmaster import master_process, worker_process
from tqdm import tqdm
try:
import csiborgtools
except ModuleNotFoundError:
import sys
sys.path.append("../")
import csiborgtools
# Get MPI things
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nproc = comm.Get_size()
paths = csiborgtools.read.CSiBORGPaths(**csiborgtools.paths_glamdring)
verbose = nproc == 1
partcols = ["x", "y", "z", "vx", "vy", "vz", "M"]
def do_split(nsim):
nsnap = max(paths.get_snapshots(nsim))
reader = csiborgtools.read.ParticleReader(paths)
ftemp_base = join(
paths.temp_dumpdir,
"split_{}_{}".format(str(nsim).zfill(5), str(nsnap).zfill(5)),
)
ftemp = ftemp_base + "_{}.npz"
# Load the particles and their clump IDs
particles = reader.read_particle(nsnap, nsim, partcols, verbose=verbose)
particle_clumps = reader.read_clumpid(nsnap, nsim, verbose=verbose)
# Drop all particles whose clump index is 0 (not assigned to any clump)
assigned_mask = particle_clumps != 0
particle_clumps = particle_clumps[assigned_mask]
particles = particles[assigned_mask]
del assigned_mask
collect()
# Load the clump indices
clumpinds = reader.read_clumps(nsnap, nsim, cols="index")["index"]
# Some of the clumps have no particles, so we do not loop over them
clumpinds = clumpinds[numpy.isin(clumpinds, particle_clumps)]
# Loop over the clump indices and save the particles to a temporary file
# every 10000 clumps. We will later read this back and combine into a
# single file.
out = {}
for i, clind in enumerate(tqdm(clumpinds) if verbose else clumpinds):
key = str(clind)
out.update({str(clind): particles[particle_clumps == clind]})
# REMOVE bump this back up
if i % 10000 == 0 or i == clumpinds.size - 1:
numpy.savez(ftemp.format(i), **out)
out = {}
# Clear up memory because we will be loading everything back
del particles, particle_clumps, clumpinds
collect()
# Now load back in every temporary file, combine them into a single
# dictionary and save as a single .npz file.
out = {}
for file in glob(ftemp_base + "*"):
inp = numpy.load(file)
for key in inp.files:
out.update({key: inp[key]})
remove(file)
numpy.savez(paths.split_path(nsnap, nsim), **out)
###############################################################################
# MPI task delegation #
###############################################################################
if nproc > 1:
if rank == 0:
tasks = list(paths.get_ics(tonew=False))
master_process(tasks, comm, verbose=True)
else:
worker_process(do_split, comm, verbose=False)
else:
tasks = paths.get_ics(tonew=False)
for task in tasks:
print("{}: completing task `{}`.".format(datetime.now(), task))
do_split(task)
comm.Barrier()