import tqdm
import argparse
import numpy as np
import os
import subprocess
import time


def create_scola_slurm_script(slurmfile, box):
    """
    Create a slurm script for sCOLA.

    Parameters
    ----------
    slurmfile : str
        Path to the slurm script file.
    box : str
        Box number to be replaced in the slurm script.
    """
    # Read the slurm file
    with open(slurmfile, "r") as f:
        slurm_script = f.read()
    # Replace the placeholders in the slurm script
    slurm_script = slurm_script.replace("BOX", box)
    # Write the modified slurm script to a new file
    with open(slurmfile + f".{box}", "w") as f:
        f.write(slurm_script)


def submit_slurm_job(slurmfile):
    """
    Submit a slurm job using the sbatch command.

    Parameters
    ----------
    slurmfile : str
        Path to the slurm script file.

    Returns
    -------
    str
        Job ID of the submitted job. None if the submission failed.
    """
    # Submit the job
    result = subprocess.run(["sbatch", slurmfile], capture_output=True, text=True)
    # Check if the submission was successful
    if result.returncode != 0:
        print(f"Error submitting job: {result.stderr}")
        return None
    # Get the job ID from the output
    job_id = result.stdout.split()[-1]
    return job_id


def convert_time_to_seconds(time_str):
    """
    Convert a time string of the format D-HH:MM:SS, HH:MM:SS or MM:SS to seconds.
    """
    time_parts = time_str.split("-")
    if len(time_parts) == 2:
        days = int(time_parts[0])
        time_str = time_parts[1]
    else:
        days = 0
    time_parts = time_str.split(":")
    if len(time_parts) == 3:
        hours, minutes, seconds = map(int, time_parts)
    elif len(time_parts) == 2:
        # Two fields correspond to MM:SS (squeue omits hours when they are zero)
        hours = 0
        minutes, seconds = map(int, time_parts)
    else:
        raise ValueError("Invalid time format")
    total_seconds = days * 86400 + hours * 3600 + minutes * 60 + seconds
    return total_seconds


def convert_seconds_to_time(seconds):
    """
    Convert seconds to a time string of the format D-HH:MM:SS
    (days and hours are omitted when zero).
    """
    seconds = int(seconds)
    if seconds < 0:
        return "N/A"
    days = seconds // 86400
    seconds %= 86400
    hours = seconds // 3600
    seconds %= 3600
    minutes = seconds // 60
    seconds %= 60
    if days > 0:
        return f"{days}-{hours:02}:{minutes:02}:{seconds:02}"
    if hours > 0:
        return f"{hours:02}:{minutes:02}:{seconds:02}"
    return f"{minutes:02}:{seconds:02}"


def check_job_status(job_id):
    """
    Check the status of a job using the squeue command.
    Returns the job status and running time.

    Parameters
    ----------
    job_id : str
        Job ID of the job to check.

    Returns
    -------
    str
        Job status as reported by squeue, e.g. 'R' (running) or 'PD' (pending).
        'X' is returned if the job is no longer in the queue (failed or completed).
    int
        Running time in seconds. -1 if the job is not found.
    """
    # Check the job status
    result = subprocess.run(["squeue", "-j", str(job_id)], capture_output=True, text=True)
    # Check if the job is still in the queue
    if result.returncode == 1:
        return "X", -1
    if len(result.stdout.split("\n")[1].split()) == 0:
        return "X", -1
    status = result.stdout.split("\n")[1].split()[4]
    job_time = convert_time_to_seconds(result.stdout.split("\n")[1].split()[5])
    return status, job_time


def get_job_id(jobname):
    """
    Get the job ID from the job name using the squeue command.
    """
    # Check the job status
    result = subprocess.run(["squeue", "-n", jobname], capture_output=True, text=True)
    # Check if the job is still in the queue
    if result.returncode == 1:
        return None
    if len(result.stdout.split("\n")[1].split()) == 0:
        return None
    # Get the job ID
    job_id = result.stdout.split("\n")[1].split()[0]
    return job_id


def resubmit_job(slurmdir, slurmfile, job_ids_array, box, resubmit_count, error_count,
                 MAX_RESUBMIT=10, MAX_ERRORS=10):
    """
    Resubmit a job if it has failed.

    Parameters
    ----------
    slurmdir : str
        Directory where the slurm scripts are saved.
    slurmfile : str
        Slurm script file.
    job_ids_array : array
        Array of job IDs for all previously submitted jobs. Indexed by box-1 number.
    box : int
        Box number of the job to resubmit.
    resubmit_count : int
        Number of resubmissions so far.
    error_count : int
        Number of errors so far.
    MAX_RESUBMIT : int
        Maximum number of resubmissions allowed.
    MAX_ERRORS : int
        Maximum number of errors allowed.

    Returns
    -------
    int
        Updated resubmit count.
    int
        Updated error count.
    """
    # Resubmit the job
    job_id = submit_slurm_job(slurmdir + slurmfile + "." + str(box))
    # Check if the job was submitted successfully
    if job_id is None:
        print(f"Error resubmitting job for box {box}")
        error_count += 1
        # Check if the error count exceeds the maximum
        if error_count >= MAX_ERRORS:
            raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
    else:
        job_ids_array[box-1] = int(job_id)
        resubmit_count += 1
        # Check if the resubmit count exceeds the maximum
        if resubmit_count >= MAX_RESUBMIT:
            raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")
    return resubmit_count, error_count


def check_previous_jobs(workdir, slurmdir, slurmfile, tilefile, sleeptime, job_ids_array, box,
                        resubmit_count, error_count, MAX_RESUBMIT=10, MAX_ERRORS=10):
    """
    Get the status of all previously submitted jobs.

    For each job, check whether it is running, pending, completed, or failed.
    Failed jobs are resubmitted.

    Parameters
    ----------
    workdir : str
        Directory where the tiles are saved.
    slurmdir : str
        Directory where the slurm scripts are saved.
    slurmfile : str
        Slurm script file.
    tilefile : str
        Tile file name.
    sleeptime : float
        Sleep time between each job submission (in s).
    job_ids_array : array
        Array of job IDs for all previously submitted jobs. Indexed by box-1 number.
    box : int
        Up to which box the job status is checked.
    resubmit_count : int
        Number of resubmissions so far.
    error_count : int
        Number of errors so far.
    MAX_RESUBMIT : int
        Maximum number of resubmissions allowed.
    MAX_ERRORS : int
        Maximum number of errors allowed.

    Returns
    -------
    dict
        Dictionary with the job status categories and their corresponding box numbers.
    int
        Updated resubmit count.
    int
        Updated error count.
    """
    job_status_categories = {'R': [], 'CP': [], 'PD': [], 'X': []}
    # Check the status of every previously submitted job
    for prev_box in tqdm.tqdm(range(1, box), desc="Checking jobs", unit="boxes",
                              leave=False, position=1):
        # Check the job status
        status, job_time = check_job_status(job_ids_array[prev_box-1])
        # Add the job status to the dictionary
        if status not in job_status_categories:
            job_status_categories[status] = []
        # If the status is 'X', check if the tile file was created
        if status == 'X':
            if os.path.exists(workdir + f"{tilefile}{prev_box}.h5"):
                job_status_categories['CP'].append(prev_box)  # Classify as completed
            else:
                # Resubmit the failed job and keep the updated counters
                resubmit_count, error_count = resubmit_job(
                    slurmdir, slurmfile, job_ids_array, prev_box,
                    resubmit_count, error_count, MAX_RESUBMIT, MAX_ERRORS)
                job_status_categories[status].append(prev_box)  # Classify as failed
                # Sleep for a while before resubmitting the next job
                time.sleep(sleeptime)
        # If the status is not 'X', record the job status
        else:
            job_status_categories[status].append(prev_box)
    return job_status_categories, resubmit_count, error_count


def cap_number_of_jobs(job_status_categories, job_ids_array, max_jobs, sleep_time):
    """
    Cap the number of jobs to a maximum number.

    Parameters
    ----------
    job_status_categories : dict
        Dictionary with the job status categories and their corresponding box numbers.
    job_ids_array : array
        Array of job IDs for all previously submitted jobs. Indexed by box-1 number.
    max_jobs : int
        Maximum number of jobs allowed.
    sleep_time : float
        Sleep time between each job submission (in s).

    Returns
    -------
    dict
        Updated dictionary with the job status categories and their corresponding box numbers.
    """
    discard_categories = ['CP', 'X']  # Completed and Failed
    # Count the number of running/pending jobs
    job_num = 0
    for status in job_status_categories.keys():
        if status not in discard_categories:
            job_num += len(job_status_categories[status])
    # Wait until the number of open jobs is below the maximum
    while job_num > max_jobs:
        print(f"Number of open jobs: {job_num} > {max_jobs}. Waiting...")
        for status in job_status_categories.keys():
            if status not in discard_categories:
                # Iterate over a copy so boxes can be removed from the original list
                for box in list(job_status_categories[status]):
                    # Check the new job status
                    new_status, job_time = check_job_status(job_ids_array[box-1])
                    time.sleep(sleep_time)
                    if new_status in discard_categories:
                        job_num -= 1
                        job_status_categories[new_status].append(box)  # WARNING: we do not reclassify 'X' jobs as 'CP'
                        job_status_categories[status].remove(box)
    return job_status_categories


def print_summary_job_status(job_status_categories, box, resubmit_count, error_count):
    """
    Print a summary table of the job statuses after a given box.
    """
    print("---------------------------------------------------")
    # Print summary of job statuses
    print(f"Job statuses after box {box}:")
    # Print a table with one column per status and, below, the % of jobs in that status
    row0 = f"{'Status':<14}"
    for status in job_status_categories.keys():
        row0 += f"{status:>9} "
    print(row0)
    row1 = f"{'Percentage':<14}"
    for status in job_status_categories.keys():
        row1 += f"{len(job_status_categories[status])/box*100:>9.1f}%"
    print(row1)
    # Print the rate of resubmissions
    print(f"Resubmission rate: {resubmit_count/box*100:.2f}%")
    print(f"Error count: {error_count}")


def scola_submit(directory,
                 slurmdir=None,
                 workdir=None,
                 slurmfile="scola_sCOLA.sh",
                 tilefile="scola_tile",
                 jobname="sCOLA_",
                 N_tiles=4,
                 sleep=1.5,
                 force=False,
                 MAX_ERRORS=10,
                 MAX_RESUBMIT=10,
                 MAX_JOBS_AT_ONCE=48,
                 CHECK_EVERY=100):
    """
    Submit all sCOLA tile jobs, monitor their status, and resubmit failed jobs.
    """
    if slurmdir is None:
        slurmdir = directory + "slurm_scripts/"
    if workdir is None:
        workdir = directory + "work/"

    # Check that the slurm file exists
    if not os.path.exists(slurmdir + slurmfile):
        raise FileNotFoundError(f"Slurm file {slurmdir+slurmfile} does not exist.")
    # Check that the work directory exists
    if not os.path.exists(workdir):
        raise FileNotFoundError(f"Work directory {workdir} does not exist.")

    # If force, remove all pre-existing tile files
    if force:
        count_removed = 0
        for box in range(1, N_tiles**3 + 1):
            if os.path.exists(workdir + f"{tilefile}{box}.h5"):
                os.remove(workdir + f"{tilefile}{box}.h5")
                count_removed += 1
        print(f"Removed {count_removed} ({100*count_removed/N_tiles**3:.1f}%) pre-existing tile files.")

    # MAX_ERRORS = 10
    if MAX_RESUBMIT is None:
        MAX_RESUBMIT = int(0.1 * N_tiles**3)  # 10% of the total number of jobs
    # MAX_JOBS_AT_ONCE = int(3*128/8)  # 3 nodes with 128 cores each, 8 jobs per core
    # CHECK_EVERY = 100

    error_count = 0
    resubmit_count = 0
    counter_for_checks = 0
    job_ids_array = np.zeros((N_tiles**3,), dtype=int)

    print("---------------------------------------------------")
    print("Starting job submission for sCOLA tiles with the following parameters:")
    print(f"Directory: {directory}")
    print(f"Slurm file: {slurmdir}{slurmfile}")
    print(f"Work directory: {workdir}")
    print(f"Number of tiles: {N_tiles**3} tiles")
    print(f"Sleep time: {sleep} s")
    print(f"Max errors: {MAX_ERRORS} errors")
    print(f"Max resubmits: {MAX_RESUBMIT} resubmits")
    print(f"Max jobs at once: {MAX_JOBS_AT_ONCE} jobs")
    print(f"Check every: {CHECK_EVERY} jobs")
    print("---------------------------------------------------")
    print(f"ETA: {convert_seconds_to_time(N_tiles**3*sleep*1.2)}")
    print("Starting job submission...")

    for box in tqdm.tqdm(range(1, N_tiles**3 + 1), desc="Submitting jobs", unit="boxes"):
        # Check if the tile file already exists
        if os.path.exists(workdir + f"{tilefile}{box}.h5"):
            continue
        # Check if the slurm job is already running
        job_id = get_job_id(f"{jobname}{box}")
        if job_id is not None:
            job_ids_array[box-1] = int(job_id)
            time.sleep(sleep)
            continue
        # Create the slurm script for the box
        create_scola_slurm_script(slurmdir + slurmfile, str(box))
        # Submit the job
        job_id = submit_slurm_job(slurmdir + slurmfile + "." + str(box))
        # Check if the job was submitted successfully
        if job_id is None:
            print(f"Error submitting job for box {box}")
            error_count += 1
        else:
            job_ids_array[box-1] = int(job_id)
        # Sleep for a while before submitting the next job
        time.sleep(sleep)
        counter_for_checks += 1

        # Check if the error count exceeds the maximum
        if error_count >= MAX_ERRORS:
            raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")

        # Check the job status every CHECK_EVERY jobs
        if counter_for_checks >= CHECK_EVERY:
            counter_for_checks = 0
            job_status_categories, resubmit_count, error_count = check_previous_jobs(
                workdir, slurmdir, slurmfile, tilefile, sleep, job_ids_array, box,
                resubmit_count, error_count, MAX_RESUBMIT, MAX_ERRORS)
            print_summary_job_status(job_status_categories, box, resubmit_count, error_count)
            job_status_categories = cap_number_of_jobs(job_status_categories, job_ids_array,
                                                       MAX_JOBS_AT_ONCE, sleep)

    print("All jobs submitted. Now checking the status of the jobs.")
    job_status_categories, resubmit_count, error_count = check_previous_jobs(
        workdir, slurmdir, slurmfile, tilefile, sleep, job_ids_array, N_tiles**3 + 1,
        resubmit_count, error_count, MAX_RESUBMIT, MAX_ERRORS)

    # Now wait for all jobs to finish: poll until every tile reports completed
    while len(job_status_categories['CP']) < N_tiles**3:
        print_summary_job_status(job_status_categories, N_tiles**3, resubmit_count, error_count)
        time.sleep(10 * sleep)
        job_status_categories, resubmit_count, error_count = check_previous_jobs(
            workdir, slurmdir, slurmfile, tilefile, sleep, job_ids_array, N_tiles**3 + 1,
            resubmit_count, error_count, MAX_RESUBMIT, MAX_ERRORS)

    print("All jobs completed.")
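
# ---------------------------------------------------------------------------
# Example command-line entry point. This is a minimal sketch, not part of the
# listing above: argparse is imported at the top of the module but no CLI is
# shown, so the argument names and defaults below are assumptions that simply
# mirror some of scola_submit's keyword arguments.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Submit sCOLA tile jobs to slurm.")
    parser.add_argument("directory", type=str, help="Base directory of the run.")
    parser.add_argument("--N_tiles", type=int, default=4, help="Number of tiles per dimension.")
    parser.add_argument("--sleep", type=float, default=1.5, help="Sleep time between submissions (s).")
    parser.add_argument("--force", action="store_true", help="Remove pre-existing tile files first.")
    args = parser.parse_args()

    # Submit and monitor all N_tiles**3 tile jobs
    scola_submit(args.directory,
                 N_tiles=args.N_tiles,
                 sleep=args.sleep,
                 force=args.force)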