from os.path import isfile from pysbmy import param_file import numpy as np from argparse import ArgumentParser def register_arguments_card(parser:ArgumentParser): """ Register the arguments for the parameter card. """ parser.add_argument("-pf","--paramfile", type=str, default=None, help="Parameter card file.") parser.add_argument("-Np","--N_particles", type=int, default=128, help="Number of particles per axis.") parser.add_argument("-Nlpt","--N_LPT_mesh", type=int, default=None, help="Number of mesh points per axis for the LPT mesh.") parser.add_argument("-Npm","--N_PM_mesh", type=int, default=None, help="Number of mesh points per axis for the PM mesh.") parser.add_argument("-L","--L", type=float, default=100.0, help="Size of the simulation box (in Mpc/h).") parser.add_argument("-corner","--corner", type=float, nargs=3, default=[0.0, 0.0, 0.0], help="Corner of the simulation box.") parser.add_argument("-zini","--RedshiftLPT", type=float, default=19.0, help="Redshift for the LPT evolution.") parser.add_argument("-zfinal","--RedshiftFCs", type=float, default=0.0, help="Redshift for the FCs evolution.") parser.add_argument("-Nt","--N_tiles", type=int, default=4, help="Number of tiles per dimension.") parser.add_argument("-Npb","--Np_buffer", type=int, default=16, help="Number of particles in the buffer.") parser.add_argument("-Nplptb","--Np_lpt_buffer", type=int, default=8, help="Number of particles in the LPT buffer.") parser.add_argument("--EvolutionMode", type=int, default=None, help="Evolution mode.") parser.add_argument("--ICsMode", type=int, default=None, help="Initial conditions mode.") parser.add_argument("--InputWhiteNoise", type=str, default=None, help="Input white noise file.") parser.add_argument("--ICs", type=str, default=None, help="Initial conditions file.") parser.add_argument("--InputPowerSpectrum", type=str, default=None, help="Input power spectrum file.") parser.add_argument("--WriteInitialConditions", type=bool, default=True, help="Write initial conditions.") parser.add_argument("--WriteLPTSnapshot", type=bool, default=False, help="Write LPT snapshot.") parser.add_argument("--OutputLPTSnapshot", type=str, default=None, help="Output LPT snapshot file.") parser.add_argument("--WriteLPTDensity", type=bool, default=False, help="Write LPT density.") parser.add_argument("--OutputLPTDensity", type=str, default=None, help="Output LPT density file.") parser.add_argument("--ModulePMCOLA", type=bool, default=None, help="Use PM/COLA module.") parser.add_argument("--TimeSteppingFileName", type=str, default=None, help="Time stepping file.") parser.add_argument("--WriteDensities", type=bool, default=False, help="Write densities.") parser.add_argument("--OutputDensitiesBase", type=str, default=None, help="Output densities base.") parser.add_argument("--WriteSnapshots", type=bool, default=False, help="Write snapshots.") parser.add_argument("--OutputSnapshotsBase", type=str, default=None, help="Output snapshots base.") parser.add_argument("--WriteFinalSnapshot", type=bool, default=False, help="Write final snapshot.") parser.add_argument("--OutputFinalSnapshot", type=str, default=None, help="Output final snapshot file.") parser.add_argument("--WriteFinalDensity", type=bool, default=True, help="Write final density.") parser.add_argument("--OutputFinalDensity", type=str, default=None, help="Output final density file.") parser.add_argument("--OutputTilesBase", type=str, default=None, help="Output tiles base.") parser.add_argument("--OutputLPTPotential1", type=str, default=None, help="Output LPT potential 1 file.") parser.add_argument("--OutputLPTPotential2", type=str, default=None, help="Output LPT potential 2 file.") parser.add_argument("-lc","--GenerateLightcone", type=bool, default=False, help="Generate lightcone.") parser.add_argument("-fcs","--OutputAlsoFCs", type=bool, default=True, help="Output also FCs.") parser.add_argument("-obs","--Observer", type=float, nargs=3, default=[0.0, 0.0, 0.0], help="Observer position.") parser.add_argument("--OutputFCsDensity", type=str, default=None, help="Output FCs density file.") parser.add_argument("--OutputFCsSnapshot", type=str, default=None, help="Output FCs snapshot file.") parser.add_argument("--OutputRngStateLPT", type=str, default=None, help="Output RNG state file.") def register_arguments_card_for_ICs(parser:ArgumentParser): parser.add_argument("-Np","--N_particles", type=int, default=128, help="Number of particles per axis.") parser.add_argument("-Nlpt","--N_LPT_mesh", type=int, default=None, help="Number of mesh points per axis for the LPT mesh.") parser.add_argument("-L","--L", type=float, default=100.0, help="Size of the simulation box (in Mpc/h).") parser.add_argument("-corner","--corner", type=float, nargs=3, default=[0.0, 0.0, 0.0], help="Corner of the simulation box.") parser.add_argument("--ICsMode", type=int, default=None, help="Initial conditions mode.") parser.add_argument("--ICs", type=str, default=None, help="Initial conditions file.") parser.add_argument("--InputWhiteNoise", type=str, default=None, help="Input white noise file.") parser.add_argument("--InputPowerSpectrum", type=str, default=None, help="Input power spectrum file.") def register_arguments_card_for_timestepping(parser:ArgumentParser): parser.add_argument("-zini","--RedshiftLPT", type=float, default=19.0, help="Redshift for the LPT evolution.") parser.add_argument("-zfinal","--RedshiftFCs", type=float, default=0.0, help="Redshift for the FCs evolution.") parser.add_argument("--TimeSteppingFileName", type=str, default=None, help="Time stepping file.") parser.add_argument("-lc","--GenerateLightcone", type=bool, default=False, help="Generate lightcone.") def parse_arguments_card(parsed_args): """ Parse the arguments for the parameter card. """ from args_main import parse_arguments_main from cosmo_params import parse_arguments_cosmo cosmo_dict=parse_arguments_cosmo(parsed_args) card_dict=dict( paramfile=parsed_args.paramfile, N_particles=parsed_args.N_particles, N_LPT_mesh=parsed_args.N_LPT_mesh, N_PM_mesh=parsed_args.N_PM_mesh, L=parsed_args.L, corner=parsed_args.corner, RedshiftLPT=parsed_args.RedshiftLPT, RedshiftFCs=parsed_args.RedshiftFCs, EvolutionMode=parsed_args.EvolutionMode, ICsMode=parsed_args.ICsMode, InputWhiteNoise=parsed_args.InputWhiteNoise, ICs=parsed_args.ICs, InputPowerSpectrum=parsed_args.InputPowerSpectrum, WriteInitialConditions=parsed_args.WriteInitialConditions, WriteLPTSnapshot=parsed_args.WriteLPTSnapshot, OutputLPTSnapshot=parsed_args.OutputLPTSnapshot, WriteLPTDensity=parsed_args.WriteLPTDensity, OutputLPTDensity=parsed_args.OutputLPTDensity, ModulePMCOLA=parsed_args.ModulePMCOLA, TimeSteppingFileName=parsed_args.TimeSteppingFileName, WriteDensities=parsed_args.WriteDensities, OutputDensitiesBase=parsed_args.OutputDensitiesBase, WriteSnapshots=parsed_args.WriteSnapshots, OutputSnapshotsBase=parsed_args.OutputSnapshotsBase, WriteFinalSnapshot=parsed_args.WriteFinalSnapshot, OutputFinalSnapshot=parsed_args.OutputFinalSnapshot, WriteFinalDensity=parsed_args.WriteFinalDensity, OutputFinalDensity=parsed_args.OutputFinalDensity, OutputTilesBase=parsed_args.OutputTilesBase, OutputLPTPotential1=parsed_args.OutputLPTPotential1, OutputLPTPotential2=parsed_args.OutputLPTPotential2, GenerateLightcone=parsed_args.GenerateLightcone, OutputAlsoFCs=parsed_args.OutputAlsoFCs, Observer=parsed_args.Observer, N_tiles=parsed_args.N_tiles, Np_buffer=parsed_args.Np_buffer, Np_lpt_buffer=parsed_args.Np_lpt_buffer, OutputFCsDensity=parsed_args.OutputFCsDensity, OutputFCsSnapshot=parsed_args.OutputFCsSnapshot, OutputRngStateLPT=parsed_args.OutputRngStateLPT, h=cosmo_dict["h"], Omega_m=cosmo_dict["Omega_m"], Omega_b=cosmo_dict["Omega_b"], Omega_q=cosmo_dict["Omega_q"], Omega_k=cosmo_dict["Omega_k"], n_s=cosmo_dict["n_s"], sigma8=cosmo_dict["sigma8"], w0=cosmo_dict["w_0"], wa=cosmo_dict["w_a"], ) ### Now for all parameters that are None (unspecified), we set them to the default value using the main parameters ## First we parse the main parameters main_dict = parse_arguments_main(parsed_args) ligthcone_prefix = "lightcone_" if card_dict["GenerateLightcone"] else "" ## Now we set the parameters that are None to the default values if card_dict["paramfile"] is None: card_dict["paramfile"] = main_dict["paramdir"]+"parameters_"+main_dict["simname"]+".sbmy" if card_dict["N_LPT_mesh"] is None: card_dict["N_LPT_mesh"] = card_dict["N_particles"] # Default is the same as the number of particles if card_dict["N_PM_mesh"] is None: card_dict["N_PM_mesh"] = card_dict["N_particles"]*2 # Default is twice the number of particles if card_dict["EvolutionMode"] is None: match main_dict["mode"]: case "ICs" | "LPT": card_dict["EvolutionMode"] = 0 case "PM": card_dict["EvolutionMode"] = 1 case "tCOLA" | "alltCOLA": card_dict["EvolutionMode"] = 2 case "sCOLA" | "pre_sCOLA" | "post_sCOLA" | "allsCOLA": card_dict["EvolutionMode"] = 3 case _: raise ValueError(f"Mode {main_dict['mode']} not recognized.") if card_dict["ICsMode"] is None: match main_dict["ICs_gen"]: case "ext" | "monofonic": card_dict["ICsMode"] = 2 case "sbmy": card_dict["ICsMode"] = 1 case _: raise ValueError(f"ICs generator {main_dict['ICs_gen']} not recognized.") if card_dict["InputWhiteNoise"] is None: card_dict["InputWhiteNoise"] = main_dict["workdir"]+"white_noise.h5" if card_dict["ICs"] is None: card_dict["ICs"] = main_dict["workdir"]+"initial_conditions_DM_delta.h5" if card_dict["InputPowerSpectrum"] is None: card_dict["InputPowerSpectrum"] = main_dict["workdir"]+"power_spectrum.h5" if card_dict["OutputLPTSnapshot"] is None: card_dict["OutputLPTSnapshot"] = main_dict["resultdir"]+"lpt_particles.gadget3" if card_dict["OutputLPTDensity"] is None: card_dict["OutputLPTDensity"] = main_dict["resultdir"]+"lpt_density.h5" if card_dict["ModulePMCOLA"] is None: match main_dict["mode"]: case "ICs" | "LPT": card_dict["ModulePMCOLA"] = 0 case "PM" | "tCOLA" | "sCOLA" | "pre_sCOLA" | "post_sCOLA" | "alltCOLA" | "allsCOLA": card_dict["ModulePMCOLA"] = 1 case _: raise ValueError(f"Mode {main_dict['mode']} not recognized.") if card_dict["TimeSteppingFileName"] is None: card_dict["TimeSteppingFileName"] = main_dict["paramdir"]+"time_stepping.h5" if card_dict["OutputDensitiesBase"] is None: card_dict["OutputDensitiesBase"] = main_dict["resultdir"]+"density_"+main_dict["simname"] if card_dict["OutputSnapshotsBase"] is None: card_dict["OutputSnapshotsBase"] = main_dict["resultdir"]+"particles_"+main_dict["simname"] if card_dict["OutputFinalSnapshot"] is None: card_dict["OutputFinalSnapshot"] = main_dict["resultdir"]+ligthcone_prefix+"final_particles_"+main_dict["simname"]+("_lc" if card_dict["GenerateLightcone"] else "")+".gadget3" if card_dict["OutputFinalDensity"] is None: card_dict["OutputFinalDensity"] = main_dict["resultdir"]+ligthcone_prefix+"final_density_"+main_dict["simname"]+("_lc" if card_dict["GenerateLightcone"] else "")+".h5" if card_dict["OutputTilesBase"] is None: card_dict["OutputTilesBase"] = main_dict["workdir"]+main_dict["simname"]+"_tile" if card_dict["OutputLPTPotential1"] is None: card_dict["OutputLPTPotential1"] = main_dict["workdir"]+"initial_conditions_DM_phi.h5" if card_dict["OutputLPTPotential2"] is None: card_dict["OutputLPTPotential2"] = main_dict["workdir"]+"initial_conditions_DM_phi2.h5" if card_dict["OutputFCsDensity"] is None: card_dict["OutputFCsDensity"] = main_dict["resultdir"]+"final_density_"+main_dict["simname"]+".h5" if card_dict["OutputFCsSnapshot"] is None: card_dict["OutputFCsSnapshot"] = main_dict["resultdir"]+"final_particles_"+main_dict["simname"]+".gadget3" if card_dict["OutputRngStateLPT"] is None: card_dict["OutputRngStateLPT"] = main_dict["workdir"]+"rng_state.h5" return card_dict def parse_arguments_card_for_ICs(parsed_args): """ Parse the arguments for the parameter card for ICs. """ from args_main import parse_arguments_main main_dict = parse_arguments_main(parsed_args) card_dict = dict( N_particles=parsed_args.N_particles, N_LPT_mesh=parsed_args.N_LPT_mesh, L=parsed_args.L, corner=parsed_args.corner, ICsMode=parsed_args.ICsMode, ICs=parsed_args.ICs, InputWhiteNoise=parsed_args.InputWhiteNoise, InputPowerSpectrum=parsed_args.InputPowerSpectrum, ) ## Now we set the parameters that are None to the default values if card_dict["N_LPT_mesh"] is None: card_dict["N_LPT_mesh"] = card_dict["N_particles"] # Default is the same as the number of particles if card_dict["InputWhiteNoise"] is None: card_dict["InputWhiteNoise"] = main_dict["workdir"]+"white_noise.h5" if card_dict["ICs"] is None: card_dict["ICs"] = main_dict["workdir"]+"initial_conditions_DM_delta.h5" if card_dict["InputPowerSpectrum"] is None: card_dict["InputPowerSpectrum"] = main_dict["workdir"]+"power_spectrum.h5" if card_dict["ICsMode"] is None: match main_dict["ICs_gen"]: case "ext" | "monofonic": card_dict["ICsMode"] = 2 case "sbmy": card_dict["ICsMode"] = 1 case _: raise ValueError(f"ICs generator {main_dict['ICs_gen']} not recognized.") return card_dict def parse_arguments_card_for_timestepping(parsed_args): """ Parse the arguments for the parameter card for timestepping. """ from args_main import parse_arguments_main main_dict = parse_arguments_main(parsed_args) card_dict = dict( RedshiftLPT=parsed_args.RedshiftLPT, RedshiftFCs=parsed_args.RedshiftFCs, TimeSteppingFileName=parsed_args.TimeSteppingFileName, GenerateLightcone=parsed_args.GenerateLightcone, ) ## Now we set the parameters that are None to the default values if card_dict["TimeSteppingFileName"] is None: card_dict["TimeSteppingFileName"] = main_dict["paramdir"]+"time_stepping.h5" return card_dict def create_parameter_card_dict( ## General parameters N_particles:int = 128, N_LPT_mesh:int = 128, N_PM_mesh:int = 128, L:float = 100.0, corner:tuple[float]|list[float]|np.ndarray[float] = (0.0, 0.0, 0.0), RedshiftLPT:float = 19.0, RedshiftFCs:float = 0.0, EvolutionMode:int = 2, ## Initial conditions ICsMode:int = 1, InputWhiteNoise:str = 'white_noise.h5', ICs:str = 'initial_conditions_delta.h5', InputPowerSpectrum:str = 'power_spectrum.h5', WriteInitialConditions:bool = True, WriteLPTSnapshot:bool = False, OutputLPTSnapshot:str = 'lpt_particles.gadget3', WriteLPTDensity:bool = False, OutputLPTDensity:str = 'lpt_density.h5', OutputRngStateLPT:str = 'rng_state.h5', ## PM/COLA module ModulePMCOLA:bool = True, TimeSteppingFileName:str = 'time_stepping.h5', WriteDensities:bool = False, OutputDensitiesBase:str = 'density', WriteSnapshots:bool = False, OutputSnapshotsBase:str = 'particles', WriteFinalSnapshot:bool = False, OutputFinalSnapshot:str = 'final_particles.gadget3', WriteFinalDensity:bool = True, OutputFinalDensity:str = 'final_density.h5', ## sCOLA parameters N_tiles:int = 2, Np_buffer:int = 16, Np_lpt_buffer:int = 8, OutputTilesBase:str = 'sCOLA_tile', OutputLPTPotential1:str = 'initial_conditions_phi.h5', OutputLPTPotential2:str = 'initial_conditions_phi2.h5', ## Lightcone GenerateLightcone:bool = False, OutputAlsoFCs:bool = True, Observer:tuple[float]|list[float]|np.ndarray[float] = (0.0, 0.0, 0.0), OutputFCsDensity:str = 'fcs_density.h5', OutputFCsSnapshot:str = 'fcs_particles.gadget3', ## Cosmological parameters h:float = 0.6732, Omega_m:float = 0.302, Omega_b:float = 0.049, Omega_q:float = 0.6842, Omega_k:float = 0.0, n_s:float = 0.968, sigma8:float = 0.815, w0:float = -1.0, wa:float = 0.0, ## Other parameters **kwargs ): parameter_card_dict = dict( OutputRngStateLPT=OutputRngStateLPT, Particles=N_particles, Mesh=N_LPT_mesh, BoxSize=L, corner0=corner[0], corner1=corner[1], corner2=corner[2], ICsMode=ICsMode, InputInitialConditions=ICs, InputWhiteNoise=InputWhiteNoise, InputPowerSpectrum=InputPowerSpectrum, WriteInitialConditions=int(WriteInitialConditions), OutputInitialConditions=ICs, OutputLPTSnapshot=OutputLPTSnapshot, OutputLPTDensity=OutputLPTDensity, RedshiftLPT=RedshiftLPT, WriteLPTSnapshot=int(WriteLPTSnapshot), WriteLPTDensity=int(WriteLPTDensity), ModulePMCOLA=int(ModulePMCOLA), EvolutionMode=EvolutionMode, ParticleMesh=N_PM_mesh, TimeStepDistribution=TimeSteppingFileName, RedshiftFCs=RedshiftFCs, WriteDensities=int(WriteDensities), OutputDensitiesBase=OutputDensitiesBase, WriteSnapshots=int(WriteSnapshots), OutputSnapshotsBase=OutputSnapshotsBase, WriteFinalSnapshot=WriteFinalSnapshot, OutputFinalSnapshot=OutputFinalSnapshot, WriteFinalDensity=WriteFinalDensity, OutputFinalDensity=OutputFinalDensity, GenerateLightcone=int(GenerateLightcone), OutputAlsoFCs=int(OutputAlsoFCs), Observer0=Observer[0], Observer1=Observer[1], Observer2=Observer[2], OutputFCsDensity=OutputFCsDensity, OutputFCsSnapshot=OutputFCsSnapshot, NumberOfTilesPerDimension=N_tiles, NumberOfParticlesInBuffer=Np_buffer, NumberOfParticlesInLPTBuffer=Np_lpt_buffer, OutputLPTPotential1=OutputLPTPotential1, OutputLPTPotential2=OutputLPTPotential2, OutputTilesBase=OutputTilesBase, h=h, Omega_m=Omega_m, Omega_b=Omega_b, n_s=n_s, sigma8=sigma8, Omega_q=Omega_q, Omega_k=Omega_k, w0_fld=w0, wa_fld=wa, **kwargs ) return parameter_card_dict def write_parameter_card(parameter_card_dict:dict, filename:str, verbose:int = 1): S=param_file(**parameter_card_dict) if verbose < 2: from io import BytesIO from low_level import stdout_redirector, stderr_redirector f = BytesIO() g = BytesIO() with stdout_redirector(f): with stderr_redirector(g): S.write(filename) g.close() f.close() else: S.write(filename) def main_parameter_card(parsed_args): """ Main function for the parameter card. """ from low_level import print_message, print_starting_module, print_ending_module print_starting_module("card", verbose=parsed_args.verbose) print_message("Parsing arguments for the parameter card.", 1, "card", verbose=parsed_args.verbose) card_dict = parse_arguments_card(parsed_args) paramfile = card_dict["paramfile"] parameter_card_dict = create_parameter_card_dict(**card_dict) if isfile(paramfile) and not parsed_args.force: print_message(f"Parameter card {paramfile} exists. Use --force to overwrite.", 1, "card", verbose=parsed_args.verbose) return card_dict write_parameter_card(parameter_card_dict, paramfile, verbose=parsed_args.verbose) print_message(f"Parameter card written to {paramfile}", 2, "card", verbose=parsed_args.verbose) print_ending_module("card", verbose=parsed_args.verbose) return card_dict if __name__ == "__main__": from args_main import register_arguments_main from cosmo_params import register_arguments_cosmo parser = ArgumentParser(description="Create sbmy parameter card.") register_arguments_main(parser) register_arguments_card(parser) register_arguments_cosmo(parser) parsed_args = parser.parse_args() main_parameter_card(parsed_args)