From a693da3db09d70536a064edcfe93b8ffaf26b8d3 Mon Sep 17 00:00:00 2001 From: Mayeul Aubin Date: Tue, 20 May 2025 10:42:57 +0200 Subject: [PATCH] defined a main function for the scripts so they can be also used as modules --- scripts/convert_snapshot_to_density.py | 57 +++- scripts/gather_tiles.py | 56 +++- scripts/scola_submit.py | 396 +++++++++++++++++-------- 3 files changed, 356 insertions(+), 153 deletions(-) diff --git a/scripts/convert_snapshot_to_density.py b/scripts/convert_snapshot_to_density.py index 2959594..aed11ee 100644 --- a/scripts/convert_snapshot_to_density.py +++ b/scripts/convert_snapshot_to_density.py @@ -2,6 +2,41 @@ from pysbmy.density import get_density_pm_snapshot from pysbmy.snapshot import read_snapshot import argparse + +def convert_snapshot_to_density(snapshot_path, output_path, N=None, corner=(0.0, 0.0, 0.0)): + """ + Convert a snapshot to a density field. + + Parameters + ---------- + snapshot_path : str + Path to the snapshot file. + output_path : str + Path to the output density file. + N : int + Size of the density field grid (N x N x N). + corner : tuple of float + Corner of the box (x, y, z). + """ + # Read the snapshot + print("Reading snapshot...") + snap = read_snapshot(snapshot_path) + + if N is None: + N = snap.Np0 + + # Calculate density + print("Calculating density...") + F = get_density_pm_snapshot(snap, N, N, N, corner[0], corner[1], corner[2]) + + # Write density to file + print("Writing density...") + F.write(output_path) + print("Density written to", output_path) + print("Done.") + + + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Convert snapshot to density.") parser.add_argument( @@ -36,19 +71,9 @@ if __name__ == "__main__": args = parser.parse_args() - # Read the snapshot - print("Reading snapshot...") - snap = read_snapshot(args.snapshot) - - if args.N is None: - N = snap.Np0 - else: - N = args.N - - print("Calculating density...") - F=get_density_pm_snapshot(snap, N,N,N, args.corner[0],args.corner[1],args.corner[2]) - - print("Writing density...") - F.write(args.output) - print("Density written to", args.output) - print("Done.") \ No newline at end of file + convert_snapshot_to_density( + snapshot_path=args.snapshot, + output_path=args.output, + N=args.N, + corner=args.corner, + ) \ No newline at end of file diff --git a/scripts/gather_tiles.py b/scripts/gather_tiles.py index 33ded24..a71a273 100644 --- a/scripts/gather_tiles.py +++ b/scripts/gather_tiles.py @@ -58,6 +58,47 @@ def gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES): +def gather_tiles(folder, tile_base, L, Np, N_TILES, buffer): + """ + Gather sCOLA tiles into a single density field. + + Parameters + ---------- + folder : str + Folder containing the tiles. + tile_base : str + Base name of the tiles. + L : float + Size of the box in Mpc/h. + Np : int + Number of cells per dimension for the full box. + N_TILES : int + Number of tiles per dimension. + buffer : int + Buffer size for the density field of tiles. + """ + Np_tile = Np//N_TILES + dpm = L/Np_tile + + print("Memory allocation for the grid...") + A=np.zeros((Np,Np,Np), dtype=np.float32) + + print("Starting to read the tiles...") + gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES) + + print("Finished reading the tiles.") + A=density_to_delta(A,-1) + + print("Converting to field...") + F=Field(L,L,L, 0.,0.,0., 1, Np,Np,Np, 1., A) + + print("Saving field...") + F.write(folder+"../results/final_density_sCOLA.h5") + print("Density field saved to", folder+"../results/final_density_sCOLA.h5") + print("Done.") + + + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Gather density from tiles.") @@ -80,17 +121,4 @@ if __name__ == "__main__": Np_tile = Np//N_TILES dpm = L/Np_tile - print("Memory allocation for the grid...") - A=np.zeros((Np,Np,Np), dtype=np.float32) - - print("Starting to read the tiles...") - gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES) - - print("Finished reading the tiles.") - A=density_to_delta(A,-1) - - print("Converting to field...") - F=Field(L,L,L, 0.,0.,0., 1, Np,Np,Np, 1., A) - - print("Saving field...") - F.write(folder+"../results/final_density_sCOLA.h5") \ No newline at end of file + gather_tiles(folder, tile_base, L, Np, N_TILES, buffer) \ No newline at end of file diff --git a/scripts/scola_submit.py b/scripts/scola_submit.py index 637b122..d0b1007 100644 --- a/scripts/scola_submit.py +++ b/scripts/scola_submit.py @@ -9,6 +9,12 @@ import time def create_scola_slurm_script(slurmfile, box): """ Create a slurm script for sCOLA. + Parameters + ---------- + slurmfile : str + Path to the slurm script file. + box : str + Box number to be replaced in the slurm script. """ # Read the slurm file with open(slurmfile, "r") as f: @@ -25,6 +31,14 @@ def create_scola_slurm_script(slurmfile, box): def submit_slurm_job(slurmfile): """ Submit a slurm job using the sbatch command. + Parameters + ---------- + slurmfile : str + Path to the slurm script file. + Returns + ------- + str + Job ID of the submitted job. None if the submission failed. """ # Submit the job result = subprocess.run(["sbatch", slurmfile], capture_output=True, text=True) @@ -89,6 +103,16 @@ def check_job_status(job_id): """ Check the status of a job using the squeue command. Returns the job status and running time. + Parameters + ---------- + job_id : str + Job ID of the job to check. + Returns + ------- + str + Job status. Possible values are 'R' (running), 'PD' (pending), 'X' (failed), 'CP' (completed). + int + Running time in seconds. -1 if the job is not found. """ # Check the job status result = subprocess.run(["squeue", "-j", str(job_id)], capture_output=True, text=True) @@ -125,11 +149,96 @@ def get_job_id(jobname): return job_id -def check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count): +def resubmit_job(slurmdir,slurmfile,job_ids_array,box,resubmit_count,error_count,MAX_RESUBMIT=10,MAX_ERRORS=10): + """ + Resubmit a job if it has failed. + Parameters + ---------- + slurmdir : str + Directory where the slurm scripts are saved. + slurmfile : str + Slurm script file. + job_ids_array : array + Array of job IDs for all previously submitted jobs. Indexed by box-1 number. + box : int + Box number of the job to resubmit. + resubmit_count : int + Number of resubmissions so far. + error_count : int + Number of errors so far. + MAX_RESUBMIT : int + Maximum number of resubmissions allowed. + MAX_ERRORS : int + Maximum number of errors allowed. + + Returns + ------- + int + Updated resubmit count. + int + Updated error count. + """ + + # Resubmit the job + job_id = submit_slurm_job(slurmdir+slurmfile+"."+str(box)) + + # Check if the job was submitted successfully + if job_id is None: + print(f"Error resubmitting job for box {box}") + error_count+=1 + # Check if the error count exceeds the maximum + if error_count >= MAX_ERRORS: + raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.") + else: + job_ids_array[box-1] = int(job_id) + + resubmit_count += 1 + # Check if the resubmit count exceeds the maximum + if resubmit_count >= MAX_RESUBMIT: + raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.") + + return resubmit_count, error_count + + + +def check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleeptime,job_ids_array,box,resubmit_count,error_count,MAX_RESUBMIT=10,MAX_ERRORS=10): """ Get the status of all previously submitted jobs. For each job, check if it is running, completed, or failed. If the job is failed, resubmit it. + Parameters + ---------- + workdir : str + Directory where the tiles are saved. + slurmdir : str + Directory where the slurm scripts are saved. + slurmfile : str + Slurm script file. + tilefile : str + Tile file name. + sleeptime : float + Sleep time between each job submission (in s). + job_ids_array : array + Array of job IDs for all previously submitted jobs. Indexed by box-1 number. + box : int + Up to which box the job status is checked. + resubmit_count : int + Number of resubmissions so far. + error_count : int + Number of errors so far. + MAX_RESUBMIT : int + Maximum number of resubmissions allowed. + MAX_ERRORS : int + Maximum number of errors allowed. + + Returns + ------- + dict + Dictionary with the job status categories and their corresponding box numbers. + int + Updated resubmit count. + int + Updated error count. """ job_status_categories = {'R':[],'CP':[],'PD':[],'X':[]} @@ -143,27 +252,13 @@ def check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count): # If the status is 'X', check if the tile file was created if status == 'X': # Check if the tile file was created - if os.path.exists(args.workdir+f"{args.tilefile}{prev_box}.h5"): + if os.path.exists(workdir+f"{tilefile}{prev_box}.h5"): job_status_categories['CP'].append(prev_box) # Classify as completed else: - # Resubmit the job - job_id = submit_slurm_job(args.slurmdir+args.slurmfile+"."+str(prev_box)) - # Check if the job was submitted successfully - if job_id is None: - print(f"Error submitting job for box {box}") - error_count+=1 - # Check if the error count exceeds the maximum - if error_count >= MAX_ERRORS: - raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.") - else: - job_ids_array[prev_box-1] = int(job_id) - resubmit_count += 1 - # Check if the resubmit count exceeds the maximum - if resubmit_count >= MAX_RESUBMIT: - raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.") + resubmit_job(slurmdir,slurmfile,job_ids_array,prev_box,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS) job_status_categories[status].append(prev_box) # Classify as failed # Sleep for a while before resubmitting the next job - time.sleep(args.sleep) + time.sleep(sleeptime) # If the status is not 'X', record the job status else: job_status_categories[status].append(prev_box) @@ -175,6 +270,21 @@ def check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count): def cap_number_of_jobs(job_status_categories,job_ids_array, max_jobs, sleep_time): """ Cap the number of jobs to a maximum number. + Parameters + ---------- + job_status_categories : dict + Dictionary with the job status categories and their corresponding box numbers. + job_ids_array : array + Array of job IDs for all previously submitted jobs. Indexed by box-1 number. + max_jobs : int + Maximum number of jobs allowed. + sleep_time : float + Sleep time between each job submission (in s). + + Returns + ------- + dict + Updated dictionary with the job status categories and their corresponding box numbers. """ discard_categories = ['CP', 'X'] # Completed and Failed # Check the number of running /pending jobs @@ -219,6 +329,140 @@ def print_summary_job_status(job_status_categories, box, resubmit_count, error_c +def scola_submit(directory, + slurmdir=None, + workdir=None, + slurmfile="scola_sCOLA.sh", + tilefile="scola_tile", + jobname="sCOLA_", + N_tiles=4, + sleep=1.5, + force=False, + MAX_ERRORS=10, + MAX_RESUBMIT=10, + MAX_JOBS_AT_ONCE=48, + CHECK_EVERY=100): + + if slurmdir is None: + slurmdir = directory + "slurm_scripts/" + if workdir is None: + workdir = directory + "work/" + + + # Check that the slurm file exists + if not os.path.exists(slurmdir+slurmfile): + raise FileNotFoundError(f"Slurm file {slurmdir+slurmfile} does not exist.") + # Check that the work directory exists + if not os.path.exists(workdir): + raise FileNotFoundError(f"Work directory {workdir} does not exist.") + + # If force, remove all pre-existing tile files + if force: + count_removed = 0 + for box in range(1,N_tiles**3+1): + if os.path.exists(workdir+f"{tilefile}{box}.h5"): + os.remove(workdir+f"{tilefile}{box}.h5") + count_removed += 1 + print(f"Removed {count_removed} ({100*count_removed/N_tiles**3:.1f}%) pre-existing tile files.") + + # MAX_ERRORS = 10 + if MAX_RESUBMIT is None: + MAX_RESUBMIT = int(0.1*N_tiles**3) # 10% of the total number of jobs + # MAX_JOBS_AT_ONCE = int(3*128/8) # 3 nodes with 128 cores each, 8 jobs per core + # CHECK_EVERY = 100 + + error_count = 0 + resubmit_count = 0 + counter_for_checks = 0 + + job_ids_array = np.zeros((N_tiles**3,), dtype=int) + + + print("---------------------------------------------------") + print("Starting job submission for sCOLA tiles with the following parameters:") + print(f"Directory: {directory}") + print(f"Slurm file: {slurmdir}{slurmfile}") + print(f"Work directory: {workdir}") + print(f"Number of tiles: {N_tiles**3} tiles") + print(f"Sleep time: {sleep} s") + print(f"Max errors: {MAX_ERRORS} errors") + print(f"Max resubmits: {MAX_RESUBMIT} resubmits") + print(f"Max jobs at once: {MAX_JOBS_AT_ONCE} jobs") + print(f"Check every: {CHECK_EVERY} jobs") + print("---------------------------------------------------") + print(f"ETA: {convert_seconds_to_time(N_tiles**3*sleep*1.2)}") + print("Starting job submission...") + + + + for box in tqdm.tqdm(range(1,N_tiles**3+1), desc="Submitting jobs", unit="boxes"): + + # Check if the tile file already exists + if os.path.exists(workdir+f"{tilefile}{box}.h5"): + continue + + # Check if the slurm job is already running + job_id = get_job_id(f"{jobname}{box}") + if job_id is not None: + job_ids_array[box-1] = int(job_id) + time.sleep(sleep) + continue + + # Create the slurm script for the box + create_scola_slurm_script(slurmdir+slurmfile, str(box)) + + # Submit the job + job_id = submit_slurm_job(slurmdir+slurmfile+"."+str(box)) + + # Check if the job was submitted successfully + if job_id is None: + print(f"Error submitting job for box {box}") + error_count+=1 + else: + job_ids_array[box-1] = int(job_id) + + # Sleep for a while before submitting the next job + time.sleep(sleep) + + counter_for_checks += 1 + + # Check if the error count exceeds the maximum + if error_count >= MAX_ERRORS: + raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.") + + # Check the job status every CHECK_EVERY jobs + if counter_for_checks >= CHECK_EVERY: + + counter_for_checks = 0 + + job_status_categories, resubmit_count, error_count = check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleep,job_ids_array,box,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS) + print_summary_job_status(job_status_categories, box, resubmit_count, error_count) + job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,sleep) + + + + print("All jobs submitted. Now checking the status of the jobs.") + + + job_status_categories, resubmit_count, error_count = check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleep,job_ids_array,N_tiles**3+1,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS) + # Now wait for all jobs to finish + while len(job_status_categories['CP'])= MAX_ERRORS: - raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.") - # Check if the resubmit count exceeds the maximum - if resubmit_count >= MAX_RESUBMIT: - raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.") - - # Check the job status every CHECK_EVERY jobs - if counter_for_checks >= CHECK_EVERY: - - counter_for_checks = 0 - - job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count) - print_summary_job_status(job_status_categories, box, resubmit_count, error_count) - job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,args.sleep) + scola_submit(args.directory, + slurmdir=args.slurmdir, + workdir=args.workdir, + slurmfile=args.slurmfile, + tilefile=args.tilefile, + jobname=args.jobname, + N_tiles=args.N_tiles, + sleep=args.sleep, + force=args.force) - print("All jobs submitted. Now checking the status of the jobs.") - - - job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,args.N_tiles**3+1,resubmit_count,error_count) - # Now wait for all jobs to finish - while len(job_status_categories['CP'])