csiborgtools/scripts_independent/field_sph.py

202 lines
7.5 KiB
Python
Raw Normal View History

# Copyright (C) 2023 Richard Stiskalek
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
Script to construct the density and velocity fields for a simulation snapshot.
The SPH filter is implemented in the cosmotool package.
"""
from argparse import ArgumentParser
from os import environ, remove
from os.path import join, exists
import subprocess
from datetime import datetime
import hdf5plugin # noqa
import numpy as np
from h5py import File
def now():
return datetime.now()
def generate_unique_id(file_path):
"""
Generate a unique ID for a file path.
"""
return file_path.replace('/', '_').replace(':', '_')
def prepare_random(temporary_output_path, npart=100, dtype=np.float32):
"""
Prepare a random dataset for the SPH filter.
"""
print("Preparing random dataset.", flush=True)
arr = np.full((npart, 7), np.nan, dtype=dtype)
arr[:, :3] = np.random.uniform(0, 1, (npart, 3))
arr[:, 3:6] = np.random.normal(0, 1, (npart, 3))
arr[:, 6] = np.ones(npart, dtype=dtype)
dset = np.random.random((npart, 7)).astype(dtype)
dset[:, 6] = np.ones(npart, dtype=dtype)
with File(temporary_output_path, 'w') as target:
target.create_dataset("particles", data=dset, dtype=dtype)
return 1.
def prepare_gadget(snapshot_path, temporary_output_path):
"""
Prepare a GADGET snapshot for the SPH filter. Assumes there is only a
single file per snapshot.
"""
with File(snapshot_path, 'r') as source, File(temporary_output_path, 'w') as target: # noqa
boxsize = source["Header"].attrs["BoxSize"]
npart = sum(source["Header"].attrs["NumPart_Total"])
nhighres = source["Header"].attrs["NumPart_Total"][1]
dset = target.create_dataset("particles", (npart, 7), dtype=np.float32)
# Copy to this dataset the high-resolution particles.
dset[:nhighres, :3] = source["PartType1/Coordinates"][:]
dset[:nhighres, 3:6] = source["PartType1/Velocities"][:]
dset[:nhighres, 6] = np.ones(nhighres, dtype=np.float32) * source["Header"].attrs["MassTable"][1] # noqa
# Now copy the low-resolution particles.
dset[nhighres:, :3] = source["PartType5/Coordinates"][:]
dset[nhighres:, 3:6] = source["PartType5/Velocities"][:]
dset[nhighres:, 6] = source["PartType5/Masses"][:]
return boxsize
def run_sph_filter(particles_path, output_path, boxsize, resolution,
SPH_executable):
"""
Run the SPH filter on a snapshot.
"""
if not exists(particles_path):
raise RuntimeError(f"Particles file `{particles_path}` does not exist.") # noqa
if not isinstance(boxsize, (int, float)):
raise TypeError("`boxsize` must be a number.")
if not isinstance(resolution, int):
raise TypeError("`resolution` must be an integer.")
if not exists(SPH_executable):
raise RuntimeError(f"SPH executable `{SPH_executable}` does not exist.") # noqa
command = [SPH_executable, particles_path, str(1e14), str(boxsize),
str(resolution), str(0), str(0), str(0), output_path, "1"]
print(f"{now()}: executing `simple3DFilter`.", flush=True)
start_time = now()
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=True)
for line in iter(process.stdout.readline, ""):
print(line, end="", flush=True)
process.wait()
if process.returncode != 0:
raise RuntimeError("`simple3DFilter`failed.")
else:
dt = now() - start_time
print(f"{now()}: `simple3DFilter`completed successfully in {dt}.",
flush=True)
def main(snapshot_path, output_path, resolution, scratch_space, SPH_executable,
snapshot_kind):
"""
Construct the density and velocity fields for a simulation snapshot using
`cosmotool` [1].
Parameters
----------
snapshot_path : str
Path to the simulation snapshot.
output_path : str
Path to the output HDF5 file.
resolution : int
Resolution of the density field.
scratch_space : str
Path to a folder where temporary files can be stored.
SPH_executable : str
Path to the `simple3DFilter` executable [1].
snapshot_kind : str
Kind of the simulation snapshot. Currently only `gadget4` is supported.
Returns
-------
None
References
----------
[1] https://bitbucket.org/glavaux/cosmotool/src/master/sample/simple3DFilter.cpp # noqa
"""
if snapshot_kind != "gadget4":
raise NotImplementedError("Only GADGET HDF5 snapshots are supported.")
print("---------- SPH Density & Velocity Field Job Information ----------")
print(f"Snapshot path: {snapshot_path}")
print(f"Output path: {output_path}")
print(f"Resolution: {resolution}")
print(f"Scratch space: {scratch_space}")
print(f"SPH executable: {SPH_executable}")
print(f"Snapshot kind: {snapshot_kind}")
print("------------------------------------------------------------------")
print(flush=True)
temporary_output_path = join(
scratch_space, generate_unique_id(snapshot_path))
if not temporary_output_path.endswith(".hdf5"):
raise RuntimeError("Temporary output path must end with `.hdf5`.")
print(f"{now()}: preparing snapshot...", flush=True)
boxsize = prepare_gadget(snapshot_path, temporary_output_path)
print(f"{now()}: wrote temporary data to {temporary_output_path}.",
flush=True)
run_sph_filter(temporary_output_path, output_path, boxsize, resolution,
SPH_executable)
print(f"{now()}: removing the temporary snapshot file.", flush=True)
try:
remove(temporary_output_path)
except FileNotFoundError:
pass
if __name__ == "__main__":
parser = ArgumentParser(description="Generate SPH density and velocity field.") # noqa
parser.add_argument("--snapshot_path", type=str, required=True,
help="Path to the simulation snapshot.")
parser.add_argument("--output_path", type=str, required=True,
help="Path to the output HDF5 file.")
parser.add_argument("--resolution", type=int, required=True,
help="Resolution of the density and velocity field.")
parser.add_argument("--scratch_space", type=str, required=True,
help="Path to a folder where temporary files can be stored.") # noqa
parser.add_argument("--SPH_executable", type=str, required=True,
help="Path to the `simple3DFilter` executable.")
parser.add_argument("--snapshot_kind", type=str, required=True,
choices=["gadget4"],
help="Kind of the simulation snapshot.")
args = parser.parse_args()
main(args.snapshot_path, args.output_path, args.resolution,
args.scratch_space, args.SPH_executable, args.snapshot_kind)