"""Submit and monitor Slurm jobs for sCOLA tiles.

For every tile (``box``) this script creates a per-box Slurm script from a
template (replacing the literal placeholder ``BOX``), submits it with
``sbatch``, periodically polls ``squeue`` for job states, resubmits failed
jobs, and caps the number of simultaneously open jobs.

Reconstructed from git patch d0a8439be74cab313b7d5ef5d777e33f5f3e94d6
("scola_submit script for submitting and monitoring scola tiles jobs",
Mayeul Aubin, 2025-05-13), original path ``scripts/scola_submit.py``.
"""

import argparse
import os
import subprocess
import time

import numpy as np

try:
    import tqdm
except ImportError:
    # tqdm is only cosmetic (progress bars); fall back to a passthrough so the
    # helper functions stay importable without the dependency.
    class _TqdmFallback:
        @staticmethod
        def tqdm(iterable, **kwargs):
            return iterable
    tqdm = _TqdmFallback()


def create_scola_slurm_script(slurmfile, box):
    """Create a per-box slurm script for sCOLA.

    Reads the template ``slurmfile``, substitutes every literal ``BOX``
    placeholder with *box*, and writes the result to ``slurmfile.<box>``.

    Parameters
    ----------
    slurmfile : str
        Path to the template slurm script.
    box : str
        Tile index, substituted for ``BOX`` and appended to the file name.
    """
    # Read the template slurm file
    with open(slurmfile, "r") as f:
        slurm_script = f.read()

    # Replace the placeholders in the slurm script
    slurm_script = slurm_script.replace("BOX", box)

    # Write the modified slurm script to a new file
    with open(slurmfile + f".{box}", "w") as f:
        f.write(slurm_script)


def submit_slurm_job(slurmfile):
    """Submit a slurm job with ``sbatch``.

    Returns the job ID parsed from sbatch's output ("Submitted batch job
    <id>"), or ``None`` if submission failed.
    """
    result = subprocess.run(["sbatch", slurmfile], capture_output=True, text=True)

    # Check if the submission was successful
    if result.returncode != 0:
        print(f"Error submitting job: {result.stderr}")
        return None

    # sbatch prints "Submitted batch job <id>"; the ID is the last token.
    job_id = result.stdout.split()[-1]

    return job_id


def convert_time_to_seconds(time_str):
    """Convert a time string of the format D-HH:MM:SS or HH:MM:SS or MM:SS to seconds.

    Raises
    ------
    ValueError
        If the time string has neither 2 nor 3 colon-separated fields.
    """
    time_parts = time_str.split("-")
    if len(time_parts) == 2:
        days = int(time_parts[0])
        time_str = time_parts[1]
    else:
        days = 0

    time_parts = time_str.split(":")
    if len(time_parts) == 3:
        hours, minutes, seconds = map(int, time_parts)
    elif len(time_parts) == 2:
        hours, minutes = map(int, time_parts)
        seconds = 0
        # NOTE: squeue prints MM:SS for short jobs, so the two-field form is
        # minutes:seconds, not hours:minutes.
        hours, minutes, seconds = 0, hours, minutes
    else:
        raise ValueError("Invalid time format")

    total_seconds = days * 86400 + hours * 3600 + minutes * 60 + seconds
    return total_seconds


def convert_seconds_to_time(seconds):
    """Convert seconds to a time string of the format D-HH:MM:SS.

    Leading zero fields are dropped (HH:MM:SS, then MM:SS). Negative input
    returns ``"N/A"`` (used as the "unknown" sentinel by callers).
    """
    seconds = int(seconds)
    if seconds < 0:
        return "N/A"
    days = seconds // 86400
    seconds %= 86400
    hours = seconds // 3600
    seconds %= 3600
    minutes = seconds // 60
    seconds %= 60

    if days > 0:
        return f"{days}-{hours:02}:{minutes:02}:{seconds:02}"
    if hours > 0:
        return f"{hours:02}:{minutes:02}:{seconds:02}"
    return f"{minutes:02}:{seconds:02}"


def check_job_status(job_id):
    """Check the status of a job using the squeue command.

    Returns a ``(status, running_time_seconds)`` tuple. If squeue errors out
    or no longer lists the job, returns ``("X", -1)`` — the caller decides
    whether "gone" means completed or failed by checking the output file.
    """
    result = subprocess.run(["squeue", "-j", str(job_id)], capture_output=True, text=True)

    # squeue exits non-zero for unknown/expired job IDs.
    if result.returncode == 1:
        return "X", -1

    lines = result.stdout.split("\n")
    # Guard: only the header line means the job left the queue.
    if len(lines) < 2 or len(lines[1].split()) == 0:
        return "X", -1

    fields = lines[1].split()
    # Default squeue format: JOBID PARTITION NAME USER ST TIME ...
    status = fields[4]
    job_time = convert_time_to_seconds(fields[5])

    return status, job_time


def get_job_id(jobname):
    """Get the job ID from the job name using the squeue command.

    Returns the job ID string, or ``None`` if no queued/running job has this
    name.
    """
    result = subprocess.run(["squeue", "-n", jobname], capture_output=True, text=True)

    if result.returncode == 1:
        return None
    lines = result.stdout.split("\n")
    if len(lines) < 2 or len(lines[1].split()) == 0:
        return None

    # First column of the first data row is the job ID.
    job_id = lines[1].split()[0]

    return job_id


