defined a main function for the scripts so they can be also used as modules

This commit is contained in:
Mayeul Aubin 2025-05-20 10:42:57 +02:00
parent a485db9465
commit a693da3db0
3 changed files with 356 additions and 153 deletions

View file

@ -2,6 +2,41 @@ from pysbmy.density import get_density_pm_snapshot
from pysbmy.snapshot import read_snapshot
import argparse
def convert_snapshot_to_density(snapshot_path, output_path, N=None, corner=(0.0, 0.0, 0.0)):
"""
Convert a snapshot to a density field.
Parameters
----------
snapshot_path : str
Path to the snapshot file.
output_path : str
Path to the output density file.
N : int
Size of the density field grid (N x N x N).
corner : tuple of float
Corner of the box (x, y, z).
"""
# Read the snapshot
print("Reading snapshot...")
snap = read_snapshot(snapshot_path)
if N is None:
N = snap.Np0
# Calculate density
print("Calculating density...")
F = get_density_pm_snapshot(snap, N, N, N, corner[0], corner[1], corner[2])
# Write density to file
print("Writing density...")
F.write(output_path)
print("Density written to", output_path)
print("Done.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Convert snapshot to density.")
parser.add_argument(
@ -36,19 +71,9 @@ if __name__ == "__main__":
args = parser.parse_args()
# Read the snapshot
print("Reading snapshot...")
snap = read_snapshot(args.snapshot)
if args.N is None:
N = snap.Np0
else:
N = args.N
print("Calculating density...")
F=get_density_pm_snapshot(snap, N,N,N, args.corner[0],args.corner[1],args.corner[2])
print("Writing density...")
F.write(args.output)
print("Density written to", args.output)
print("Done.")
convert_snapshot_to_density(
snapshot_path=args.snapshot,
output_path=args.output,
N=args.N,
corner=args.corner,
)

View file

@ -58,6 +58,47 @@ def gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES):
def gather_tiles(folder, tile_base, L, Np, N_TILES, buffer):
"""
Gather sCOLA tiles into a single density field.
Parameters
----------
folder : str
Folder containing the tiles.
tile_base : str
Base name of the tiles.
L : float
Size of the box in Mpc/h.
Np : int
Number of cells per dimension for the full box.
N_TILES : int
Number of tiles per dimension.
buffer : int
Buffer size for the density field of tiles.
"""
Np_tile = Np//N_TILES
dpm = L/Np_tile
print("Memory allocation for the grid...")
A=np.zeros((Np,Np,Np), dtype=np.float32)
print("Starting to read the tiles...")
gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES)
print("Finished reading the tiles.")
A=density_to_delta(A,-1)
print("Converting to field...")
F=Field(L,L,L, 0.,0.,0., 1, Np,Np,Np, 1., A)
print("Saving field...")
F.write(folder+"../results/final_density_sCOLA.h5")
print("Density field saved to", folder+"../results/final_density_sCOLA.h5")
print("Done.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Gather density from tiles.")
@ -80,17 +121,4 @@ if __name__ == "__main__":
Np_tile = Np//N_TILES
dpm = L/Np_tile
print("Memory allocation for the grid...")
A=np.zeros((Np,Np,Np), dtype=np.float32)
print("Starting to read the tiles...")
gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES)
print("Finished reading the tiles.")
A=density_to_delta(A,-1)
print("Converting to field...")
F=Field(L,L,L, 0.,0.,0., 1, Np,Np,Np, 1., A)
print("Saving field...")
F.write(folder+"../results/final_density_sCOLA.h5")
gather_tiles(folder, tile_base, L, Np, N_TILES, buffer)

View file

