Compare commits
9108ec488c ... d0a8439be7 (3 commits)
| Author | SHA1 | Date |
|---|---|---|
| | d0a8439be7 | |
| | 39e101c2fa | |
| | bdf38e6dd0 | |
3 changed files with 443 additions and 0 deletions
scripts/gather_tiles.py (Normal file, 96 additions)
@@ -0,0 +1,96 @@

    import numpy as np
    from pysbmy.snapshot import read_tile
    from pysbmy.field import Field
    from pysbmy.density import get_density_pm, density_to_delta
    from pysbmy import c_float_t
    import tqdm
    import argparse


    def get_indices(tile, N_TILES0, N_TILES1, N_TILES2):
        """Get the indices of the tile in a 3D grid."""
        i = (tile // (N_TILES1 * N_TILES2)) % N_TILES0
        j = ((tile - N_TILES1 * N_TILES2 * i) // N_TILES2) % N_TILES1
        k = (tile - N_TILES2 * j - N_TILES1 * N_TILES2 * i) % N_TILES2
        return i, j, k


    def tile_to_density_with_buffer(T, N, d, buffer):
        """Convert a tile to density with a buffer."""
        # Create a buffer for the density
        A = np.zeros((N + 2 * buffer, N + 2 * buffer, N + 2 * buffer), dtype=np.float32)

        # Get the density of the tile
        A = get_density_pm(T.pos - T.corner_position.astype(c_float_t) + d * buffer, A, d)
        return A


    def add_density_tile(A, A_tile, corner_indices):
        N = A.shape[0]
        Np = A_tile.shape[0]
        i0, j0, k0 = corner_indices

        # Create index arrays for the tile
        i_idx = (np.arange(Np) + i0) % N
        j_idx = (np.arange(Np) + j0) % N
        k_idx = (np.arange(Np) + k0) % N

        # Use np.ix_ to get all combinations of indices
        A[np.ix_(i_idx, j_idx, k_idx)] += A_tile


    def gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES):
        """Gather the density from the tiles."""
        for tile in tqdm.tqdm(range(N_TILES**3), desc="Density of tiles", unit="tiles"):
            T = read_tile(folder + tile_base + str(tile + 1) + ".h5", int(Np_tile**3))
            # Get the corner position of the tile
            corner_position = T.corner_position
            # Get the corner indices of the tile
            i, j, k = get_indices(tile, N_TILES, N_TILES, N_TILES)
            corner_grid_indices = (i * Np_tile - buffer, j * Np_tile - buffer, k * Np_tile - buffer)
            # Get the density of the tile with a buffer
            A_tile = tile_to_density_with_buffer(T, Np_tile, dpm, buffer)
            # Add the density of the tile to the grid
            add_density_tile(A, A_tile, corner_grid_indices)


    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description="Gather density from tiles.")
        parser.add_argument("-d", "--folder", type=str, default="./", help="Folder containing the tiles")
        parser.add_argument("--tile_base", type=str, default="sCOLA_tile", help="Base name of the tiles")
        parser.add_argument("-L", "--L", type=int, default=1920, help="Size of the box in Mpc/h")
        parser.add_argument("-Np", "--Np", type=int, default=80, help="Number of cells per dimension for the full box")
        parser.add_argument("-Nt", "--N_tiles", type=int, default=4, help="Number of tiles per dimension.")
        parser.add_argument("--buffer", type=int, default=40, help="Buffer size for the density field of tiles")
        args = parser.parse_args()

        L = args.L
        Np = args.Np
        N_TILES = args.N_tiles
        buffer = args.buffer

        folder = args.folder
        tile_base = args.tile_base

        Np_tile = Np // N_TILES
        dpm = L / Np_tile

        print("Memory allocation for the grid...")
        A = np.zeros((Np, Np, Np), dtype=np.float32)

        print("Starting to read the tiles...")
        gather_density(A, folder, tile_base, Np_tile, dpm, buffer, N_TILES)

        print("Finished reading the tiles.")
        A = density_to_delta(A, -1)

        print("Converting to field...")
        F = Field(L, L, L, 0., 0., 0., 1, Np, Np, Np, 1., A)

        print("Saving field...")
        F.write(folder + "../results/final_density_sCOLA.h5")
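For a quick, self-contained sanity check of the tile indexing and the periodic wrap-around used above, the sketch below re-implements `get_indices` and `add_density_tile` exactly as in this file so it runs without `pysbmy`. The toy sizes (2 tiles per dimension, 4 cells per tile, 1-cell buffer) are made up for illustration and are not the script's defaults.

```python
import numpy as np

def get_indices(tile, N_TILES0, N_TILES1, N_TILES2):
    # Same flat-index-to-(i, j, k) formula as in scripts/gather_tiles.py.
    i = (tile // (N_TILES1 * N_TILES2)) % N_TILES0
    j = ((tile - N_TILES1 * N_TILES2 * i) // N_TILES2) % N_TILES1
    k = (tile - N_TILES2 * j - N_TILES1 * N_TILES2 * i) % N_TILES2
    return i, j, k

def add_density_tile(A, A_tile, corner_indices):
    # Same periodic accumulation as in scripts/gather_tiles.py.
    N = A.shape[0]
    Np = A_tile.shape[0]
    i0, j0, k0 = corner_indices
    i_idx = (np.arange(Np) + i0) % N
    j_idx = (np.arange(Np) + j0) % N
    k_idx = (np.arange(Np) + k0) % N
    A[np.ix_(i_idx, j_idx, k_idx)] += A_tile

# Toy setup (illustrative only): 2 tiles per dimension, 4 cells per tile, 1-cell buffer.
N_TILES, Np_tile, buffer = 2, 4, 1
Np = N_TILES * Np_tile
A = np.zeros((Np, Np, Np), dtype=np.float32)

for tile in range(N_TILES**3):
    i, j, k = get_indices(tile, N_TILES, N_TILES, N_TILES)
    corner = (i * Np_tile - buffer, j * Np_tile - buffer, k * Np_tile - buffer)
    A_tile = np.ones((Np_tile + 2 * buffer,) * 3, dtype=np.float32)
    add_density_tile(A, A_tile, corner)

# Each unit-filled tile adds (Np_tile + 2*buffer)**3 = 216 to the total, and the
# negative corner indices wrap around the periodic grid instead of going out of bounds.
print(A.sum())  # 8 * 216 = 1728.0
```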
scripts/scola_submit.py (Normal file, 347 additions)
@@ -0,0 +1,347 @@

    import tqdm
    import argparse
    import numpy as np
    import os
    import subprocess
    import time


    def create_scola_slurm_script(slurmfile, box):
        """Create a slurm script for sCOLA."""
        # Read the slurm file
        with open(slurmfile, "r") as f:
            slurm_script = f.read()

        # Replace the placeholders in the slurm script
        slurm_script = slurm_script.replace("BOX", box)

        # Write the modified slurm script to a new file
        with open(slurmfile + f".{box}", "w") as f:
            f.write(slurm_script)


    def submit_slurm_job(slurmfile):
        """Submit a slurm job using the sbatch command."""
        # Submit the job
        result = subprocess.run(["sbatch", slurmfile], capture_output=True, text=True)

        # Check if the submission was successful
        if result.returncode != 0:
            print(f"Error submitting job: {result.stderr}")
            return None

        # Get the job ID from the output
        job_id = result.stdout.split()[-1]

        return job_id


    def convert_time_to_seconds(time_str):
        """Convert a time string of the format D-HH:MM:SS or HH:MM:SS or MM:SS to seconds."""
        time_parts = time_str.split("-")
        if len(time_parts) == 2:
            days = int(time_parts[0])
            time_str = time_parts[1]
        else:
            days = 0

        time_parts = time_str.split(":")
        if len(time_parts) == 3:
            hours, minutes, seconds = map(int, time_parts)
        elif len(time_parts) == 2:
            hours, minutes = map(int, time_parts)
            seconds = 0
        else:
            raise ValueError("Invalid time format")

        total_seconds = days * 86400 + hours * 3600 + minutes * 60 + seconds
        return total_seconds


    def convert_seconds_to_time(seconds):
        """Convert seconds to a time string of the format D-HH:MM:SS."""
        seconds = int(seconds)
        if seconds < 0:
            return "N/A"
        days = seconds // 86400
        seconds %= 86400
        hours = seconds // 3600
        seconds %= 3600
        minutes = seconds // 60
        seconds %= 60

        if days > 0:
            return f"{days}-{hours:02}:{minutes:02}:{seconds:02}"
        if hours > 0:
            return f"{hours:02}:{minutes:02}:{seconds:02}"
        return f"{minutes:02}:{seconds:02}"


    def check_job_status(job_id):
        """
        Check the status of a job using the squeue command.
        Returns the job status and running time.
        """
        # Check the job status
        result = subprocess.run(["squeue", "-j", str(job_id)], capture_output=True, text=True)

        # Check if the job is still running
        if result.returncode == 1:
            return "X", -1

        if len(result.stdout.split("\n")[1].split()) == 0:
            return "X", -1

        status = result.stdout.split("\n")[1].split()[4]
        job_time = convert_time_to_seconds(result.stdout.split("\n")[1].split()[5])

        return status, job_time


    def get_job_id(jobname):
        """Get the job ID from the job name using the squeue command."""
        # Check the job status
        result = subprocess.run(["squeue", "-n", jobname], capture_output=True, text=True)

        # Check if the job is still running
        if result.returncode == 1:
            return None
        if len(result.stdout.split("\n")[1].split()) == 0:
            return None

        # Get the job ID
        job_id = result.stdout.split("\n")[1].split()[0]

        return job_id


    def check_previous_jobs(args, job_ids_array, box, resubmit_count, error_count):
        """
        Get the status of all previously submitted jobs.
        For each job, check if it is running, completed, or failed.
        If the job is failed, resubmit it.
        """
        job_status_categories = {'R': [], 'CP': [], 'PD': [], 'X': []}

        # Check the status of every previously submitted job
        for prev_box in tqdm.tqdm(range(1, box), desc="Checking jobs", unit="boxes", leave=False, position=1):
            # Check the job status
            status, job_time = check_job_status(job_ids_array[prev_box - 1])
            # Add the job status to the dictionary
            if status not in job_status_categories:
                job_status_categories[status] = []
            # If the status is 'X', check if the tile file was created
            if status == 'X':
                # Check if the tile file was created
                if os.path.exists(args.workdir + f"{args.tilefile}{prev_box}.h5"):
                    job_status_categories['CP'].append(prev_box)  # Classify as completed
                else:
                    # Resubmit the job
                    job_id = submit_slurm_job(args.slurmdir + args.slurmfile + "." + str(prev_box))
                    # Check if the job was submitted successfully
                    if job_id is None:
                        print(f"Error submitting job for box {box}")
                        error_count += 1
                    else:
                        job_ids_array[prev_box - 1] = int(job_id)
                        resubmit_count += 1
                    job_status_categories[status].append(prev_box)  # Classify as failed
                    # Sleep for a while before resubmitting the next job
                    time.sleep(args.sleep)
            # If the status is not 'X', record the job status
            else:
                job_status_categories[status].append(prev_box)

        return job_status_categories, resubmit_count, error_count


    def cap_number_of_jobs(job_status_categories, job_ids_array, max_jobs, sleep_time):
        """Cap the number of jobs to a maximum number."""
        discard_categories = ['CP', 'X']  # Completed and Failed
        # Check the number of running/pending jobs
        job_num = 0
        for status in job_status_categories.keys():
            if status not in discard_categories:
                job_num += len(job_status_categories[status])

        # We wait until the number of jobs is below the maximum
        while job_num > max_jobs:
            print(f"Number of open jobs: {job_num} > {max_jobs}. Waiting...")
            for status in job_status_categories.keys():
                if status not in discard_categories:
                    for box in job_status_categories[status]:
                        # Check the new job status
                        new_status, job_time = check_job_status(job_ids_array[box - 1])
                        time.sleep(sleep_time)
                        if new_status in discard_categories:
                            job_num -= 1
                            job_status_categories[new_status].append(box)  # WARNING: We do not reclassify 'X' jobs as 'CP'
                            job_status_categories[status].remove(box)

        return job_status_categories


    def print_summary_job_status(job_status_categories, box, resubmit_count, error_count):
        print("---------------------------------------------------")
        # Print summary of job statuses
        print(f"Job statuses after box {box}:")
        # Print a table with columns for each status and below the % of jobs in that status
        row0 = f"{'Status':<10}"
        for status in job_status_categories.keys():
            row0 += f"{status:>10}"
        print(row0)
        row1 = f"{'Percentage':<10}"
        for status in job_status_categories.keys():
            row1 += f"{len(job_status_categories[status])/box*100:>9.1f}%"
        print(row1)
        # Print the rate of resubmissions
        print(f"Resubmission rate: {resubmit_count/box*100:.2f}%")
        print(f"Error count: {error_count}")


    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description="Submit slurm jobs for sCOLA tiles.")

        parser.add_argument("-d", "--directory", type=str, default="./", help="Main directory where the output will be saved (if other dir and filenames are not specified).")
        parser.add_argument("-sd", "--slurmdir", type=str, default=None, help="Directory where the slurm scripts are saved (default is -d/slurm_scripts).")
        parser.add_argument("-wd", "--workdir", type=str, default=None, help="Directory where the tiles are saved (default is -d/work).")

        parser.add_argument("-sf", "--slurmfile", type=str, default="scola_sCOLA.sh", help="Slurm script file (located in slurmdir).")
        parser.add_argument("-tf", "--tilefile", type=str, default="scola_tile", help="Tile file name (located in workdir).")
        parser.add_argument("--jobname", type=str, default="sCOLA_", help="Job name for the slurm jobs.")

        parser.add_argument("-Nt", "--N_tiles", type=int, default=4, help="Number of tiles per dimension.")

        parser.add_argument("--sleep", type=float, default=1.0, help="Sleep time between each job submission (in s).")

        args = parser.parse_args()

        if args.slurmdir is None:
            args.slurmdir = args.directory + "slurm_scripts/"
        if args.workdir is None:
            args.workdir = args.directory + "work/"

        # Check that the slurm file exists
        if not os.path.exists(args.slurmdir + args.slurmfile):
            raise FileNotFoundError(f"Slurm file {args.slurmdir+args.slurmfile} does not exist.")
        # Check that the work directory exists
        if not os.path.exists(args.workdir):
            raise FileNotFoundError(f"Work directory {args.workdir} does not exist.")

        MAX_ERRORS = 10
        MAX_RESUBMIT = int(0.1 * args.N_tiles**3)  # 10% of the total number of jobs
        MAX_JOBS_AT_ONCE = int(3 * 128 / 8)  # 3 nodes with 128 cores each, 8 jobs per core
        CHECK_EVERY = 100

        error_count = 0
        resubmit_count = 0
        counter_for_checks = 0

        job_ids_array = np.zeros((args.N_tiles**3,), dtype=int)

        print("---------------------------------------------------")
        print("Starting job submission for sCOLA tiles with the following parameters:")
        print(f"Directory: {args.directory}")
        print(f"Slurm file: {args.slurmdir}{args.slurmfile}")
        print(f"Work directory: {args.workdir}")
        print(f"Number of tiles: {args.N_tiles**3} tiles")
        print(f"Sleep time: {args.sleep} s")
        print(f"Max errors: {MAX_ERRORS} errors")
        print(f"Max resubmits: {MAX_RESUBMIT} resubmits")
        print(f"Max jobs at once: {MAX_JOBS_AT_ONCE} jobs")
        print(f"Check every: {CHECK_EVERY} jobs")
        print("---------------------------------------------------")
        print(f"ETA: {convert_seconds_to_time(args.N_tiles**3*args.sleep*1.05)}")
        print("Starting job submission...")

        for box in tqdm.tqdm(range(1, args.N_tiles**3 + 1), desc="Submitting jobs", unit="boxes"):

            # Check if the tile file already exists
            if os.path.exists(args.workdir + f"{args.tilefile}{box}.h5"):
                continue

            # Check if the slurm job is already running
            job_id = get_job_id(f"{args.jobname}{box}")
            if job_id is not None:
                job_ids_array[box - 1] = int(job_id)
                time.sleep(args.sleep)
                continue

            # Create the slurm script for the box
            create_scola_slurm_script(args.slurmdir + args.slurmfile, str(box))

            # Submit the job
            job_id = submit_slurm_job(args.slurmdir + args.slurmfile + "." + str(box))

            # Check if the job was submitted successfully
            if job_id is None:
                print(f"Error submitting job for box {box}")
                error_count += 1
            else:
                job_ids_array[box - 1] = int(job_id)

            # Sleep for a while before submitting the next job
            time.sleep(args.sleep)

            counter_for_checks += 1

            # Check if the error count exceeds the maximum
            if error_count >= MAX_ERRORS:
                raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
            # Check if the resubmit count exceeds the maximum
            if resubmit_count >= MAX_RESUBMIT:
                raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")

            # Check the job status every CHECK_EVERY jobs
            if counter_for_checks >= CHECK_EVERY:
                counter_for_checks = 0

                job_status_categories, resubmit_count, error_count = check_previous_jobs(args, job_ids_array, box, resubmit_count, error_count)
                print_summary_job_status(job_status_categories, box, resubmit_count, error_count)
                job_status_categories = cap_number_of_jobs(job_status_categories, job_ids_array, MAX_JOBS_AT_ONCE, args.sleep)

        print("All jobs submitted. Now checking the status of the jobs.")

        job_status_categories, resubmit_count, error_count = check_previous_jobs(args, job_ids_array, args.N_tiles**3 + 1, resubmit_count, error_count)
        # Now wait for all jobs to finish
        while len(job_status_categories['CP']) < args.N_tiles**3:
            time.sleep(10 * args.sleep)
            # Check if the error count exceeds the maximum
            if error_count >= MAX_ERRORS:
                raise RuntimeError(f"Error count exceeded {MAX_ERRORS}. Stopping job submission.")
            # Check if the resubmit count exceeds the maximum
            if resubmit_count >= MAX_RESUBMIT:
                raise RuntimeError(f"Resubmit count exceeded {MAX_RESUBMIT}. Stopping job submission.")
            job_status_categories, resubmit_count, error_count = check_previous_jobs(args, job_ids_array, args.N_tiles**3 + 1, resubmit_count, error_count)
            print_summary_job_status(job_status_categories, args.N_tiles**3 + 1, resubmit_count, error_count)
            job_status_categories = cap_number_of_jobs(job_status_categories, job_ids_array, MAX_JOBS_AT_ONCE, args.sleep)

        print("All jobs finished.")
        # Remove the slurm scripts
        for box in range(1, args.N_tiles**3 + 1):
            os.remove(args.slurmdir + args.slurmfile + "." + str(box))
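To see the template expansion performed by `create_scola_slurm_script` without touching a scheduler, the sketch below writes a throwaway template and expands it for one box. The template contents and temporary paths are hypothetical; the only conventions taken from the script above are the literal `BOX` placeholder and the `<slurmfile>.<box>` output name that `submit_slurm_job` is later given.

```python
import os
import tempfile

def create_scola_slurm_script(slurmfile, box):
    # Same logic as in scripts/scola_submit.py: copy the template with BOX expanded.
    with open(slurmfile, "r") as f:
        slurm_script = f.read()
    slurm_script = slurm_script.replace("BOX", box)
    with open(slurmfile + f".{box}", "w") as f:
        f.write(slurm_script)

# Hypothetical template: the real scola_sCOLA.sh is site-specific; the script above
# only relies on the literal BOX placeholder appearing in the template text.
tmpdir = tempfile.mkdtemp()
template = os.path.join(tmpdir, "scola_sCOLA.sh")
with open(template, "w") as f:
    f.write("#!/bin/bash\n#SBATCH --job-name=sCOLA_BOX\n./scola --box=BOX\n")

create_scola_slurm_script(template, "42")

# Prints the template with every occurrence of BOX replaced by 42; this expanded
# per-box file is what would be handed to sbatch on a real cluster.
with open(template + ".42") as f:
    print(f.read())
```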