def check_previous_jobs(args, job_ids_array, box, resubmit_count, error_count):
    """Get the status of all previously submitted jobs (boxes 1..box-1).

    A job that vanished from the queue ('X') is classified 'CP' (completed)
    when its tile file exists, otherwise it is resubmitted and kept under 'X'.

    Returns the updated ``(job_status_categories, resubmit_count,
    error_count)``.
    """
    job_status_categories = {'R': [], 'CP': [], 'PD': [], 'X': []}

    # Check the status of every previously submitted job
    for prev_box in tqdm.tqdm(range(1, box), desc="Checking jobs", unit="boxes", leave=False, position=1):
        status, job_time = check_job_status(job_ids_array[prev_box - 1])
        # squeue may report states we did not anticipate; track them too.
        if status not in job_status_categories:
            job_status_categories[status] = []
        if status == 'X':
            # Job left the queue: completed if its tile file was created.
            if os.path.exists(args.workdir + f"{args.tilefile}{prev_box}.h5"):
                job_status_categories['CP'].append(prev_box)
            else:
                # Resubmit the failed job
                job_id = submit_slurm_job(args.slurmdir + args.slurmfile + "." + str(prev_box))
                if job_id is None:
                    # BUGFIX: report the box that actually failed (prev_box),
                    # not the outer loop bound.
                    print(f"Error submitting job for box {prev_box}")
                    error_count += 1
                else:
                    job_ids_array[prev_box - 1] = int(job_id)
                    resubmit_count += 1
                job_status_categories[status].append(prev_box)  # Classify as failed
                # Throttle resubmissions to avoid hammering the scheduler.
                time.sleep(args.sleep)
        else:
            job_status_categories[status].append(prev_box)

    return job_status_categories, resubmit_count, error_count


def cap_number_of_jobs(job_status_categories, job_ids_array, max_jobs, sleep_time):
    """Block until the number of open (running/pending) jobs is <= max_jobs.

    Re-polls each open job and moves it to its new category when it finishes.
    Returns the updated ``job_status_categories``.
    """
    discard_categories = ['CP', 'X']  # Completed and Failed
    # Count the running/pending jobs
    job_num = sum(
        len(boxes)
        for status, boxes in job_status_categories.items()
        if status not in discard_categories
    )

    # Wait until the number of open jobs drops below the maximum
    while job_num > max_jobs:
        print(f"Number of open jobs: {job_num} > {max_jobs}. Waiting...")
        # BUGFIX: iterate over snapshots — the original removed entries from
        # the very list being iterated, which silently skips elements.
        for status in list(job_status_categories.keys()):
            if status in discard_categories:
                continue
            for box in list(job_status_categories[status]):
                new_status, job_time = check_job_status(job_ids_array[box - 1])
                time.sleep(sleep_time)
                if new_status in discard_categories:
                    job_num -= 1
                    job_status_categories[new_status].append(box)  # WARNING: We do not reclassify 'X' jobs as 'CP'
                    job_status_categories[status].remove(box)

    return job_status_categories


