Compare commits
3 commits
d0a8439be7
...
5ec799c46f
Author | SHA1 | Date | |
---|---|---|---|
5ec799c46f | |||
a693da3db0 | |||
a485db9465 |
6 changed files with 473 additions and 159 deletions
0
analysis/slices.py
Normal file → Executable file
0
analysis/slices.py
Normal file → Executable file
0
scripts/__init__.py
Normal file
0
scripts/__init__.py
Normal file
|
@ -2,6 +2,41 @@ from pysbmy.density import get_density_pm_snapshot
|
|||
from pysbmy.snapshot import read_snapshot
|
||||
import argparse
|
||||
|
||||
|
||||
def convert_snapshot_to_density(snapshot_path, output_path, N=None, corner=(0.0, 0.0, 0.0)):
|
||||
"""
|
||||
Convert a snapshot to a density field.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
snapshot_path : str
|
||||
Path to the snapshot file.
|
||||
output_path : str
|
||||
Path to the output density file.
|
||||
N : int
|
||||
Size of the density field grid (N x N x N).
|
||||
corner : tuple of float
|
||||
Corner of the box (x, y, z).
|
||||
"""
|
||||
# Read the snapshot
|
||||
print("Reading snapshot...")
|
||||
snap = read_snapshot(snapshot_path)
|
||||
|
||||
if N is None:
|
||||
N = snap.Np0
|
||||
|
||||
# Calculate density
|
||||
print("Calculating density...")
|
||||
F = get_density_pm_snapshot(snap, N, N, N, corner[0], corner[1], corner[2])
|
||||
|
||||
# Write density to file
|
||||
print("Writing density...")
|
||||
F.write(output_path)
|
||||
print("Density written to", output_path)
|
||||
print("Done.")
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Convert snapshot to density.")
|
||||
parser.add_argument(
|
||||
|
@ -36,19 +71,9 @@ if __name__ == "__main__":
|
|||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Read the snapshot
|
||||
print("Reading snapshot...")
|
||||
snap = read_snapshot(args.snapshot)
|
||||
|
||||
if args.N is None:
|
||||
N = snap.Np0
|
||||
else:
|
||||
N = args.N
|
||||
|
||||
print("Calculating density...")
|
||||
F=get_density_pm_snapshot(snap, N,N,N, args.corner[0],args.corner[1],args.corner[2])
|
||||
|
||||
print("Writing density...")
|
||||
F.write(args.output)
|
||||
print("Density written to", args.output)
|
||||
print("Done.")
|
||||
convert_snapshot_to_density(
|
||||
snapshot_path=args.snapshot,
|
||||
output_path=args.output,
|
||||
N=args.N,
|
||||
corner=args.corner,
|
||||
)
|
110
scripts/field_to_field.py
Normal file
110
scripts/field_to_field.py
Normal file
|
@ -0,0 +1,110 @@
|
|||
from pysbmy.density import mesh_to_mesh
|
||||
from pysbmy.Field import Field, read_field
|
||||
import numpy as np
|
||||
import os
|
||||
|
||||
|
||||
def field_to_field(
|
||||
input_file:str,
|
||||
output_file:str,
|
||||
output_size:int|tuple[int,int,int]|list[int],
|
||||
output_dpm:float|tuple[float,float,float]|list[float],
|
||||
output_corner:tuple[float,float,float]|list[float],
|
||||
boundary_conditions:int,
|
||||
):
|
||||
|
||||
|
||||
if isinstance(output_size, int):
|
||||
output_size = (output_size, output_size, output_size) # N0, N1, N2
|
||||
|
||||
if isinstance(output_dpm, float):
|
||||
output_dpm = (output_dpm, output_dpm, output_dpm) # d0, d1, d2
|
||||
elif output_dpm is None:
|
||||
output_dpm = (-1, -1, -1)
|
||||
|
||||
if not os.path.exists(input_file):
|
||||
raise FileNotFoundError(f"Input file {input_file} does not exist.")
|
||||
|
||||
# Read the input field
|
||||
print(f"Reading input field from {input_file}")
|
||||
input_field = read_field(input_file)
|
||||
|
||||
if input_field.rank != 1 or input_field.data.ndim != 3:
|
||||
raise NotImplementedError("Only 3D scalar fields are supported for now.")
|
||||
|
||||
L0 = input_field.L0
|
||||
L1 = input_field.L1
|
||||
L2 = input_field.L2
|
||||
N0 = input_field.N0
|
||||
N1 = input_field.N1
|
||||
N2 = input_field.N2
|
||||
corner0 = input_field.corner0
|
||||
corner1 = input_field.corner1
|
||||
corner2 = input_field.corner2
|
||||
|
||||
d0_in = L0 / N0
|
||||
d1_in = L1 / N1
|
||||
d2_in = L2 / N2
|
||||
|
||||
d0_out = output_dpm[0] if output_dpm[0] > 0 else d0_in
|
||||
d1_out = output_dpm[1] if output_dpm[1] > 0 else d1_in
|
||||
d2_out = output_dpm[2] if output_dpm[2] > 0 else d2_in
|
||||
|
||||
offset_out_x = (output_corner[0] - corner0)
|
||||
offset_out_y = (output_corner[1] - corner1)
|
||||
offset_out_z = (output_corner[2] - corner2)
|
||||
|
||||
print("-----------------------------------------")
|
||||
print(f"Input field size: {N0} x {N1} x {N2}")
|
||||
print(f"Output field size: {output_size[0]} x {output_size[1]} x {output_size[2]}")
|
||||
print(f"Input field dpm: {d0_in:.3f} x {d1_in:.3f} x {d2_in:.3f}")
|
||||
print(f"Output field dpm: {d0_out:.3f} x {d1_out:.3f} x {d2_out:.3f}")
|
||||
print(f"Input field corner: {corner0:.1f} x {corner1:.1f} x {corner2:.1f}")
|
||||
print(f"Output field corner: {output_corner[0]:.1f} x {output_corner[1]:.1f} x {output_corner[2]:.1f}")
|
||||
print(f"Boundary conditions: {boundary_conditions}")
|
||||
print("-----------------------------------------")
|
||||
|
||||
input_grid = input_field.data
|
||||
|
||||
output_grid = np.zeros(output_size, dtype=input_grid.dtype)
|
||||
|
||||
|
||||
# Mesh to mesh interpolation
|
||||
print("Interpolating field...")
|
||||
mesh_to_mesh(input_grid, output_grid, d0_in, d1_in, d2_in, d0_out, d1_out, d2_out, offset_out_x, offset_out_y, offset_out_z, boundary_conditions)
|
||||
|
||||
# Create the output field
|
||||
output_field = Field(
|
||||
L0=output_size[0] * d0_out,
|
||||
L1=output_size[1] * d1_out,
|
||||
L2=output_size[2] * d2_out,
|
||||
corner0=output_corner[0],
|
||||
corner1=output_corner[1],
|
||||
corner2=output_corner[2],
|
||||
rank=input_field.rank,
|
||||
N0=output_size[0],
|
||||
N1=output_size[1],
|
||||
N2=output_size[2],
|
||||
time=input_field.time,
|
||||
data=output_grid
|
||||
)
|
||||
|
||||
# Write the output field
|
||||
print(f"Writing output field to {output_file}")
|
||||
output_field.write(output_file)
|
||||
print("Done.")
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Convert a field from one size to another.")
|
||||
parser.add_argument("-i","--input_file", type=str, help="Input field file")
|
||||
parser.add_argument("-o","--output_file", type=str, help="Output field file")
|
||||
parser.add_argument("-N","--output_size", type=int, nargs=3, help="Output field size (N0, N1, N2)")
|
||||
parser.add_argument("-dpm","--output_dpm", type=float, nargs=3, default=None, help="Output field dpm (d0, d1, d2)")
|
||||
parser.add_argument("-corner","--output_corner", type=float, nargs=3, default=(0.,0.,0.), help="Output field corner (corner0, corner1, corner2)")
|
||||
parser.add_argument("-BC","--boundary_conditions", type=int, default=1, help="Boundary conditions (0: periodic, 1: non-periodic)")
|
||||
args = parser.parse_args()
|
||||
|
||||
field_to_field(args.input_file, args.output_file, args.output_size, args.output_dpm, args.output_corner, args.boundary_conditions)
|
|
@ -58,6 +58,47 @@ def gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES):
|
|||
|
||||
|
||||
|
||||
def gather_tiles(folder, tile_base, L, Np, N_TILES, buffer):
|
||||
"""
|
||||
Gather sCOLA tiles into a single density field.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
folder : str
|
||||
Folder containing the tiles.
|
||||
tile_base : str
|
||||
Base name of the tiles.
|
||||
L : float
|
||||
Size of the box in Mpc/h.
|
||||
Np : int
|
||||
Number of cells per dimension for the full box.
|
||||
N_TILES : int
|
||||
Number of tiles per dimension.
|
||||
buffer : int
|
||||
Buffer size for the density field of tiles.
|
||||
"""
|
||||
Np_tile = Np//N_TILES
|
||||
dpm = L/Np_tile
|
||||
|
||||
print("Memory allocation for the grid...")
|
||||
A=np.zeros((Np,Np,Np), dtype=np.float32)
|
||||
|
||||
print("Starting to read the tiles...")
|
||||
gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES)
|
||||
|
||||
print("Finished reading the tiles.")
|
||||
A=density_to_delta(A,-1)
|
||||
|
||||
print("Converting to field...")
|
||||
F=Field(L,L,L, 0.,0.,0., 1, Np,Np,Np, 1., A)
|
||||
|
||||
print("Saving field...")
|
||||
F.write(folder+"../results/final_density_sCOLA.h5")
|
||||
print("Density field saved to", folder+"../results/final_density_sCOLA.h5")
|
||||
print("Done.")
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Gather density from tiles.")
|
||||
|
@ -80,17 +121,4 @@ if __name__ == "__main__":
|
|||
Np_tile = Np//N_TILES
|
||||
dpm = L/Np_tile
|
||||
|
||||
print("Memory allocation for the grid...")
|
||||
A=np.zeros((Np,Np,Np), dtype=np.float32)
|
||||
|
||||
print("Starting to read the tiles...")
|
||||
gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES)
|
||||
|
||||
print("Finished reading the tiles.")
|
||||
A=density_to_delta(A,-1)
|
||||
|
||||
print("Converting to field...")
|
||||
F=Field(L,L,L, 0.,0.,0., 1, Np,Np,Np, 1., A)
|
||||
|
||||
print("Saving field...")
|
||||
F.write(folder+"../results/final_density_sCOLA.h5")
|
||||
gather_tiles(folder, tile_base, L, Np, N_TILES, buffer)
|
|
@ -9,6 +9,12 @@ import time
|
|||
def create_scola_slurm_script(slurmfile, box):
|
||||
"""
|
||||
Create a slurm script for sCOLA.
|
||||
Parameters
|
||||
----------
|
||||
slurmfile : str
|
||||
Path to the slurm script file.
|
||||
box : str
|
||||
Box number to be replaced in the slurm script.
|
||||
"""
|
||||
# Read the slurm file
|
||||
with open(slurmfile, "r") as f:
|
||||
|
@ -25,6 +31,14 @@ def create_scola_slurm_script(slurmfile, box):
|
|||
def submit_slurm_job(slurmfile):
|
||||
"""
|
||||
Submit a slurm job using the sbatch command.
|
||||
Parameters
|
||||
----------
|
||||
slurmfile : str
|
||||
Path to the slurm script file.
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Job ID of the submitted job. None if the submission failed.
|
||||
"""
|
||||
# Submit the job
|
||||
result = subprocess.run(["sbatch", slurmfile], capture_output=True, text=True)
|
||||
|
@ -89,6 +103,16 @@ def check_job_status(job_id):
|
|||
"""
|
||||
Check the status of a job using the squeue command.
|
||||
Returns the job status and running time.
|
||||
Parameters
|
||||
----------
|
||||
job_id : str
|
||||
Job ID of the job to check.
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Job status. Possible values are 'R' (running), 'PD' (pending), 'X' (failed), 'CP' (completed).
|
||||
int
|
||||
Running time in seconds. -1 if the job is not found.
|
||||
"""
|
||||
# Check the job status
|
||||
result = subprocess.run(["squeue", "-j", str(job_id)], capture_output=True, text=True)
|
||||
|
@ -125,11 +149,96 @@ def get_job_id(jobname):
|
|||
return job_id
|
||||
|
||||
|
||||
def check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count):
|
||||
def resubmit_job(slurmdir,slurmfile,job_ids_array,box,resubmit_count,error_count,MAX_RESUBMIT=10,MAX_ERRORS=10):
|
||||
"""
|
||||
Resubmit a job if it has failed.
|
||||
Parameters
|
||||
----------
|
||||
slurmdir : str
|
||||
Directory where the slurm scripts are saved.
|
||||
slurmfile : str
|
||||
Slurm script file.
|
||||
job_ids_array : array
|
||||
Array of job IDs for all previously submitted jobs. Indexed by box-1 number.
|
||||
box : int
|
||||
Box number of the job to resubmit.
|
||||
resubmit_count : int
|
||||
Number of resubmissions so far.
|
||||
error_count : int
|
||||
Number of errors so far.
|
||||
MAX_RESUBMIT : int
|
||||
Maximum number of resubmissions allowed.
|
||||
MAX_ERRORS : int
|
||||
Maximum number of errors allowed.
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
Updated resubmit count.
|
||||
int
|
||||
Updated error count.
|
||||
"""
|
||||
|
||||
# Resubmit the job
|
||||
job_id = submit_slurm_job(slurmdir+slurmfile+"."+str(box))
|
||||
|
||||
# Check if the job was submitted successfully
|
||||
if job_id is None:
|
||||
print(f"Error resubmitting job for box {box}")
|
||||
error_count+=1
|
||||
# Check if the error count exceeds the maximum
|
||||
if error_count >= MAX_ERRORS:
|
||||
raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
|
||||
else:
|
||||
job_ids_array[box-1] = int(job_id)
|
||||
|
||||
resubmit_count += 1
|
||||
# Check if the resubmit count exceeds the maximum
|
||||
if resubmit_count >= MAX_RESUBMIT:
|
||||
raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")
|
||||
|
||||
return resubmit_count, error_count
|
||||
|
||||
|
||||
|
||||
def check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleeptime,job_ids_array,box,resubmit_count,error_count,MAX_RESUBMIT=10,MAX_ERRORS=10):
|
||||
"""
|
||||
Get the status of all previously submitted jobs.
|
||||
For each job, check if it is running, completed, or failed.
|
||||
If the job is failed, resubmit it.
|
||||
Parameters
|
||||
----------
|
||||
workdir : str
|
||||
Directory where the tiles are saved.
|
||||
slurmdir : str
|
||||
Directory where the slurm scripts are saved.
|
||||
slurmfile : str
|
||||
Slurm script file.
|
||||
tilefile : str
|
||||
Tile file name.
|
||||
sleeptime : float
|
||||
Sleep time between each job submission (in s).
|
||||
job_ids_array : array
|
||||
Array of job IDs for all previously submitted jobs. Indexed by box-1 number.
|
||||
box : int
|
||||
Up to which box the job status is checked.
|
||||
resubmit_count : int
|
||||
Number of resubmissions so far.
|
||||
error_count : int
|
||||
Number of errors so far.
|
||||
MAX_RESUBMIT : int
|
||||
Maximum number of resubmissions allowed.
|
||||
MAX_ERRORS : int
|
||||
Maximum number of errors allowed.
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
Dictionary with the job status categories and their corresponding box numbers.
|
||||
int
|
||||
Updated resubmit count.
|
||||
int
|
||||
Updated error count.
|
||||
"""
|
||||
job_status_categories = {'R':[],'CP':[],'PD':[],'X':[]}
|
||||
|
||||
|
@ -143,21 +252,13 @@ def check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count):
|
|||
# If the status is 'X', check if the tile file was created
|
||||
if status == 'X':
|
||||
# Check if the tile file was created
|
||||
if os.path.exists(args.workdir+f"{args.tilefile}{prev_box}.h5"):
|
||||
if os.path.exists(workdir+f"{tilefile}{prev_box}.h5"):
|
||||
job_status_categories['CP'].append(prev_box) # Classify as completed
|
||||
else:
|
||||
# Resubmit the job
|
||||
job_id = submit_slurm_job(args.slurmdir+args.slurmfile+"."+str(prev_box))
|
||||
# Check if the job was submitted successfully
|
||||
if job_id is None:
|
||||
print(f"Error submitting job for box {box}")
|
||||
error_count+=1
|
||||
else:
|
||||
job_ids_array[prev_box-1] = int(job_id)
|
||||
resubmit_count += 1
|
||||
resubmit_job(slurmdir,slurmfile,job_ids_array,prev_box,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS)
|
||||
job_status_categories[status].append(prev_box) # Classify as failed
|
||||
# Sleep for a while before resubmitting the next job
|
||||
time.sleep(args.sleep)
|
||||
time.sleep(sleeptime)
|
||||
# If the status is not 'X', record the job status
|
||||
else:
|
||||
job_status_categories[status].append(prev_box)
|
||||
|
@ -169,6 +270,21 @@ def check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count):
|
|||
def cap_number_of_jobs(job_status_categories,job_ids_array, max_jobs, sleep_time):
|
||||
"""
|
||||
Cap the number of jobs to a maximum number.
|
||||
Parameters
|
||||
----------
|
||||
job_status_categories : dict
|
||||
Dictionary with the job status categories and their corresponding box numbers.
|
||||
job_ids_array : array
|
||||
Array of job IDs for all previously submitted jobs. Indexed by box-1 number.
|
||||
max_jobs : int
|
||||
Maximum number of jobs allowed.
|
||||
sleep_time : float
|
||||
Sleep time between each job submission (in s).
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
Updated dictionary with the job status categories and their corresponding box numbers.
|
||||
"""
|
||||
discard_categories = ['CP', 'X'] # Completed and Failed
|
||||
# Check the number of running /pending jobs
|
||||
|
@ -199,11 +315,11 @@ def print_summary_job_status(job_status_categories, box, resubmit_count, error_c
|
|||
# Print summary of job statuses
|
||||
print(f"Job statuses after box {box}:")
|
||||
# Print a table with columns for each status and below the % of jobs in that status
|
||||
row0 = f"{'Status':<10}"
|
||||
row0 = f"{'Status':<14}"
|
||||
for status in job_status_categories.keys():
|
||||
row0 += f"{status:>10}"
|
||||
row0 += f"{status:>9} "
|
||||
print(row0)
|
||||
row1 = f"{'Percentage':<10}"
|
||||
row1 = f"{'Percentage':<14}"
|
||||
for status in job_status_categories.keys():
|
||||
row1 += f"{len(job_status_categories[status])/box*100:>9.1f}%"
|
||||
print(row1)
|
||||
|
@ -213,6 +329,140 @@ def print_summary_job_status(job_status_categories, box, resubmit_count, error_c
|
|||
|
||||
|
||||
|
||||
def scola_submit(directory,
|
||||
slurmdir=None,
|
||||
workdir=None,
|
||||
slurmfile="scola_sCOLA.sh",
|
||||
tilefile="scola_tile",
|
||||
jobname="sCOLA_",
|
||||
N_tiles=4,
|
||||
sleep=1.5,
|
||||
force=False,
|
||||
MAX_ERRORS=10,
|
||||
MAX_RESUBMIT=10,
|
||||
MAX_JOBS_AT_ONCE=48,
|
||||
CHECK_EVERY=100):
|
||||
|
||||
if slurmdir is None:
|
||||
slurmdir = directory + "slurm_scripts/"
|
||||
if workdir is None:
|
||||
workdir = directory + "work/"
|
||||
|
||||
|
||||
# Check that the slurm file exists
|
||||
if not os.path.exists(slurmdir+slurmfile):
|
||||
raise FileNotFoundError(f"Slurm file {slurmdir+slurmfile} does not exist.")
|
||||
# Check that the work directory exists
|
||||
if not os.path.exists(workdir):
|
||||
raise FileNotFoundError(f"Work directory {workdir} does not exist.")
|
||||
|
||||
# If force, remove all pre-existing tile files
|
||||
if force:
|
||||
count_removed = 0
|
||||
for box in range(1,N_tiles**3+1):
|
||||
if os.path.exists(workdir+f"{tilefile}{box}.h5"):
|
||||
os.remove(workdir+f"{tilefile}{box}.h5")
|
||||
count_removed += 1
|
||||
print(f"Removed {count_removed} ({100*count_removed/N_tiles**3:.1f}%) pre-existing tile files.")
|
||||
|
||||
# MAX_ERRORS = 10
|
||||
if MAX_RESUBMIT is None:
|
||||
MAX_RESUBMIT = int(0.1*N_tiles**3) # 10% of the total number of jobs
|
||||
# MAX_JOBS_AT_ONCE = int(3*128/8) # 3 nodes with 128 cores each, 8 jobs per core
|
||||
# CHECK_EVERY = 100
|
||||
|
||||
error_count = 0
|
||||
resubmit_count = 0
|
||||
counter_for_checks = 0
|
||||
|
||||
job_ids_array = np.zeros((N_tiles**3,), dtype=int)
|
||||
|
||||
|
||||
print("---------------------------------------------------")
|
||||
print("Starting job submission for sCOLA tiles with the following parameters:")
|
||||
print(f"Directory: {directory}")
|
||||
print(f"Slurm file: {slurmdir}{slurmfile}")
|
||||
print(f"Work directory: {workdir}")
|
||||
print(f"Number of tiles: {N_tiles**3} tiles")
|
||||
print(f"Sleep time: {sleep} s")
|
||||
print(f"Max errors: {MAX_ERRORS} errors")
|
||||
print(f"Max resubmits: {MAX_RESUBMIT} resubmits")
|
||||
print(f"Max jobs at once: {MAX_JOBS_AT_ONCE} jobs")
|
||||
print(f"Check every: {CHECK_EVERY} jobs")
|
||||
print("---------------------------------------------------")
|
||||
print(f"ETA: {convert_seconds_to_time(N_tiles**3*sleep*1.2)}")
|
||||
print("Starting job submission...")
|
||||
|
||||
|
||||
|
||||
for box in tqdm.tqdm(range(1,N_tiles**3+1), desc="Submitting jobs", unit="boxes"):
|
||||
|
||||
# Check if the tile file already exists
|
||||
if os.path.exists(workdir+f"{tilefile}{box}.h5"):
|
||||
continue
|
||||
|
||||
# Check if the slurm job is already running
|
||||
job_id = get_job_id(f"{jobname}{box}")
|
||||
if job_id is not None:
|
||||
job_ids_array[box-1] = int(job_id)
|
||||
time.sleep(sleep)
|
||||
continue
|
||||
|
||||
# Create the slurm script for the box
|
||||
create_scola_slurm_script(slurmdir+slurmfile, str(box))
|
||||
|
||||
# Submit the job
|
||||
job_id = submit_slurm_job(slurmdir+slurmfile+"."+str(box))
|
||||
|
||||
# Check if the job was submitted successfully
|
||||
if job_id is None:
|
||||
print(f"Error submitting job for box {box}")
|
||||
error_count+=1
|
||||
else:
|
||||
job_ids_array[box-1] = int(job_id)
|
||||
|
||||
# Sleep for a while before submitting the next job
|
||||
time.sleep(sleep)
|
||||
|
||||
counter_for_checks += 1
|
||||
|
||||
# Check if the error count exceeds the maximum
|
||||
if error_count >= MAX_ERRORS:
|
||||
raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
|
||||
|
||||
# Check the job status every CHECK_EVERY jobs
|
||||
if counter_for_checks >= CHECK_EVERY:
|
||||
|
||||
counter_for_checks = 0
|
||||
|
||||
job_status_categories, resubmit_count, error_count = check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleep,job_ids_array,box,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS)
|
||||
print_summary_job_status(job_status_categories, box, resubmit_count, error_count)
|
||||
job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,sleep)
|
||||
|
||||
|
||||
|
||||
print("All jobs submitted. Now checking the status of the jobs.")
|
||||
|
||||
|
||||
job_status_categories, resubmit_count, error_count = check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleep,job_ids_array,N_tiles**3+1,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS)
|
||||
# Now wait for all jobs to finish
|
||||
while len(job_status_categories['CP'])<N_tiles**3:
|
||||
time.sleep(10*sleep)
|
||||
job_status_categories, resubmit_count, error_count = check_previous_jobs(workdir,slurmdir,slurmfile,tilefile,sleep,job_ids_array,N_tiles**3+1,resubmit_count,error_count,MAX_RESUBMIT,MAX_ERRORS)
|
||||
print_summary_job_status(job_status_categories, N_tiles**3, resubmit_count, error_count)
|
||||
job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,sleep)
|
||||
|
||||
|
||||
print("All jobs finished.")
|
||||
# Remove the slurm scripts
|
||||
for box in range(1,N_tiles**3+1):
|
||||
if os.path.exists(slurmdir+slurmfile+"."+str(box)):
|
||||
os.remove(slurmdir+slurmfile+"."+str(box))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Submit slurm jobs for sCOLA tiles.")
|
||||
|
||||
|
@ -220,128 +470,29 @@ if __name__ == "__main__":
|
|||
parser.add_argument("-sd", "--slurmdir", type=str, default=None, help="Directory where the slurm scripts are saved (default is -d/slurm_scripts).")
|
||||
parser.add_argument("-wd", "--workdir", type=str, default=None, help="Directory where the tiles are saved (default is -d/work).")
|
||||
|
||||
parser.add_argument("-sf","--slurmfile", type=str, default="scola_sCOLA.sh", help="Slurm script file (located in slurmdir).")
|
||||
parser.add_argument("-tf","--tilefile", type=str, default="scola_tile", help="Tile file name (located in workdir).")
|
||||
parser.add_argument("--jobname", type=str, default="sCOLA_", help="Job name for the slurm jobs.")
|
||||
parser.add_argument("-sf","--slurmfile", type=str, default="scola_sCOLA.sh", help="Slurm script file (located in slurmdir, default is scola_sCOLA.sh).")
|
||||
parser.add_argument("-tf","--tilefile", type=str, default="scola_tile", help="Tile file name (located in workdir, default is scola_tile).")
|
||||
parser.add_argument("--jobname", type=str, default="sCOLA_", help="Job name for the slurm jobs (default is sCOLA_).")
|
||||
|
||||
parser.add_argument("-Nt","--N_tiles", type=int, default=4, help="Number of tiles per dimension.")
|
||||
|
||||
parser.add_argument("--sleep", type=float, default=1.0, help="Sleep time between each job submission (in s).")
|
||||
parser.add_argument("--sleep", type=float, default=1.5, help="Sleep time between each job submission (in s).")
|
||||
|
||||
parser.add_argument("-F","--force", action="store_true", help="Force to resimulate all tiles, even if they already exist.")
|
||||
|
||||
args=parser.parse_args()
|
||||
|
||||
|
||||
if args.slurmdir is None:
|
||||
args.slurmdir = args.directory + "slurm_scripts/"
|
||||
if args.workdir is None:
|
||||
args.workdir = args.directory + "work/"
|
||||
|
||||
|
||||
# Check that the slurm file exists
|
||||
if not os.path.exists(args.slurmdir+args.slurmfile):
|
||||
raise FileNotFoundError(f"Slurm file {args.slurmdir+args.slurmfile} does not exist.")
|
||||
# Check that the work directory exists
|
||||
if not os.path.exists(args.workdir):
|
||||
raise FileNotFoundError(f"Work directory {args.workdir} does not exist.")
|
||||
|
||||
MAX_ERRORS = 10
|
||||
MAX_RESUBMIT = int(0.1*args.N_tiles**3) # 10% of the total number of jobs
|
||||
MAX_JOBS_AT_ONCE = int(3*128/8) # 3 nodes with 128 cores each, 8 jobs per core
|
||||
CHECK_EVERY = 100
|
||||
|
||||
error_count = 0
|
||||
resubmit_count = 0
|
||||
counter_for_checks = 0
|
||||
|
||||
job_ids_array = np.zeros((args.N_tiles**3,), dtype=int)
|
||||
|
||||
|
||||
print("---------------------------------------------------")
|
||||
print("Starting job submission for sCOLA tiles with the following parameters:")
|
||||
print(f"Directory: {args.directory}")
|
||||
print(f"Slurm file: {args.slurmdir}{args.slurmfile}")
|
||||
print(f"Work directory: {args.workdir}")
|
||||
print(f"Number of tiles: {args.N_tiles**3} tiles")
|
||||
print(f"Sleep time: {args.sleep} s")
|
||||
print(f"Max errors: {MAX_ERRORS} errors")
|
||||
print(f"Max resubmits: {MAX_RESUBMIT} resubmits")
|
||||
print(f"Max jobs at once: {MAX_JOBS_AT_ONCE} jobs")
|
||||
print(f"Check every: {CHECK_EVERY} jobs")
|
||||
print("---------------------------------------------------")
|
||||
print(f"ETA: {convert_seconds_to_time(args.N_tiles**3*args.sleep*1.05)}")
|
||||
print("Starting job submission...")
|
||||
|
||||
|
||||
|
||||
for box in tqdm.tqdm(range(1,args.N_tiles**3+1), desc="Submitting jobs", unit="boxes"):
|
||||
|
||||
# Check if the tile file already exists
|
||||
if os.path.exists(args.workdir+f"{args.tilefile}{box}.h5"):
|
||||
continue
|
||||
|
||||
# Check if the slurm job is already running
|
||||
job_id = get_job_id(f"{args.jobname}{box}")
|
||||
if job_id is not None:
|
||||
job_ids_array[box-1] = int(job_id)
|
||||
time.sleep(args.sleep)
|
||||
continue
|
||||
|
||||
# Create the slurm script for the box
|
||||
create_scola_slurm_script(args.slurmdir+args.slurmfile, str(box))
|
||||
|
||||
# Submit the job
|
||||
job_id = submit_slurm_job(args.slurmdir+args.slurmfile+"."+str(box))
|
||||
|
||||
# Check if the job was submitted successfully
|
||||
if job_id is None:
|
||||
print(f"Error submitting job for box {box}")
|
||||
error_count+=1
|
||||
else:
|
||||
job_ids_array[box-1] = int(job_id)
|
||||
|
||||
# Sleep for a while before submitting the next job
|
||||
time.sleep(args.sleep)
|
||||
|
||||
counter_for_checks += 1
|
||||
|
||||
# Check if the error count exceeds the maximum
|
||||
if error_count >= MAX_ERRORS:
|
||||
raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
|
||||
# Check if the resubmit count exceeds the maximum
|
||||
if resubmit_count >= MAX_RESUBMIT:
|
||||
raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")
|
||||
|
||||
# Check the job status every CHECK_EVERY jobs
|
||||
if counter_for_checks >= CHECK_EVERY:
|
||||
|
||||
counter_for_checks = 0
|
||||
|
||||
job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,box,resubmit_count,error_count)
|
||||
print_summary_job_status(job_status_categories, box, resubmit_count, error_count)
|
||||
job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,args.sleep)
|
||||
scola_submit(args.directory,
|
||||
slurmdir=args.slurmdir,
|
||||
workdir=args.workdir,
|
||||
slurmfile=args.slurmfile,
|
||||
tilefile=args.tilefile,
|
||||
jobname=args.jobname,
|
||||
N_tiles=args.N_tiles,
|
||||
sleep=args.sleep,
|
||||
force=args.force)
|
||||
|
||||
|
||||
|
||||
print("All jobs submitted. Now checking the status of the jobs.")
|
||||
|
||||
|
||||
job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,args.N_tiles**3+1,resubmit_count,error_count)
|
||||
# Now wait for all jobs to finish
|
||||
while len(job_status_categories['CP'])<args.N_tiles**3:
|
||||
time.sleep(10*args.sleep)
|
||||
# Check if the error count exceeds the maximum
|
||||
if error_count >= MAX_ERRORS:
|
||||
raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
|
||||
# Check if the resubmit count exceeds the maximum
|
||||
if resubmit_count >= MAX_RESUBMIT:
|
||||
raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")
|
||||
job_status_categories, resubmit_count, error_count = check_previous_jobs(args,job_ids_array,args.N_tiles**3+1,resubmit_count,error_count)
|
||||
print_summary_job_status(job_status_categories, args.N_tiles**3+1, resubmit_count, error_count)
|
||||
job_status_categories = cap_number_of_jobs(job_status_categories,job_ids_array,MAX_JOBS_AT_ONCE,args.sleep)
|
||||
|
||||
|
||||
print("All jobs finished.")
|
||||
# Remove the slurm scripts
|
||||
for box in range(1,args.N_tiles**3+1):
|
||||
os.remove(args.slurmdir+args.slurmfile+"."+str(box))
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue