193 lines
No EOL
7.8 KiB
Python
193 lines
No EOL
7.8 KiB
Python
|
|
def main_ICs(parsed_args):
|
|
"""
|
|
Main function for the initial conditions generator.
|
|
"""
|
|
from args_main import parse_arguments_main
|
|
from parameters_card import parse_arguments_card_for_ICs
|
|
from low_level import print_starting_module, print_message, print_ending_module
|
|
from os.path import isfile
|
|
|
|
main_dict = parse_arguments_main(parsed_args)
|
|
card_dict = parse_arguments_card_for_ICs(parsed_args)
|
|
|
|
print_starting_module("ICs", verbose=parsed_args.verbose)
|
|
|
|
match main_dict["ICs_gen"]:
|
|
|
|
case "ext":
|
|
if not isfile(card_dict["ICs"]):
|
|
raise FileNotFoundError(f"External initial conditions file {card_dict["ICs"]} not found.")
|
|
if not card_dict["ICsMode"] == 2:
|
|
raise ValueError(f"ICs_gen is in ext mode, but ICsMode is not 2: ICsMode={card_dict["ICsMode"]}")
|
|
print_message(f"External initial conditions file {card_dict["ICs"]} found.", 1, "ICs", verbose=parsed_args.verbose)
|
|
|
|
case "sbmy":
|
|
if card_dict["ICsMode"] == 0:
|
|
print_message("ICsMode is 0 so the initial conditions will be generated during the sbmy run.", 1, "ICs", verbose=parsed_args.verbose)
|
|
elif card_dict["ICsMode"] == 1:
|
|
print_message("ICsMode is 1 so the initial conditions will be generated using the sbmy ICs generator.", 1, "ICs", verbose=parsed_args.verbose)
|
|
ICs_sbmy(parsed_args)
|
|
elif card_dict["ICsMode"] == 2:
|
|
raise ValueError("ICsMode is 2 but ICs_gen is sbmy (not ext).")
|
|
else:
|
|
raise ValueError(f"ICsMode {card_dict["ICsMode"]} not recognized.")
|
|
|
|
case "monofonic":
|
|
if card_dict["ICsMode"] == 1 or card_dict["ICsMode"] == 0:
|
|
raise ValueError("ICsMode is 1 or 0 but ICs_gen is monofonic.")
|
|
elif card_dict["ICsMode"] == 2:
|
|
print_message("ICsMode is 2 so the initial conditions will be generated using the monofonic ICs generator.", 1, "ICs", verbose=parsed_args.verbose)
|
|
ICs_monofonic(parsed_args)
|
|
else:
|
|
raise ValueError(f"ICsMode {card_dict["ICsMode"]} not recognized.")
|
|
|
|
case _:
|
|
raise ValueError(f"ICs_gen {main_dict["ICs_gen"]} not recognized.")
|
|
|
|
print_ending_module("ICs", verbose=parsed_args.verbose)
|
|
|
|
|
|
|
|
|
|
|
|
def ICs_monofonic(parsed_args):
|
|
from monofonic import main_monofonic
|
|
from os.path import isfile
|
|
from low_level import print_message
|
|
from parameters_monofonic import parse_arguments_monofonic
|
|
|
|
monofonic_dict = parse_arguments_monofonic(parsed_args)
|
|
|
|
if not isfile(monofonic_dict["output"]+"DM_delta.h5") or not isfile(monofonic_dict["output"]+"DM_phi2.h5") or not isfile(monofonic_dict["output"]+"DM_phi.h5") or parsed_args.force:
|
|
main_monofonic(parsed_args)
|
|
else:
|
|
print_message(f"Monofonic output files found in {monofonic_dict['output']}. Use -F to overwrite.", 1, "monofonic", verbose=parsed_args.verbose)
|
|
|
|
|
|
|
|
def ICs_sbmy(parsed_args):
|
|
from low_level import print_starting_module, print_message
|
|
from os.path import isfile
|
|
from parameters_card import parse_arguments_card_for_ICs
|
|
|
|
print_starting_module("sbmy IC", verbose=parsed_args.verbose)
|
|
|
|
card_dict = parse_arguments_card_for_ICs(parsed_args)
|
|
|
|
power_spectrum_file = card_dict["InputPowerSpectrum"]
|
|
|
|
if not isfile(power_spectrum_file) or parsed_args.force:
|
|
print_message(f"Power spectrum file {power_spectrum_file} does not exist or forced. Creating it", 1, "sbmy IC", verbose=parsed_args.verbose)
|
|
create_sbmy_power_spectrum_file(parsed_args, card_dict, power_spectrum_file)
|
|
print_message(f"Power spectrum file written to {power_spectrum_file}", 2, "sbmy IC", verbose=parsed_args.verbose)
|
|
else:
|
|
print_message(f"Power spectrum file {power_spectrum_file} exists.", 1, "sbmy IC", verbose=parsed_args.verbose)
|
|
|
|
white_noise_field_file = card_dict["InputWhiteNoise"]
|
|
|
|
if not isfile(white_noise_field_file) or parsed_args.force:
|
|
print_message(f"White noise field file {white_noise_field_file} does not exist or forced. Creating it", 1, "sbmy IC", verbose=parsed_args.verbose)
|
|
create_sbmy_white_noise_field(parsed_args, card_dict, white_noise_field_file)
|
|
print_message(f"White noise field file written to {white_noise_field_file}", 2, "sbmy IC", verbose=parsed_args.verbose)
|
|
else:
|
|
print_message(f"White noise field file {white_noise_field_file} exists.", 1, "sbmy IC", verbose=parsed_args.verbose)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_sbmy_power_spectrum_file(parsed_args, card_dict, power_spectrum_file):
|
|
from cosmo_params import parse_arguments_cosmo
|
|
from pysbmy.power import PowerSpectrum
|
|
|
|
cosmo_dict = parse_arguments_cosmo(parsed_args)
|
|
|
|
if parsed_args.verbose < 2:
|
|
from io import BytesIO
|
|
from low_level import stdout_redirector, stderr_redirector
|
|
f = BytesIO()
|
|
g = BytesIO()
|
|
with stdout_redirector(f):
|
|
with stderr_redirector(g):
|
|
Pk = PowerSpectrum(card_dict["L"],
|
|
card_dict["L"],
|
|
card_dict["L"],
|
|
card_dict["N_LPT_mesh"],
|
|
card_dict["N_LPT_mesh"],
|
|
card_dict["N_LPT_mesh"],
|
|
cosmo_dict,
|
|
)
|
|
Pk.write(power_spectrum_file)
|
|
g.close()
|
|
f.close()
|
|
|
|
else:
|
|
Pk = PowerSpectrum(card_dict["L"],
|
|
card_dict["L"],
|
|
card_dict["L"],
|
|
card_dict["N_LPT_mesh"],
|
|
card_dict["N_LPT_mesh"],
|
|
card_dict["N_LPT_mesh"],
|
|
cosmo_dict,
|
|
)
|
|
Pk.write(power_spectrum_file)
|
|
|
|
|
|
def create_sbmy_white_noise_field(parsed_args, card_dict, white_noise_field_file):
|
|
import numpy as np
|
|
from gc import collect
|
|
from low_level import print_message
|
|
from pysbmy.field import BaseField
|
|
|
|
print_message(f"Seed: {parsed_args.seed}", 3, "sbmy IC", verbose=parsed_args.verbose)
|
|
rng = np.random.default_rng(parsed_args.seed)
|
|
print_message(f"Numpy seed {str(rng.bit_generator.state)}", 3, "sbmy IC", verbose=parsed_args.verbose)
|
|
data = rng.standard_normal(size=card_dict["N_LPT_mesh"]**3)
|
|
wn = BaseField(card_dict["L"],
|
|
card_dict["L"],
|
|
card_dict["L"],
|
|
card_dict["corner"][0],
|
|
card_dict["corner"][1],
|
|
card_dict["corner"][2],
|
|
1,
|
|
card_dict["N_LPT_mesh"],
|
|
card_dict["N_LPT_mesh"],
|
|
card_dict["N_LPT_mesh"],
|
|
data)
|
|
del data
|
|
collect()
|
|
|
|
if parsed_args.verbose < 2:
|
|
from io import BytesIO
|
|
from low_level import stdout_redirector, stderr_redirector
|
|
f = BytesIO()
|
|
g = BytesIO()
|
|
with stdout_redirector(f):
|
|
with stderr_redirector(g):
|
|
wn.write(white_noise_field_file)
|
|
g.close()
|
|
f.close()
|
|
else:
|
|
wn.write(white_noise_field_file)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
from argparse import ArgumentParser
|
|
from args_main import register_arguments_main
|
|
from parameters_card import register_arguments_card_for_ICs
|
|
from cosmo_params import register_arguments_cosmo
|
|
from parameters_monofonic import register_arguments_monofonic
|
|
from slurm_submission import register_arguments_slurm
|
|
|
|
parser = ArgumentParser(description="Generate initial conditions for a Simbelmyne simulation.")
|
|
# TODO: reduce the volume of arguments
|
|
register_arguments_main(parser)
|
|
register_arguments_monofonic(parser)
|
|
register_arguments_slurm(parser)
|
|
register_arguments_card_for_ICs(parser)
|
|
register_arguments_cosmo(parser)
|
|
parsed_args = parser.parse_args()
|
|
|
|
main_ICs(parsed_args) |