def print_summary_job_status(job_status_categories, box, resubmit_count, error_count):
    """Print a one-table summary of job states after submitting ``box`` jobs."""
    print("---------------------------------------------------")
    print(f"Job statuses after box {box}:")
    # Table: one column per status, with the % of jobs in that status below.
    row0 = f"{'Status':<10}"
    for status in job_status_categories.keys():
        row0 += f"{status:>10}"
    print(row0)
    row1 = f"{'Percentage':<10}"
    for status in job_status_categories.keys():
        row1 += f"{len(job_status_categories[status])/box*100:>9.1f}%"
    print(row1)
    print(f"Resubmission rate: {resubmit_count/box*100:.2f}%")
    print(f"Error count: {error_count}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Submit slurm jobs for sCOLA tiles.")

    parser.add_argument("-d", "--directory", type=str, default="./", help="Main directory where the output will be saved (if other dir and filenames are not specified).")
    parser.add_argument("-sd", "--slurmdir", type=str, default=None, help="Directory where the slurm scripts are saved (default is -d/slurm_scripts).")
    parser.add_argument("-wd", "--workdir", type=str, default=None, help="Directory where the tiles are saved (default is -d/work).")

    parser.add_argument("-sf", "--slurmfile", type=str, default="scola_sCOLA.sh", help="Slurm script file (located in slurmdir).")
    parser.add_argument("-tf", "--tilefile", type=str, default="scola_tile", help="Tile file name (located in workdir).")
    parser.add_argument("--jobname", type=str, default="sCOLA_", help="Job name for the slurm jobs.")

    parser.add_argument("-Nt", "--N_tiles", type=int, default=4, help="Number of tiles per dimension.")

    parser.add_argument("--sleep", type=float, default=1.0, help="Sleep time between each job submission (in s).")

    args = parser.parse_args()

    if args.slurmdir is None:
        args.slurmdir = args.directory + "slurm_scripts/"
    if args.workdir is None:
        args.workdir = args.directory + "work/"

    # Check that the slurm file exists
    if not os.path.exists(args.slurmdir + args.slurmfile):
        raise FileNotFoundError(f"Slurm file {args.slurmdir+args.slurmfile} does not exist.")
    # Check that the work directory exists
    if not os.path.exists(args.workdir):
        raise FileNotFoundError(f"Work directory {args.workdir} does not exist.")

    MAX_ERRORS = 10
    MAX_RESUBMIT = int(0.1 * args.N_tiles**3)  # 10% of the total number of jobs
    # NOTE(review): comment in the original says "3 nodes with 128 cores each,
    # 8 jobs per core" — presumably 8 cores per job; confirm with cluster spec.
    MAX_JOBS_AT_ONCE = int(3 * 128 / 8)
    CHECK_EVERY = 100

    error_count = 0
    resubmit_count = 0
    counter_for_checks = 0

    # job_ids_array[box-1] holds the slurm job ID for tile `box` (0 = none).
    job_ids_array = np.zeros((args.N_tiles**3,), dtype=int)

    print("---------------------------------------------------")
    print("Starting job submission for sCOLA tiles with the following parameters:")
    print(f"Directory: {args.directory}")
    print(f"Slurm file: {args.slurmdir}{args.slurmfile}")
    print(f"Work directory: {args.workdir}")
    print(f"Number of tiles: {args.N_tiles**3} tiles")
    print(f"Sleep time: {args.sleep} s")
    print(f"Max errors: {MAX_ERRORS} errors")
    print(f"Max resubmits: {MAX_RESUBMIT} resubmits")
    print(f"Max jobs at once: {MAX_JOBS_AT_ONCE} jobs")
    print(f"Check every: {CHECK_EVERY} jobs")
    print("---------------------------------------------------")
    print(f"ETA: {convert_seconds_to_time(args.N_tiles**3*args.sleep*1.05)}")
    print("Starting job submission...")

    for box in tqdm.tqdm(range(1, args.N_tiles**3 + 1), desc="Submitting jobs", unit="boxes"):

        # Skip boxes whose tile file already exists
        if os.path.exists(args.workdir + f"{args.tilefile}{box}.h5"):
            continue

        # Skip boxes whose slurm job is already queued/running
        job_id = get_job_id(f"{args.jobname}{box}")
        if job_id is not None:
            job_ids_array[box - 1] = int(job_id)
            time.sleep(args.sleep)
            continue

        # Create the slurm script for the box and submit it
        create_scola_slurm_script(args.slurmdir + args.slurmfile, str(box))
        job_id = submit_slurm_job(args.slurmdir + args.slurmfile + "." + str(box))

        if job_id is None:
            print(f"Error submitting job for box {box}")
            error_count += 1
        else:
            job_ids_array[box - 1] = int(job_id)

        # Sleep for a while before submitting the next job
        time.sleep(args.sleep)

        counter_for_checks += 1

        # Abort when too many submissions failed or were resubmitted
        if error_count >= MAX_ERRORS:
            raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
        if resubmit_count >= MAX_RESUBMIT:
            raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")

        # Check the job status every CHECK_EVERY jobs
        if counter_for_checks >= CHECK_EVERY:
            counter_for_checks = 0
            job_status_categories, resubmit_count, error_count = check_previous_jobs(args, job_ids_array, box, resubmit_count, error_count)
            print_summary_job_status(job_status_categories, box, resubmit_count, error_count)
            job_status_categories = cap_number_of_jobs(job_status_categories, job_ids_array, MAX_JOBS_AT_ONCE, args.sleep)

    print("All jobs submitted. Now checking the status of the jobs.")

    job_status_categories, resubmit_count, error_count = check_previous_jobs(args, job_ids_array, args.N_tiles**3 + 1, resubmit_count, error_count)
    # Now wait for all jobs to finish.
    # BUGFIX(reconstruction): the source line was corrupted
    # ("while len(...['CP'])= MAX_ERRORS:"); rebuilt from the surviving
    # fragments: loop until every tile is completed, re-checking the abort
    # guards each pass.
    while len(job_status_categories['CP']) < args.N_tiles**3:
        if error_count >= MAX_ERRORS:
            raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
        if resubmit_count >= MAX_RESUBMIT:
            raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")
        job_status_categories, resubmit_count, error_count = check_previous_jobs(args, job_ids_array, args.N_tiles**3 + 1, resubmit_count, error_count)
        print_summary_job_status(job_status_categories, args.N_tiles**3 + 1, resubmit_count, error_count)
        job_status_categories = cap_number_of_jobs(job_status_categories, job_ids_array, MAX_JOBS_AT_ONCE, args.sleep)

    print("All jobs finished.")
    # Remove the generated per-box slurm scripts.
    for box in range(1, args.N_tiles**3 + 1):
        script_path = args.slurmdir + args.slurmfile + "." + str(box)
        # BUGFIX: boxes whose tile already existed were skipped at submission
        # time and never got a script — guard against FileNotFoundError.
        if os.path.exists(script_path):
            os.remove(script_path)