diff --git a/analysis/slices.py b/analysis/slices.py old mode 100755 new mode 100644 diff --git a/scripts/__init__.py b/scripts/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/scripts/convert_snapshot_to_density.py b/scripts/convert_snapshot_to_density.py index aed11ee..2959594 100644 --- a/scripts/convert_snapshot_to_density.py +++ b/scripts/convert_snapshot_to_density.py @@ -2,41 +2,6 @@ from pysbmy.density import get_density_pm_snapshot from pysbmy.snapshot import read_snapshot import argparse - -def convert_snapshot_to_density(snapshot_path, output_path, N=None, corner=(0.0, 0.0, 0.0)): - """ - Convert a snapshot to a density field. - - Parameters - ---------- - snapshot_path : str - Path to the snapshot file. - output_path : str - Path to the output density file. - N : int - Size of the density field grid (N x N x N). - corner : tuple of float - Corner of the box (x, y, z). - """ - # Read the snapshot - print("Reading snapshot...") - snap = read_snapshot(snapshot_path) - - if N is None: - N = snap.Np0 - - # Calculate density - print("Calculating density...") - F = get_density_pm_snapshot(snap, N, N, N, corner[0], corner[1], corner[2]) - - # Write density to file - print("Writing density...") - F.write(output_path) - print("Density written to", output_path) - print("Done.") - - - if __name__ == "__main__": parser = argparse.ArgumentParser(description="Convert snapshot to density.") parser.add_argument( @@ -71,9 +36,19 @@ if __name__ == "__main__": args = parser.parse_args() - convert_snapshot_to_density( - snapshot_path=args.snapshot, - output_path=args.output, - N=args.N, - corner=args.corner, - ) \ No newline at end of file + # Read the snapshot + print("Reading snapshot...") + snap = read_snapshot(args.snapshot) + + if args.N is None: + N = snap.Np0 + else: + N = args.N + + print("Calculating density...") + F=get_density_pm_snapshot(snap, N,N,N, args.corner[0],args.corner[1],args.corner[2]) + + print("Writing density...") + F.write(args.output) + print("Density written to", args.output) + print("Done.") \ No newline at end of file diff --git a/scripts/field_to_field.py b/scripts/field_to_field.py deleted file mode 100644 index 236e283..0000000 --- a/scripts/field_to_field.py +++ /dev/null @@ -1,110 +0,0 @@ -from pysbmy.density import mesh_to_mesh -from pysbmy.Field import Field, read_field -import numpy as np -import os - - -def field_to_field( - input_file:str, - output_file:str, - output_size:int|tuple[int,int,int]|list[int], - output_dpm:float|tuple[float,float,float]|list[float], - output_corner:tuple[float,float,float]|list[float], - boundary_conditions:int, - ): - - - if isinstance(output_size, int): - output_size = (output_size, output_size, output_size) # N0, N1, N2 - - if isinstance(output_dpm, float): - output_dpm = (output_dpm, output_dpm, output_dpm) # d0, d1, d2 - elif output_dpm is None: - output_dpm = (-1, -1, -1) - - if not os.path.exists(input_file): - raise FileNotFoundError(f"Input file {input_file} does not exist.") - - # Read the input field - print(f"Reading input field from {input_file}") - input_field = read_field(input_file) - - if input_field.rank != 1 or input_field.data.ndim != 3: - raise NotImplementedError("Only 3D scalar fields are supported for now.") - - L0 = input_field.L0 - L1 = input_field.L1 - L2 = input_field.L2 - N0 = input_field.N0 - N1 = input_field.N1 - N2 = input_field.N2 - corner0 = input_field.corner0 - corner1 = input_field.corner1 - corner2 = input_field.corner2 - - d0_in = L0 / N0 - d1_in = L1 / N1 - d2_in = L2 / N2 - - d0_out = output_dpm[0] if output_dpm[0] > 0 else d0_in - d1_out = output_dpm[1] if output_dpm[1] > 0 else d1_in - d2_out = output_dpm[2] if output_dpm[2] > 0 else d2_in - - offset_out_x = (output_corner[0] - corner0) - offset_out_y = (output_corner[1] - corner1) - offset_out_z = (output_corner[2] - corner2) - - print("-----------------------------------------") - print(f"Input field size: {N0} x {N1} x {N2}") - print(f"Output field size: {output_size[0]} x {output_size[1]} x {output_size[2]}") - print(f"Input field dpm: {d0_in:.3f} x {d1_in:.3f} x {d2_in:.3f}") - print(f"Output field dpm: {d0_out:.3f} x {d1_out:.3f} x {d2_out:.3f}") - print(f"Input field corner: {corner0:.1f} x {corner1:.1f} x {corner2:.1f}") - print(f"Output field corner: {output_corner[0]:.1f} x {output_corner[1]:.1f} x {output_corner[2]:.1f}") - print(f"Boundary conditions: {boundary_conditions}") - print("-----------------------------------------") - - input_grid = input_field.data - - output_grid = np.zeros(output_size, dtype=input_grid.dtype) - - - # Mesh to mesh interpolation - print("Interpolating field...") - mesh_to_mesh(input_grid, output_grid, d0_in, d1_in, d2_in, d0_out, d1_out, d2_out, offset_out_x, offset_out_y, offset_out_z, boundary_conditions) - - # Create the output field - output_field = Field( - L0=output_size[0] * d0_out, - L1=output_size[1] * d1_out, - L2=output_size[2] * d2_out, - corner0=output_corner[0], - corner1=output_corner[1], - corner2=output_corner[2], - rank=input_field.rank, - N0=output_size[0], - N1=output_size[1], - N2=output_size[2], - time=input_field.time, - data=output_grid - ) - - # Write the output field - print(f"Writing output field to {output_file}") - output_field.write(output_file) - print("Done.") - - - -if __name__ == "__main__": - import argparse - parser = argparse.ArgumentParser(description="Convert a field from one size to another.") - parser.add_argument("-i","--input_file", type=str, help="Input field file") - parser.add_argument("-o","--output_file", type=str, help="Output field file") - parser.add_argument("-N","--output_size", type=int, nargs=3, help="Output field size (N0, N1, N2)") - parser.add_argument("-dpm","--output_dpm", type=float, nargs=3, default=None, help="Output field dpm (d0, d1, d2)") - parser.add_argument("-corner","--output_corner", type=float, nargs=3, default=(0.,0.,0.), help="Output field corner (corner0, corner1, corner2)") - parser.add_argument("-BC","--boundary_conditions", type=int, default=1, help="Boundary conditions (0: periodic, 1: non-periodic)") - args = parser.parse_args() - - field_to_field(args.input_file, args.output_file, args.output_size, args.output_dpm, args.output_corner, args.boundary_conditions) \ No newline at end of file diff --git a/scripts/gather_tiles.py b/scripts/gather_tiles.py index a71a273..33ded24 100644 --- a/scripts/gather_tiles.py +++ b/scripts/gather_tiles.py @@ -58,47 +58,6 @@ def gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES): -def gather_tiles(folder, tile_base, L, Np, N_TILES, buffer): - """ - Gather sCOLA tiles into a single density field. - - Parameters - ---------- - folder : str - Folder containing the tiles. - tile_base : str - Base name of the tiles. - L : float - Size of the box in Mpc/h. - Np : int - Number of cells per dimension for the full box. - N_TILES : int - Number of tiles per dimension. - buffer : int - Buffer size for the density field of tiles. - """ - Np_tile = Np//N_TILES - dpm = L/Np_tile - - print("Memory allocation for the grid...") - A=np.zeros((Np,Np,Np), dtype=np.float32) - - print("Starting to read the tiles...") - gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES) - - print("Finished reading the tiles.") - A=density_to_delta(A,-1) - - print("Converting to field...") - F=Field(L,L,L, 0.,0.,0., 1, Np,Np,Np, 1., A) - - print("Saving field...") - F.write(folder+"../results/final_density_sCOLA.h5") - print("Density field saved to", folder+"../results/final_density_sCOLA.h5") - print("Done.") - - - if __name__ == "__main__": parser = argparse.ArgumentParser(description="Gather density from tiles.") @@ -121,4 +80,17 @@ if __name__ == "__main__": Np_tile = Np//N_TILES dpm = L/Np_tile - gather_tiles(folder, tile_base, L, Np, N_TILES, buffer) \ No newline at end of file + print("Memory allocation for the grid...") + A=np.zeros((Np,Np,Np), dtype=np.float32) + + print("Starting to read the tiles...") + gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES) + + print("Finished reading the tiles.") + A=density_to_delta(A,-1) + + print("Converting to field...") + F=Field(L,L,L, 0.,0.,0., 1, Np,Np,Np, 1., A) + + print("Saving field...") + F.write(folder+"../results/final_density_sCOLA.h5") \ No newline at end of file diff --git a/scripts/scola_submit.py b/scripts/scola_submit.py index d0b1007..e54714e 100644 --- a/scripts/scola_submit.py +++ b/scripts/scola_submit.py @@ -9,12 +9,6 @@ import time def create_scola_slurm_script(slurmfile, box): """ Create a slurm script for sCOLA. - Parameters - ---------- - slurmfile : str - Path to the slurm script file. - box : str - Box number to be replaced in the slurm script. """ # Read the slurm file with open(slurmfile, "r") as f: @@ -31,14 +25,6 @@ def create_scola_slurm_script(slurmfile, box): def submit_slurm_job(slurmfile): """ Submit a slurm job using the sbatch command. - Parameters - ---------- - slurmfile : str - Path to the slurm script file. - Returns - ------- - str - Job ID of the submitted job. None if the submission failed. """ # Submit the job result = subprocess.run(["sbatch", slurmfile], capture_output=True, text=True) @@ -103,16 +89,6 @@ def check_job_status(job_id): """ Check the status of a job using the squeue command. Returns the job status and running time. - Parameters - ---------- - job_id : str - Job ID of the job to check. - Returns - ------- - str - Job status. Possible values are 'R' (running), 'PD' (pending), 'X' (failed), 'CP' (completed). - int - Running time in seconds. -1 if the job is not found. """ # Check the job status result = subprocess.run(["squeue", "-j", str(job_id)], capture_output=True, text=True) @@ -149,96 +125,11 @@ def get_job_id(jobname): return job_id -def resubmit_job(slurmdir,slurmfile,job_ids_array,box,resubmit_count,error_count,MAX_RESUBMIT=10,MAX_ERRORS=10): - """ - Resubmit a job if it has failed. - Parameters - ---------- - slurmdir : str - Directory where the slurm scripts are saved. - slurmfile : str - Slurm script file. - job_ids_array : array - Array of job IDs for all previously submitted jobs. Indexed by box-1 number. - box : int - Box number of the job to resubmit. - resubmit_count : int - Number of resubmissions so far. - error_count : int - Number of errors so far. - MAX_RESUBMIT : int - Maximum number of resubmissions allowed. - MAX_ERRORS : int - Maximum number of errors allowed. - - Returns - ------- - int - Updated resubmit count. - int - Updated error count. - """ - - # Resubmit the job - job_id = submit_slurm_job(slurmdir+slurmfile+"."+str(box)) - - # Check if the job was submitted successfully - if job_id is None: - print(f"Error resubmitting job for box {box}") - error_count+=1 - # Check if the error count exceeds the maximum - if error_count >= MAX_ERRORS: - raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.") - else: - job_ids_array[box-1] = int(job_id) - - resubmit_count += 1 - # Check if the resubmit count exceeds the maximum - if resubmit_count >= MAX_RESUBMIT: - raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.") - - return resubmit_count, error_count - - - -def check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleeptime,job_ids_array,box,resubmit_count,error_count,MAX_RESUBMIT=10,MAX_ERRORS=10): +def check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count): """ Get the status of all previously submitted jobs. For each job, check if it is running, completed, or failed. If the job is failed, resubmit it. - Parameters - ---------- - workdir : str - Directory where the tiles are saved. - slurmdir : str - Directory where the slurm scripts are saved. - slurmfile : str - Slurm script file. - tilefile : str - Tile file name. - sleeptime : float - Sleep time between each job submission (in s). - job_ids_array : array - Array of job IDs for all previously submitted jobs. Indexed by box-1 number. - box : int - Up to which box the job status is checked. - resubmit_count : int - Number of resubmissions so far. - error_count : int - Number of errors so far. - MAX_RESUBMIT : int - Maximum number of resubmissions allowed. - MAX_ERRORS : int - Maximum number of errors allowed. - - Returns - ------- - dict - Dictionary with the job status categories and their corresponding box numbers. - int - Updated resubmit count. - int - Updated error count. """ job_status_categories = {'R':[],'CP':[],'PD':[],'X':[]} @@ -252,13 +143,21 @@ def check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleeptime,job_ids_ar # If the status is 'X', check if the tile file was created if status == 'X': # Check if the tile file was created - if os.path.exists(workdir+f"{tilefile}{prev_box}.h5"): + if os.path.exists(args.workdir+f"{args.tilefile}{prev_box}.h5"): job_status_categories['CP'].append(prev_box) # Classify as completed else: - resubmit_job(slurmdir,slurmfile,job_ids_array,prev_box,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS) + # Resubmit the job + job_id = submit_slurm_job(args.slurmdir+args.slurmfile+"."+str(prev_box)) + # Check if the job was submitted successfully + if job_id is None: + print(f"Error submitting job for box {box}") + error_count+=1 + else: + job_ids_array[prev_box-1] = int(job_id) + resubmit_count += 1 job_status_categories[status].append(prev_box) # Classify as failed # Sleep for a while before resubmitting the next job - time.sleep(sleeptime) + time.sleep(args.sleep) # If the status is not 'X', record the job status else: job_status_categories[status].append(prev_box) @@ -270,21 +169,6 @@ def check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleeptime,job_ids_ar def cap_number_of_jobs(job_status_categories,job_ids_array, max_jobs, sleep_time): """ Cap the number of jobs to a maximum number. - Parameters - ---------- - job_status_categories : dict - Dictionary with the job status categories and their corresponding box numbers. - job_ids_array : array - Array of job IDs for all previously submitted jobs. Indexed by box-1 number. - max_jobs : int - Maximum number of jobs allowed. - sleep_time : float - Sleep time between each job submission (in s). - - Returns - ------- - dict - Updated dictionary with the job status categories and their corresponding box numbers. """ discard_categories = ['CP', 'X'] # Completed and Failed # Check the number of running /pending jobs @@ -315,11 +199,11 @@ def print_summary_job_status(job_status_categories, box, resubmit_count, error_c # Print summary of job statuses print(f"Job statuses after box {box}:") # Print a table with columns for each status and below the % of jobs in that status - row0 = f"{'Status':<14}" + row0 = f"{'Status':<10}" for status in job_status_categories.keys(): - row0 += f"{status:>9} " + row0 += f"{status:>10}" print(row0) - row1 = f"{'Percentage':<14}" + row1 = f"{'Percentage':<10}" for status in job_status_categories.keys(): row1 += f"{len(job_status_categories[status])/box*100:>9.1f}%" print(row1) @@ -329,140 +213,6 @@ def print_summary_job_status(job_status_categories, box, resubmit_count, error_c -def scola_submit(directory, - slurmdir=None, - workdir=None, - slurmfile="scola_sCOLA.sh", - tilefile="scola_tile", - jobname="sCOLA_", - N_tiles=4, - sleep=1.5, - force=False, - MAX_ERRORS=10, - MAX_RESUBMIT=10, - MAX_JOBS_AT_ONCE=48, - CHECK_EVERY=100): - - if slurmdir is None: - slurmdir = directory + "slurm_scripts/" - if workdir is None: - workdir = directory + "work/" - - - # Check that the slurm file exists - if not os.path.exists(slurmdir+slurmfile): - raise FileNotFoundError(f"Slurm file {slurmdir+slurmfile} does not exist.") - # Check that the work directory exists - if not os.path.exists(workdir): - raise FileNotFoundError(f"Work directory {workdir} does not exist.") - - # If force, remove all pre-existing tile files - if force: - count_removed = 0 - for box in range(1,N_tiles**3+1): - if os.path.exists(workdir+f"{tilefile}{box}.h5"): - os.remove(workdir+f"{tilefile}{box}.h5") - count_removed += 1 - print(f"Removed {count_removed} ({100*count_removed/N_tiles**3:.1f}%) pre-existing tile files.") - - # MAX_ERRORS = 10 - if MAX_RESUBMIT is None: - MAX_RESUBMIT = int(0.1*N_tiles**3) # 10% of the total number of jobs - # MAX_JOBS_AT_ONCE = int(3*128/8) # 3 nodes with 128 cores each, 8 jobs per core - # CHECK_EVERY = 100 - - error_count = 0 - resubmit_count = 0 - counter_for_checks = 0 - - job_ids_array = np.zeros((N_tiles**3,), dtype=int) - - - print("---------------------------------------------------") - print("Starting job submission for sCOLA tiles with the following parameters:") - print(f"Directory: {directory}") - print(f"Slurm file: {slurmdir}{slurmfile}") - print(f"Work directory: {workdir}") - print(f"Number of tiles: {N_tiles**3} tiles") - print(f"Sleep time: {sleep} s") - print(f"Max errors: {MAX_ERRORS} errors") - print(f"Max resubmits: {MAX_RESUBMIT} resubmits") - print(f"Max jobs at once: {MAX_JOBS_AT_ONCE} jobs") - print(f"Check every: {CHECK_EVERY} jobs") - print("---------------------------------------------------") - print(f"ETA: {convert_seconds_to_time(N_tiles**3*sleep*1.2)}") - print("Starting job submission...") - - - - for box in tqdm.tqdm(range(1,N_tiles**3+1), desc="Submitting jobs", unit="boxes"): - - # Check if the tile file already exists - if os.path.exists(workdir+f"{tilefile}{box}.h5"): - continue - - # Check if the slurm job is already running - job_id = get_job_id(f"{jobname}{box}") - if job_id is not None: - job_ids_array[box-1] = int(job_id) - time.sleep(sleep) - continue - - # Create the slurm script for the box - create_scola_slurm_script(slurmdir+slurmfile, str(box)) - - # Submit the job - job_id = submit_slurm_job(slurmdir+slurmfile+"."+str(box)) - - # Check if the job was submitted successfully - if job_id is None: - print(f"Error submitting job for box {box}") - error_count+=1 - else: - job_ids_array[box-1] = int(job_id) - - # Sleep for a while before submitting the next job - time.sleep(sleep) - - counter_for_checks += 1 - - # Check if the error count exceeds the maximum - if error_count >= MAX_ERRORS: - raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.") - - # Check the job status every CHECK_EVERY jobs - if counter_for_checks >= CHECK_EVERY: - - counter_for_checks = 0 - - job_status_categories, resubmit_count, error_count = check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleep,job_ids_array,box,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS) - print_summary_job_status(job_status_categories, box, resubmit_count, error_count) - job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,sleep) - - - - print("All jobs submitted. Now checking the status of the jobs.") - - - job_status_categories, resubmit_count, error_count = check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleep,job_ids_array,N_tiles**3+1,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS) - # Now wait for all jobs to finish - while len(job_status_categories['CP'])= MAX_ERRORS: + raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.") + # Check if the resubmit count exceeds the maximum + if resubmit_count >= MAX_RESUBMIT: + raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.") + + # Check the job status every CHECK_EVERY jobs + if counter_for_checks >= CHECK_EVERY: + + counter_for_checks = 0 + + job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count) + print_summary_job_status(job_status_categories, box, resubmit_count, error_count) + job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,args.sleep) + print("All jobs submitted. Now checking the status of the jobs.") + + + job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,args.N_tiles**3+1,resubmit_count,error_count) + # Now wait for all jobs to finish + while len(job_status_categories['CP'])= MAX_ERRORS: + raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.") + # Check if the resubmit count exceeds the maximum + if resubmit_count >= MAX_RESUBMIT: + raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.") + job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,args.N_tiles**3+1,resubmit_count,error_count) + print_summary_job_status(job_status_categories, args.N_tiles**3+1, resubmit_count, error_count) + job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,args.sleep) + + + print("All jobs finished.") + # Remove the slurm scripts + for box in range(1,args.N_tiles**3+1): + os.remove(args.slurmdir+args.slurmfile+"."+str(box)) \ No newline at end of file