@ -9,6 +9,12 @@ import time
def create_scola_slurm_script(slurmfile, box):
"""
Create a slurm script for sCOLA.
Parameters
----------
slurmfile : str
Path to the slurm script file.
box : str
Box number to be replaced in the slurm script.
"""
# Read the slurm file
with open(slurmfile, "r") as f:
@ -25,6 +31,14 @@ def create_scola_slurm_script(slurmfile, box):
def submit_slurm_job(slurmfile):
"""
Submit a slurm job using the sbatch command.
Parameters
----------
slurmfile : str
Path to the slurm script file.
Returns
-------
str
Job ID of the submitted job. None if the submission failed.
"""
# Submit the job
result = subprocess.run(["sbatch", slurmfile], capture_output=True, text=True)
@ -89,6 +103,16 @@ def check_job_status(job_id):
"""
Check the status of a job using the squeue command.
Returns the job status and running time.
Parameters
----------
job_id : str
Job ID of the job to check.
Returns
-------
str
Job status. Possible values are 'R' (running), 'PD' (pending), 'X' (failed), 'CP' (completed).
int
Running time in seconds. -1 if the job is not found.
"""
# Check the job status
result = subprocess.run(["squeue", "-j", str(job_id)], capture_output=True, text=True)
@ -125,11 +149,96 @@ def get_job_id(jobname):
return job_id
def check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count):
def resubmit_job(slurmdir,slurmfile,job_ids_array,box,resubmit_count,error_count,MAX_RESUBMIT=10,MAX_ERRORS=10):
"""
Resubmit a job if it has failed.
Parameters
----------
slurmdir : str
Directory where the slurm scripts are saved.
slurmfile : str
Slurm script file.
job_ids_array : array
Array of job IDs for all previously submitted jobs. Indexed by box-1 number.
box : int
Box number of the job to resubmit.
resubmit_count : int
Number of resubmissions so far.
error_count : int
Number of errors so far.
MAX_RESUBMIT : int
Maximum number of resubmissions allowed.
MAX_ERRORS : int
Maximum number of errors allowed.
Returns
-------
int
Updated resubmit count.
int
Updated error count.
"""
# Resubmit the job
job_id = submit_slurm_job(slurmdir+slurmfile+"."+str(box))
# Check if the job was submitted successfully
if job_id is None:
print(f"Error resubmitting job for box {box}")
error_count+=1
# Check if the error count exceeds the maximum
if error_count >= MAX_ERRORS:
raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
else:
job_ids_array[box-1] = int(job_id)
resubmit_count += 1
# Check if the resubmit count exceeds the maximum
if resubmit_count >= MAX_RESUBMIT:
raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")
return resubmit_count, error_count
def check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleeptime,job_ids_array,box,resubmit_count,error_count,MAX_RESUBMIT=10,MAX_ERRORS=10):
"""
Get the status of all previously submitted jobs.
For each job, check if it is running, completed, or failed.
If the job is failed, resubmit it.
Parameters
----------
workdir : str
Directory where the tiles are saved.
slurmdir : str
Directory where the slurm scripts are saved.
slurmfile : str
Slurm script file.
tilefile : str
Tile file name.
sleeptime : float
Sleep time between each job submission (in s).
job_ids_array : array
Array of job IDs for all previously submitted jobs. Indexed by box-1 number.
box : int
Up to which box the job status is checked.
resubmit_count : int
Number of resubmissions so far.
error_count : int
Number of errors so far.
MAX_RESUBMIT : int
Maximum number of resubmissions allowed.
MAX_ERRORS : int
Maximum number of errors allowed.
Returns
-------
dict
Dictionary with the job status categories and their corresponding box numbers.
int
Updated resubmit count.
int
Updated error count.
"""
job_status_categories = {'R':[],'CP':[],'PD':[],'X':[]}
@ -143,27 +252,13 @@ def check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count):
# If the status is 'X', check if the tile file was created
if status == 'X':
# Check if the tile file was created
if os.path.exists(args.workdir+f"{args.tilefile}{prev_box}.h5"):
if os.path.exists(workdir+f"{tilefile}{prev_box}.h5"):
job_status_categories['CP'].append(prev_box) # Classify as completed
else:
# Resubmit the job
job_id = submit_slurm_job(args.slurmdir+args.slurmfile+"."+str(prev_box))
# Check if the job was submitted successfully
if job_id is None:
print(f"Error submitting job for box {box}")
error_count+=1
# Check if the error count exceeds the maximum
if error_count >= MAX_ERRORS:
raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
else:
job_ids_array[prev_box-1] = int(job_id)
resubmit_count += 1
# Check if the resubmit count exceeds the maximum
if resubmit_count >= MAX_RESUBMIT:
raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")
resubmit_job(slurmdir,slurmfile,job_ids_array,prev_box,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS)
job_status_categories[status].append(prev_box) # Classify as failed
# Sleep for a while before resubmitting the next job
time.sleep(args.sleep)
time.sleep(sleeptime)
# If the status is not 'X', record the job status
else:
job_status_categories[status].append(prev_box)
@ -175,6 +270,21 @@ def check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count):
def cap_number_of_jobs(job_status_categories,job_ids_array, max_jobs, sleep_time):
"""
Cap the number of jobs to a maximum number.
Parameters
----------
job_status_categories : dict
Dictionary with the job status categories and their corresponding box numbers.
job_ids_array : array
Array of job IDs for all previously submitted jobs. Indexed by box-1 number.
max_jobs : int
Maximum number of jobs allowed.
sleep_time : float
Sleep time between each job submission (in s).
Returns
-------
dict
Updated dictionary with the job status categories and their corresponding box numbers.
"""
discard_categories = ['CP', 'X'] # Completed and Failed
# Check the number of running /pending jobs
@ -219,6 +329,140 @@ def print_summary_job_status(job_status_categories, box, resubmit_count, error_c
def scola_submit(directory,
slurmdir=None,
workdir=None,
slurmfile="scola_sCOLA.sh",
tilefile="scola_tile",
jobname="sCOLA_",
N_tiles=4,
sleep=1.5,
force=False,
MAX_ERRORS=10,
MAX_RESUBMIT=10,
MAX_JOBS_AT_ONCE=48,
CHECK_EVERY=100):
if slurmdir is None:
slurmdir = directory + "slurm_scripts/"
if workdir is None:
workdir = directory + "work/"
# Check that the slurm file exists
if not os.path.exists(slurmdir+slurmfile):
raise FileNotFoundError(f"Slurm file {slurmdir+slurmfile} does not exist.")
# Check that the work directory exists
if not os.path.exists(workdir):
raise FileNotFoundError(f"Work directory {workdir} does not exist.")
# If force, remove all pre-existing tile files
if force:
count_removed = 0
for box in range(1,N_tiles**3+1):
if os.path.exists(workdir+f"{tilefile}{box}.h5"):
os.remove(workdir+f"{tilefile}{box}.h5")
count_removed += 1
print(f"Removed {count_removed} ({100*count_removed/N_tiles**3:.1f}%) pre-existing tile files.")
# MAX_ERRORS = 10
if MAX_RESUBMIT is None:
MAX_RESUBMIT = int(0.1*N_tiles**3) # 10% of the total number of jobs
# MAX_JOBS_AT_ONCE = int(3*128/8) # 3 nodes with 128 cores each, 8 jobs per core
# CHECK_EVERY = 100
error_count = 0
resubmit_count = 0
counter_for_checks = 0
job_ids_array = np.zeros((N_tiles**3,), dtype=int)
print("---------------------------------------------------")
print("Starting job submission for sCOLA tiles with the following parameters:")
print(f"Directory: {directory}")
print(f"Slurm file: {slurmdir}{slurmfile}")
print(f"Work directory: {workdir}")
print(f"Number of tiles: {N_tiles**3} tiles")
print(f"Sleep time: {sleep} s")
print(f"Max errors: {MAX_ERRORS} errors")
print(f"Max resubmits: {MAX_RESUBMIT} resubmits")
print(f"Max jobs at once: {MAX_JOBS_AT_ONCE} jobs")
print(f"Check every: {CHECK_EVERY} jobs")
print("---------------------------------------------------")
print(f"ETA: {convert_seconds_to_time(N_tiles**3*sleep*1.2)}")
print("Starting job submission...")
for box in tqdm.tqdm(range(1,N_tiles**3+1), desc="Submitting jobs", unit="boxes"):
# Check if the tile file already exists
if os.path.exists(workdir+f"{tilefile}{box}.h5"):
continue
# Check if the slurm job is already running
job_id = get_job_id(f"{jobname}{box}")
if job_id is not None:
job_ids_array[box-1] = int(job_id)
time.sleep(sleep)
continue
# Create the slurm script for the box
create_scola_slurm_script(slurmdir+slurmfile, str(box))
# Submit the job
job_id = submit_slurm_job(slurmdir+slurmfile+"."+str(box))
# Check if the job was submitted successfully
if job_id is None:
print(f"Error submitting job for box {box}")
error_count+=1
else:
job_ids_array[box-1] = int(job_id)
# Sleep for a while before submitting the next job
time.sleep(sleep)
counter_for_checks += 1
# Check if the error count exceeds the maximum
if error_count >= MAX_ERRORS:
raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
# Check the job status every CHECK_EVERY jobs
if counter_for_checks >= CHECK_EVERY:
counter_for_checks = 0
job_status_categories, resubmit_count, error_count = check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleep,job_ids_array,box,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS)
print_summary_job_status(job_status_categories, box, resubmit_count, error_count)
job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,sleep)
print("All jobs submitted. Now checking the status of the jobs.")
job_status_categories, resubmit_count, error_count = check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleep,job_ids_array,N_tiles**3+1,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS)
# Now wait for all jobs to finish
while len(job_status_categories['CP'])<N_tiles**3:
time.sleep(10*sleep)
job_status_categories, resubmit_count, error_count = check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleep,job_ids_array,N_tiles**3+1,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS)
print_summary_job_status(job_status_categories, N_tiles**3, resubmit_count, error_count)
job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,sleep)
print("All jobs finished.")
# Remove the slurm scripts
for box in range(1,N_tiles**3+1):
if os.path.exists(slurmdir+slurmfile+"."+str(box)):
os.remove(slurmdir+slurmfile+"."+str(box))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Submit slurm jobs for sCOLA tiles.")
@ -234,115 +478,21 @@ if __name__ == "__main__":
parser.add_argument("--sleep", type=float, default=1.5, help="Sleep time between each job submission (in s).")
parser.add_argument("-F","--force", action="store_true", help="Force to resimulate all tiles, even if they already exist.")
args=parser.parse_args()
if args.slurmdir is None:
args.slurmdir = args.directory + "slurm_scripts/"
if args.workdir is None:
args.workdir = args.directory + "work/"
# Check that the slurm file exists
if not os.path.exists(args.slurmdir+args.slurmfile):
raise FileNotFoundError(f"Slurm file {args.slurmdir+args.slurmfile} does not exist.")
# Check that the work directory exists
if not os.path.exists(args.workdir):
raise FileNotFoundError(f"Work directory {args.workdir} does not exist.")
MAX_ERRORS = 10
MAX_RESUBMIT = int(0.1*args.N_tiles**3) # 10% of the total number of jobs
MAX_JOBS_AT_ONCE = int(3*128/8) # 3 nodes with 128 cores each, 8 jobs per core
CHECK_EVERY = 100
error_count = 0
resubmit_count = 0
counter_for_checks = 0
job_ids_array = np.zeros((args.N_tiles**3,), dtype=int)
print("---------------------------------------------------")
print("Starting job submission for sCOLA tiles with the following parameters:")
print(f"Directory: {args.directory}")
print(f"Slurm file: {args.slurmdir}{args.slurmfile}")
print(f"Work directory: {args.workdir}")
print(f"Number of tiles: {args.N_tiles**3} tiles")
print(f"Sleep time: {args.sleep} s")
print(f"Max errors: {MAX_ERRORS} errors")
print(f"Max resubmits: {MAX_RESUBMIT} resubmits")
print(f"Max jobs at once: {MAX_JOBS_AT_ONCE} jobs")
print(f"Check every: {CHECK_EVERY} jobs")
print("---------------------------------------------------")
print(f"ETA: {convert_seconds_to_time(args.N_tiles**3*args.sleep*1.2)}")
print("Starting job submission...")
scola_submit(args.directory,
slurmdir=args.slurmdir,
workdir=args.workdir,
slurmfile=args.slurmfile,
tilefile=args.tilefile,
jobname=args.jobname,
N_tiles=args.N_tiles,
sleep=args.sleep,
force=args.force)
for box in tqdm.tqdm(range(1,args.N_tiles**3+1), desc="Submitting jobs", unit="boxes"):
# Check if the tile file already exists
if os.path.exists(args.workdir+f"{args.tilefile}{box}.h5"):
continue
# Check if the slurm job is already running
job_id = get_job_id(f"{args.jobname}{box}")
if job_id is not None:
job_ids_array[box-1] = int(job_id)
time.sleep(args.sleep)
continue
# Create the slurm script for the box
create_scola_slurm_script(args.slurmdir+args.slurmfile, str(box))
# Submit the job
job_id = submit_slurm_job(args.slurmdir+args.slurmfile+"."+str(box))
# Check if the job was submitted successfully
if job_id is None:
print(f"Error submitting job for box {box}")
error_count+=1
else:
job_ids_array[box-1] = int(job_id)
# Sleep for a while before submitting the next job
time.sleep(args.sleep)
counter_for_checks += 1
# Check if the error count exceeds the maximum
if error_count >= MAX_ERRORS:
raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
# Check if the resubmit count exceeds the maximum
if resubmit_count >= MAX_RESUBMIT:
raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")
# Check the job status every CHECK_EVERY jobs
if counter_for_checks >= CHECK_EVERY:
counter_for_checks = 0
job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count)
print_summary_job_status(job_status_categories, box, resubmit_count, error_count)
job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,args.sleep)
print("All jobs submitted. Now checking the status of the jobs.")
job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,args.N_tiles**3+1,resubmit_count,error_count)
# Now wait for all jobs to finish
while len(job_status_categories['CP'])<args.N_tiles**3:
time.sleep(10*args.sleep)
job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,args.N_tiles**3+1,resubmit_count,error_count)
print_summary_job_status(job_status_categories, args.N_tiles**3+1, resubmit_count, error_count)
job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,args.sleep)
print("All jobs finished.")
# Remove the slurm scripts
for box in range(1,args.N_tiles**3+1):
if os.path.exists(args.slurmdir+args.slurmfile+"."+str(box)):
os.remove(args.slurmdir+args.slurmfile+"."+str(box))