diff --git a/scripts/convert_snapshot_to_density.py b/convert_snapshot_to_density.py
similarity index 100%
rename from scripts/convert_snapshot_to_density.py
rename to convert_snapshot_to_density.py
diff --git a/scripts/gather_tiles.py b/scripts/gather_tiles.py
deleted file mode 100644
index 33ded24..0000000
--- a/scripts/gather_tiles.py
+++ /dev/null
@@ -1,96 +0,0 @@
-import numpy as np
-from pysbmy.snapshot import read_tile
-from pysbmy.field import Field
-from pysbmy.density import get_density_pm, density_to_delta
-from pysbmy import c_float_t
-import tqdm
-import argparse
-
-def get_indices(tile, N_TILES0, N_TILES1, N_TILES2):
-    """Get the indices of the tile in a 3D grid."""
-    i = (tile // (N_TILES1 * N_TILES2))%N_TILES0
-    j = ((tile - N_TILES1 * N_TILES2 * i) // N_TILES2)%N_TILES1
-    k = (tile - N_TILES2 * j - N_TILES1 * N_TILES2 * i)%N_TILES2
-    return i, j, k
-
-
-def tile_to_density_with_buffer(T, N, d, buffer):
-    """
-    Convert a tile to density with a buffer.
-    """
-    # Create a buffer for the density
-    A = np.zeros((N+2*buffer, N+2*buffer, N+2*buffer), dtype=np.float32)
-
-    # Get the density of the tile
-    A = get_density_pm(T.pos-T.corner_position.astype(c_float_t)+d*buffer, A, d)
-    return A
-
-
-def add_density_tile(A,A_tile,corner_indices):
-    N = A.shape[0]
-    Np = A_tile.shape[0]
-    i0, j0, k0 = corner_indices
-
-    # Create index arrays for the tile
-    i_idx = (np.arange(Np) + i0) % N
-    j_idx = (np.arange(Np) + j0) % N
-    k_idx = (np.arange(Np) + k0) % N
-
-    # Use np.ix_ to get all combinations of indices
-    A[np.ix_(i_idx, j_idx, k_idx)] += A_tile
-
-
-def gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES):
-    """
-    Gather the density from the tiles.
-    """
-    for tile in tqdm.tqdm(range(N_TILES**3), desc="Density of tiles", unit="tiles"):
-        T=read_tile(folder+tile_base+str(tile+1)+".h5", int(Np_tile**3))
-        # Get the corner position of the tile
-        corner_position = T.corner_position
-        # Get the corner indices of the tile
-        i,j,k = get_indices(tile, N_TILES, N_TILES, N_TILES)
-        corner_grid_indices = (i*Np_tile-buffer, j*Np_tile-buffer, k*Np_tile-buffer)
-        # Get the density of the tile with a buffer
-        A_tile = tile_to_density_with_buffer(T, Np_tile, dpm, buffer)
-        # Add the density of the tile to the grid
-        add_density_tile(A, A_tile, corner_grid_indices)
-
-
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="Gather density from tiles.")
-    parser.add_argument("-d","--folder", type=str, default="./", help="Folder containing the tiles")
-    parser.add_argument("--tile_base", type=str, default="sCOLA_tile", help="Base name of the tiles")
-    parser.add_argument("-L","--L", type=int, default=1920, help="Size of the box in Mpc/h")
-    parser.add_argument("-Np","--Np", type=int, default=80, help="Number of cells per dimension for the full box")
-    parser.add_argument("-Nt","--N_tiles", type=int, default=4, help="Number of tiles per dimension.")
-    parser.add_argument("--buffer", type=int, default=40, help="Buffer size for the density field of tiles")
-    args = parser.parse_args()
-
-    L = args.L
-    Np = args.Np
-    N_TILES = args.N_tiles
-    buffer = args.buffer
-
-    folder = args.folder
-    tile_base = args.tile_base
-
-    Np_tile = Np//N_TILES
-    dpm = L/Np_tile
-
-    print("Memory allocation for the grid...")
-    A=np.zeros((Np,Np,Np), dtype=np.float32)
-
-    print("Starting to read the tiles...")
-    gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES)
-
-    print("Finished reading the tiles.")
-    A=density_to_delta(A,-1)
-
-    print("Converting to field...")
-    F=Field(L,L,L, 0.,0.,0., 1, Np,Np,Np, 1., A)
-
-    print("Saving field...")
-    F.write(folder+"../results/final_density_sCOLA.h5")
\ No newline at end of file
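
Note on the deleted gather step (scripts/gather_tiles.py): add_density_tile scatters each buffered tile into the periodic grid with np.ix_ fancy indexing. A caveat worth recording: with fancy indexing, `+=` writes each repeated index combination only once, so whenever the buffered tile is wider than the full grid (Np_tile + 2*buffer > Np, which holds for the script defaults Np=80, N_tiles=4, buffer=40, i.e. a 100-cell tile against an 80-cell grid), wrap-around duplicates silently lose mass; np.add.at accumulates them correctly. A minimal numpy-only sketch of the periodic scatter-add (the helper name add_tile_periodic is illustrative, not from the codebase):

import numpy as np

def add_tile_periodic(grid, tile, corner):
    # Periodic scatter-add: indices wrap modulo the grid size, and
    # np.add.at (unlike `+=`) accumulates contributions to repeated cells.
    n = grid.shape[0]                               # full grid size per dimension
    m = tile.shape[0]                               # buffered tile size per dimension
    idx = [(np.arange(m) + c) % n for c in corner]  # negative corners wrap around
    np.add.at(grid, np.ix_(*idx), tile)

grid = np.zeros((8, 8, 8), dtype=np.float32)
tile = np.ones((12, 12, 12), dtype=np.float32)      # buffered tile wider than the grid
add_tile_periodic(grid, tile, (-2, 6, 0))
assert grid.sum() == tile.sum()                     # no mass lost on duplicate indices
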
diff --git a/scripts/scola_submit.py b/scripts/scola_submit.py
deleted file mode 100644
index e54714e..0000000
--- a/scripts/scola_submit.py
+++ /dev/null
@@ -1,347 +0,0 @@
-import tqdm
-import argparse
-import numpy as np
-import os
-import subprocess
-import time
-
-
-def create_scola_slurm_script(slurmfile, box):
-    """
-    Create a slurm script for sCOLA.
-    """
-    # Read the slurm file
-    with open(slurmfile, "r") as f:
-        slurm_script = f.read()
-
-    # Replace the placeholders in the slurm script
-    slurm_script = slurm_script.replace("BOX", box)
-
-    # Write the modified slurm script to a new file
-    with open(slurmfile+f".{box}", "w") as f:
-        f.write(slurm_script)
-
-
-def submit_slurm_job(slurmfile):
-    """
-    Submit a slurm job using the sbatch command.
-    """
-    # Submit the job
-    result = subprocess.run(["sbatch", slurmfile], capture_output=True, text=True)
-
-    # Check if the submission was successful
-    if result.returncode != 0:
-        print(f"Error submitting job: {result.stderr}")
-        return None
-
-    # Get the job ID from the output
-    job_id = result.stdout.split()[-1]
-
-    return job_id
-
-
-def convert_time_to_seconds(time_str):
-    """
-    Convert a time string of the format D-HH:MM:SS or HH:MM:SS or MM:SS to seconds.
-    """
-    time_parts = time_str.split("-")
-    if len(time_parts) == 2:
-        days = int(time_parts[0])
-        time_str = time_parts[1]
-    else:
-        days = 0
-
-    time_parts = time_str.split(":")
-    if len(time_parts) == 3:
-        hours, minutes, seconds = map(int, time_parts)
-    elif len(time_parts) == 2:
-        hours, minutes = map(int, time_parts)
-        seconds = 0
-    else:
-        raise ValueError("Invalid time format")
-
-    total_seconds = days * 86400 + hours * 3600 + minutes * 60 + seconds
-    return total_seconds
-
-
-def convert_seconds_to_time(seconds):
-    """
-    Convert seconds to a time string of the format D-HH:MM:SS.
-    """
-    seconds = int(seconds)
-    if seconds < 0:
-        return "N/A"
-    days = seconds // 86400
-    seconds %= 86400
-    hours = seconds // 3600
-    seconds %= 3600
-    minutes = seconds // 60
-    seconds %= 60
-
-    if days > 0:
-        return f"{days}-{hours:02}:{minutes:02}:{seconds:02}"
-    if hours > 0:
-        return f"{hours:02}:{minutes:02}:{seconds:02}"
-    return f"{minutes:02}:{seconds:02}"
-
-
-def check_job_status(job_id):
-    """
-    Check the status of a job using the squeue command.
-    Returns the job status and running time.
-    """
-    # Check the job status
-    result = subprocess.run(["squeue", "-j", str(job_id)], capture_output=True, text=True)
-
-    # Check if the job is still running
-    if result.returncode == 1:
-        return "X", -1
-
-    if len(result.stdout.split("\n")[1].split()) == 0:
-        return "X", -1
-
-    status = result.stdout.split("\n")[1].split()[4]
-    job_time = convert_time_to_seconds(result.stdout.split("\n")[1].split()[5])
-
-    return status, job_time
-
-
-def get_job_id(jobname):
-    """
-    Get the job ID from the job name using the squeue command.
-    """
-    # Check the job status
-    result = subprocess.run(["squeue", "-n", jobname], capture_output=True, text=True)
-
-    # Check if the job is still running
-    if result.returncode == 1:
-        return None
-    if len(result.stdout.split("\n")[1].split()) == 0:
-        return None
-
-    # Get the job ID
-    job_id = result.stdout.split("\n")[1].split()[0]
-
-    return job_id
-
-
-def check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count):
-    """
-    Get the status of all previously submitted jobs.
-    For each job, check if it is running, completed, or failed.
-    If the job is failed, resubmit it.
-    """
-    job_status_categories = {'R':[],'CP':[],'PD':[],'X':[]}
-
-    # Check the status of every previously submitted job
-    for prev_box in tqdm.tqdm(range(1,box), desc="Checking jobs", unit="boxes", leave=False, position=1):
-        # Check the job status
-        status, job_time = check_job_status(job_ids_array[prev_box-1])
-        # Add the job status to the dictionary
-        if status not in job_status_categories:
-            job_status_categories[status] = []
-        # If the status is 'X', check if the tile file was created
-        if status == 'X':
-            # Check if the tile file was created
-            if os.path.exists(args.workdir+f"{args.tilefile}{prev_box}.h5"):
-                job_status_categories['CP'].append(prev_box) # Classify as completed
-            else:
-                # Resubmit the job
-                job_id = submit_slurm_job(args.slurmdir+args.slurmfile+"."+str(prev_box))
-                # Check if the job was submitted successfully
-                if job_id is None:
-                    print(f"Error submitting job for box {box}")
-                    error_count+=1
-                else:
-                    job_ids_array[prev_box-1] = int(job_id)
-                    resubmit_count += 1
-                    job_status_categories[status].append(prev_box) # Classify as failed
-                # Sleep for a while before resubmitting the next job
-                time.sleep(args.sleep)
-        # If the status is not 'X', record the job status
-        else:
-            job_status_categories[status].append(prev_box)
-
-    return job_status_categories, resubmit_count, error_count
-
-
-
-
-def cap_number_of_jobs(job_status_categories,job_ids_array, max_jobs, sleep_time):
-    """
-    Cap the number of jobs to a maximum number.
-    """
-    discard_categories = ['CP', 'X'] # Completed and Failed
-    # Check the number of running /pending jobs
-    job_num = 0
-    for status in job_status_categories.keys():
-        if status not in discard_categories:
-            job_num += len(job_status_categories[status])
-
-    # We wait until the number of jobs is below the maximum
-    while job_num > max_jobs:
-        print(f"Number of open jobs: {job_num} > {max_jobs}. Waiting...")
-        for status in job_status_categories.keys():
-            if status not in discard_categories:
-                for box in job_status_categories[status]:
-                    # Check the new job status
-                    new_status, job_time = check_job_status(job_ids_array[box-1])
-                    time.sleep(sleep_time)
-                    if new_status in discard_categories:
-                        job_num -= 1
-                        job_status_categories[new_status].append(box) # WARNING: We do not reclassify 'X' jobs as 'CP'
-                        job_status_categories[status].remove(box)
-
-    return job_status_categories
-
-
-def print_summary_job_status(job_status_categories, box, resubmit_count, error_count):
-    print("---------------------------------------------------")
-    # Print summary of job statuses
-    print(f"Job statuses after box {box}:")
-    # Print a table with columns for each status and below the % of jobs in that status
-    row0 = f"{'Status':<10}"
-    for status in job_status_categories.keys():
-        row0 += f"{status:>10}"
-    print(row0)
-    row1 = f"{'Percentage':<10}"
-    for status in job_status_categories.keys():
-        row1 += f"{len(job_status_categories[status])/box*100:>9.1f}%"
-    print(row1)
-    # Print the rate of resubmissions
-    print(f"Resubmission rate: {resubmit_count/box*100:.2f}%")
-    print(f"Error count: {error_count}")
-
-
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="Submit slurm jobs for sCOLA tiles.")
-
-    parser.add_argument("-d", "--directory", type=str, default="./", help="Main directory where the output will be saved (if other dir and filenames are not specified).")
-    parser.add_argument("-sd", "--slurmdir", type=str, default=None, help="Directory where the slurm scripts are saved (default is -d/slurm_scripts).")
-    parser.add_argument("-wd", "--workdir", type=str, default=None, help="Directory where the tiles are saved (default is -d/work).")
-
-    parser.add_argument("-sf","--slurmfile", type=str, default="scola_sCOLA.sh", help="Slurm script file (located in slurmdir).")
-    parser.add_argument("-tf","--tilefile", type=str, default="scola_tile", help="Tile file name (located in workdir).")
-    parser.add_argument("--jobname", type=str, default="sCOLA_", help="Job name for the slurm jobs.")
-
-    parser.add_argument("-Nt","--N_tiles", type=int, default=4, help="Number of tiles per dimension.")
-
-    parser.add_argument("--sleep", type=float, default=1.0, help="Sleep time between each job submission (in s).")
-
-    args=parser.parse_args()
-
-
-    if args.slurmdir is None:
-        args.slurmdir = args.directory + "slurm_scripts/"
-    if args.workdir is None:
-        args.workdir = args.directory + "work/"
-
-
-    # Check that the slurm file exists
-    if not os.path.exists(args.slurmdir+args.slurmfile):
-        raise FileNotFoundError(f"Slurm file {args.slurmdir+args.slurmfile} does not exist.")
-    # Check that the work directory exists
-    if not os.path.exists(args.workdir):
-        raise FileNotFoundError(f"Work directory {args.workdir} does not exist.")
-
-    MAX_ERRORS = 10
-    MAX_RESUBMIT = int(0.1*args.N_tiles**3) # 10% of the total number of jobs
-    MAX_JOBS_AT_ONCE = int(3*128/8) # 3 nodes with 128 cores each, 8 jobs per core
-    CHECK_EVERY = 100
-
-    error_count = 0
-    resubmit_count = 0
-    counter_for_checks = 0
-
-    job_ids_array = np.zeros((args.N_tiles**3,), dtype=int)
-
-
-    print("---------------------------------------------------")
-    print("Starting job submission for sCOLA tiles with the following parameters:")
-    print(f"Directory: {args.directory}")
-    print(f"Slurm file: {args.slurmdir}{args.slurmfile}")
-    print(f"Work directory: {args.workdir}")
-    print(f"Number of tiles: {args.N_tiles**3} tiles")
-    print(f"Sleep time: {args.sleep} s")
-    print(f"Max errors: {MAX_ERRORS} errors")
-    print(f"Max resubmits: {MAX_RESUBMIT} resubmits")
-    print(f"Max jobs at once: {MAX_JOBS_AT_ONCE} jobs")
-    print(f"Check every: {CHECK_EVERY} jobs")
-    print("---------------------------------------------------")
-    print(f"ETA: {convert_seconds_to_time(args.N_tiles**3*args.sleep*1.05)}")
-    print("Starting job submission...")
-
-
-
-    for box in tqdm.tqdm(range(1,args.N_tiles**3+1), desc="Submitting jobs", unit="boxes"):
-
-        # Check if the tile file already exists
-        if os.path.exists(args.workdir+f"{args.tilefile}{box}.h5"):
-            continue
-
-        # Check if the slurm job is already running
-        job_id = get_job_id(f"{args.jobname}{box}")
-        if job_id is not None:
-            job_ids_array[box-1] = int(job_id)
-            time.sleep(args.sleep)
-            continue
-
-        # Create the slurm script for the box
-        create_scola_slurm_script(args.slurmdir+args.slurmfile, str(box))
-
-        # Submit the job
-        job_id = submit_slurm_job(args.slurmdir+args.slurmfile+"."+str(box))
-
-        # Check if the job was submitted successfully
-        if job_id is None:
-            print(f"Error submitting job for box {box}")
-            error_count+=1
-        else:
-            job_ids_array[box-1] = int(job_id)
-
-        # Sleep for a while before submitting the next job
-        time.sleep(args.sleep)
-
-        counter_for_checks += 1
-
-        # Check if the error count exceeds the maximum
-        if error_count >= MAX_ERRORS:
-            raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
-        # Check if the resubmit count exceeds the maximum
-        if resubmit_count >= MAX_RESUBMIT:
-            raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")
-
-        # Check the job status every CHECK_EVERY jobs
-        if counter_for_checks >= CHECK_EVERY:
-
-            counter_for_checks = 0
-
-            job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count)
-            print_summary_job_status(job_status_categories, box, resubmit_count, error_count)
-            job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,args.sleep)
-
-
-
-    print("All jobs submitted. Now checking the status of the jobs.")
-
-
-    job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,args.N_tiles**3+1,resubmit_count,error_count)
-    # Now wait for all jobs to finish
-    while len(job_status_categories['CP']) < args.N_tiles**3:
-        # Check if the error count exceeds the maximum
-        if error_count >= MAX_ERRORS:
-            raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
-        # Check if the resubmit count exceeds the maximum
-        if resubmit_count >= MAX_RESUBMIT:
-            raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")
-        job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,args.N_tiles**3+1,resubmit_count,error_count)
-        print_summary_job_status(job_status_categories, args.N_tiles**3+1, resubmit_count, error_count)
-        job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,args.sleep)
-
-
-    print("All jobs finished.")
-    # Remove the slurm scripts
-    for box in range(1,args.N_tiles**3+1):
-        os.remove(args.slurmdir+args.slurmfile+"."+str(box))
-
\ No newline at end of file
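
Note on the deleted submission script (scripts/scola_submit.py): check_job_status feeds the TIME column of squeue into convert_time_to_seconds, but that parser treats two-field values as hours:minutes, while both its own docstring and Slurm's elapsed-time format (which prints MM:SS for jobs under an hour) make them minutes:seconds. A corrected sketch, assuming standard squeue output; slurm_time_to_seconds is an illustrative name, not part of the codebase:

def slurm_time_to_seconds(time_str):
    # Parse Slurm elapsed time, [D-[HH:]]MM:SS; days and hours appear only when needed.
    days, _, rest = time_str.rpartition("-")
    parts = [int(p) for p in rest.split(":")]
    if len(parts) == 3:
        hours, minutes, seconds = parts
    elif len(parts) == 2:
        hours = 0
        minutes, seconds = parts               # two fields are M:SS, not H:MM
    else:
        raise ValueError(f"Invalid time format: {time_str}")
    return int(days or 0) * 86400 + hours * 3600 + minutes * 60 + seconds

assert slurm_time_to_seconds("1:23") == 83             # 1 min 23 s
assert slurm_time_to_seconds("2-03:04:05") == 183845   # 2 d 3 h 4 min 5 s

The positional parsing also assumes squeue's default column layout; an explicit format string such as squeue -h -o "%t %M" (compact state and elapsed time, no header) would be more robust. Separately, the resubmission branch of check_previous_jobs logs the current box rather than the failing prev_box in its error message.
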