initial commit

This commit is contained in:
Tristan Hoellinger 2025-05-17 17:26:25 +02:00
commit ae8bacd6a6
13 changed files with 3215 additions and 0 deletions

64
.gitignore vendored Normal file
View file

@ -0,0 +1,64 @@
###################
# Compiled source #
###################
*.com
*.class
*.dll
*.exe
*.o
*.so
*/build/*
############
# Packages #
############
# Better to unpack these files and commit the raw source
# git has its own built in compression methods
*.7z
*.dmg
*.gz
*.iso
*.jar
*.rar
*.tar
*.zip
######################
# Logs and databases #
######################
*.log
*.sql
*.sqlite
######################
# OS generated files #
######################
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
#####################
# IDE-related files #
#####################
.vscode*
__pycache__*
*.code-workspace
*.egg-info*
########
# Runs #
########
data/
examples/
*.npy
*.h5
*.fits
############
# Notebook #
############
*.ipynb_checkpoints*

3
README.md Normal file
View file

@ -0,0 +1,3 @@
# ParticleParticle ParticleMesh in Simbelmynë
Personal package for debugging and non-regression tests whilst implementing the P3M algorithm in Simbelmynë.

1046
notebooks/0_nonreg.ipynb Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

51
src/setup.py Normal file
View file

@ -0,0 +1,51 @@
#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2024 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------
__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2025"
__license__ = "GPLv3"
"""
Setup script for the WIP-P3M package.
Personal package for debugging, testing and non-regression whilst
implementing P3M gravity in Simbelmynë.
"""
from setuptools import setup, find_packages
import os
# Read the long description from README.md
here = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(here, "../README.md"), encoding="utf-8") as f:
long_description = f.read()
setup(
name="wip3m",
version="0.1.0",
author="Tristan Hoellinger",
author_email="tristan.hoellinger@iap.fr",
description="Personal package for debugging, testing and non-" \
"regression whilst implementing P3M gravity in Simbelmynë",
long_description=long_description,
long_description_content_type="text/markdown",
packages=find_packages(),
include_package_data=True,
package_data={"wip3m": ["preamble.tex"]},
classifiers=[
"Development Status :: 3 - Alpha",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering :: Astronomy",
],
python_requires=">=3.7",
license="GPLv3",
keywords="cosmology large-scale-structure N-body",
)

21
src/wip3m/__init__.py Normal file
View file

@ -0,0 +1,21 @@
#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2025 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------
__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2025"
__license__ = "GPLv3"
"""
WIP-P3M package.
Personal package for debugging, testing and non-regression whilst
implementing the P3M algorithm in Simbelmynë.
"""
from .params import *

229
src/wip3m/logger.py Normal file
View file

@ -0,0 +1,229 @@
#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2025 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------
__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2025"
__license__ = "GPLv3"
"""
Logger routines for this project.
The printing routines and colours are adapted from the Simbelmynë
cosmological solver (https://simbelmyne.readthedocs.io/en/latest), for
enhanced logging compatibility with Simbelmynë.
"""
import sys
from typing import cast
import logging
from wip3m import DEFAULT_VERBOSE_LEVEL
# Global variables for fonts
FONT_BOLDRED = "\033[1;31m"
FONT_BOLDGREEN = "\033[1;32m"
FONT_BOLDYELLOW = "\033[1;33m"
FONT_BOLDCYAN = "\033[1;36m"
FONT_BOLDGREY = "\033[1;37m"
FONT_LIGHTPURPLE = "\033[38;5;147m"
FONT_NORMAL = "\033[00m"
# Global variables for verbosity
ERROR_VERBOSITY = 0
INFO_VERBOSITY = 1
WARNING_VERBOSITY = 2
DIAGNOSTIC_VERBOSITY = 3
DEBUG_VERBOSITY = 4
DIAGNOSTIC_LEVEL = 15
logging.addLevelName(DIAGNOSTIC_LEVEL, "DIAGNOSTIC")
G__ind__ = 0 # Global variable for logger indentation
def INDENT():
"""Indents the current level of outputs."""
global G__ind__
G__ind__ += 1
return G__ind__
def UNINDENT():
"""Unindents the current level of outputs."""
global G__ind__
G__ind__ -= 1
return G__ind__
def PrintLeftType(message_type, FONT_COLOR):
"""Prints the type of output to screen.
Parameters
----------
message_type (string) : type of message
FONT_COLOR (string) : font color for this type of message
"""
from time import localtime, strftime
sys.stdout.write(
"["
+ strftime("%H:%M:%S", localtime())
+ "|"
+ FONT_COLOR
+ message_type
+ FONT_NORMAL
+ "]"
)
sys.stdout.write("==" * G__ind__)
sys.stdout.write("|")
def PrintInfo(message):
"""Prints an information to screen.
Parameters
----------
message (string) : message
"""
if DEFAULT_VERBOSE_LEVEL >= INFO_VERBOSITY:
PrintLeftType("INFO ", FONT_BOLDCYAN)
sys.stdout.write("{}\n".format(message))
sys.stdout.flush()
def PrintDiagnostic(verbosity, message):
"""Prints a diagnostic to screen.
Parameters
----------
verbosity (int) : verbosity of the message
message (string) : message
"""
if DEFAULT_VERBOSE_LEVEL >= verbosity:
PrintLeftType("DIAGNOSTIC", FONT_BOLDGREY)
sys.stdout.write("{}\n".format(message))
def PrintWarning(message):
"""Prints a warning to screen.
Parameters
----------
message (string) : message
"""
if DEFAULT_VERBOSE_LEVEL >= WARNING_VERBOSITY:
PrintLeftType("WARNING ", FONT_BOLDYELLOW)
sys.stdout.write(FONT_BOLDYELLOW + message + FONT_NORMAL + "\n")
def PrintError(message):
"""Prints an error to screen.
Parameters
----------
message (string) : message
"""
if DEFAULT_VERBOSE_LEVEL >= ERROR_VERBOSITY:
PrintLeftType("ERROR ", FONT_BOLDRED)
sys.stdout.write(FONT_BOLDRED + message + FONT_NORMAL + "\n")
class CustomLoggerHandler(logging.Handler):
"""
Custom logging handler to redirect Python logger messages to custom
print functions, with support for verbosity levels in debug
messages.
"""
def emit(self, record):
"""
Emit a log record.
"""
try:
log_message = self.format(record)
log_level = record.levelno
if log_level >= logging.ERROR:
PrintError(log_message)
elif log_level >= logging.WARNING:
PrintWarning(log_message)
elif log_level >= logging.INFO:
PrintInfo(log_message)
elif log_level == DIAGNOSTIC_LEVEL:
# Retrieve verbosity level from the record
verbosity = getattr(record, "verbosity", DIAGNOSTIC_VERBOSITY)
PrintDiagnostic(verbosity=verbosity, message=log_message)
elif log_level >= logging.DEBUG:
PrintDiagnostic(verbosity=DEBUG_VERBOSITY, message=log_message)
else:
# Fallback for other levels
PrintInfo(log_message)
except Exception:
self.handleError(record)
class CustomLogger(logging.Logger):
"""
Custom logger class supporting custom verbosity levels in diagnostic
messages.
"""
def diagnostic(self, msg, *args, verbosity=DIAGNOSTIC_VERBOSITY, **kwargs) -> None:
"""
Log a message with DIAGNOSTIC level.
Parameters
----------
msg : str
The message to log.
verbosity : int, optional
The verbosity level required to log this message.
"""
if self.isEnabledFor(DIAGNOSTIC_LEVEL):
# Pass verbosity as part of the extra argument
extra = kwargs.get("extra", {})
extra["verbosity"] = verbosity
kwargs["extra"] = extra
self.log(DIAGNOSTIC_LEVEL, msg, *args, **kwargs)
logging.setLoggerClass(CustomLogger)
def getCustomLogger(name: str) -> CustomLogger:
"""
Get as CustomLogger instance to use the custom printing routines.
Parameters
----------
name : str
The name of the logger.
Returns
-------
logger : logging.Logger
The custom logger instance.
"""
logging.setLoggerClass(CustomLogger)
logger = cast(CustomLogger, logging.getLogger(name)) # cast for type checkers and PyLance
logger.setLevel(logging.DEBUG) # Set the desired base logging level
handler = CustomLoggerHandler()
formatter = logging.Formatter(f"{FONT_LIGHTPURPLE}(%(name)s){FONT_NORMAL} %(message)s")
handler.setFormatter(formatter)
# Attach the handler to the logger if not already present
if not logger.handlers:
logger.addHandler(handler)
return logger

127
src/wip3m/low_level.py Normal file
View file

@ -0,0 +1,127 @@
#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2025 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------
__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2025"
__license__ = "GPLv3"
"""
Tools to deal with low-level operations.
"""
from contextlib import contextmanager
import platform
import ctypes
import io
import os, sys
import tempfile
libc = ctypes.CDLL(None)
if platform.system() == "Darwin": # macOS
stdout_symbol = "__stdoutp"
stderr_symbol = "__stderrp"
else:
stdout_symbol = "stdout"
stderr_symbol = "stderr"
c_stdout = ctypes.c_void_p.in_dll(libc, stdout_symbol)
c_stderr = ctypes.c_void_p.in_dll(libc, stderr_symbol)
# Taken from:
# https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
@contextmanager
def stdout_redirector(stream):
"""A context manager that redirects stdout to the given stream. For
instance, this can be used to redirect C code stdout to None (to
avoid cluttering the log, e.g., when using tqdm).
Args:
stream (file-like object): The stream to which stdout should be
redirected.
Example:
>>> with stdout_redirector(stream):
>>> print("Hello world!") # Will be printed to stream
>>> # instead of stdout.
"""
# The original fd stdout points to. Usually 1 on POSIX systems.
original_stdout_fd = sys.stdout.fileno()
def _redirect_stdout(to_fd):
"""Redirect stdout to the given file descriptor."""
# Flush the C-level buffer stdout
libc.fflush(c_stdout)
# Flush and close sys.stdout - also closes the file descriptor (fd)
sys.stdout.close()
# Make original_stdout_fd point to the same file as to_fd
os.dup2(to_fd, original_stdout_fd)
# Create a new sys.stdout that points to the redirected fd
sys.stdout = io.TextIOWrapper(os.fdopen(original_stdout_fd, "wb"))
# Save a copy of the original stdout fd in saved_stdout_fd
saved_stdout_fd = os.dup(original_stdout_fd)
try:
# Create a temporary file and redirect stdout to it
tfile = tempfile.TemporaryFile(mode="w+b")
_redirect_stdout(tfile.fileno())
# Yield to caller, then redirect stdout back to the saved fd
yield
_redirect_stdout(saved_stdout_fd)
# Copy contents of temporary file to the given stream
tfile.flush()
tfile.seek(0, io.SEEK_SET)
stream.write(tfile.read())
finally:
tfile.close()
os.close(saved_stdout_fd)
# Adapted from:
# https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
@contextmanager
def stderr_redirector(stream):
"""A context manager that redirects stderr to the given stream.
For instance, this can be used to redirect C code stderr to None (to
avoid cluttering the log, e.g., when using tqdm).
Use with caution.
Args:
stream (file-like object): The stream to which stdout should be
redirected.
"""
# The original fd stdout points to. Usually 1 on POSIX systems.
original_stderr_fd = sys.stderr.fileno()
def _redirect_stderr(to_fd):
"""Redirect stderr to the given file descriptor."""
# Flush the C-level buffer stderr
libc.fflush(c_stderr)
# Flush and close sys.stderr - also closes the file descriptor (fd)
sys.stderr.close()
# Make original_stderr_fd point to the same file as to_fd
os.dup2(to_fd, original_stderr_fd)
# Create a new sys.stderr that points to the redirected fd
sys.stderr = io.TextIOWrapper(os.fdopen(original_stderr_fd, "wb"))
# Save a copy of the original stdout fd in saved_stdout_fd
saved_stderr_fd = os.dup(original_stderr_fd)
try:
# Create a temporary file and redirect stdout to it
tfile = tempfile.TemporaryFile(mode="w+b")
_redirect_stderr(tfile.fileno())
# Yield to caller, then redirect stdout back to the saved fd
yield
_redirect_stderr(saved_stderr_fd)
# Copy contents of temporary file to the given stream
tfile.flush()
tfile.seek(0, io.SEEK_SET)
stream.write(tfile.read())
finally:
tfile.close()
os.close(saved_stderr_fd)

109
src/wip3m/params.py Normal file
View file

@ -0,0 +1,109 @@
#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2025 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------
__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2025"
__license__ = "GPLv3"
"""Global parameters for this project."""
import os
from pathlib import Path
import numpy as np
WHICH_SPECTRUM = "eh" # available options are "eh" and "class"
# Load global paths from environment variables
ROOT_PATH = os.getenv("WIP3M_ROOT_PATH")
if ROOT_PATH is None:
raise EnvironmentError("Please set the 'WIP3M_ROOT_PATH' environment variable.")
OUTPUT_PATH = os.getenv("WIP3M_OUTPUT_PATH")
if OUTPUT_PATH is None:
raise EnvironmentError("Please set the 'WIP3M_OUTPUT_PATH' environment variable.")
# Default verbose level
# 0: errors only, 1: info, 2: warnings+, 3: all diagnostics, 4+: debug
DEFAULT_VERBOSE_LEVEL = 2
# Baseline seeds for reproducibility
BASELINE_SEEDNORM = 100050599
BASELINE_SEEDNOISE = 200050599
BASELINE_SEEDPHASE = 300050599
# Fiducial cosmological parameters
h_planck = 0.6766
Omega_b_planck = 0.02242 / h_planck**2
Omega_m_planck = 0.3111
nS_planck = 0.9665
sigma8_planck = 0.8102
TAU_REIO = 0.066
planck_mean = np.array([h_planck, Omega_b_planck, Omega_m_planck, nS_planck, sigma8_planck])
planck_cov = np.diag(np.array([0.0042, 0.00030, 0.0056, 0.0038, 0.0060]) ** 2)
# Mapping from cosmological parameter names to corresponding indices
cosmo_params_names = [r"$h$", r"$\Omega_b$", r"$\Omega_m$", r"$n_S$", r"$\sigma_8$"]
cosmo_params_name_to_idx = {"h": 0, "Omega_b": 1, "Omega_m": 2, "n_s": 3, "sigma8": 4}
# Minimum k value used in the normalisation of the summaries
MIN_K_NORMALISATION = 4e-2
params_planck_kmax_missing = {
"h": h_planck,
"Omega_r": 0.0,
"Omega_q": 1.0 - Omega_m_planck,
"Omega_b": Omega_b_planck,
"Omega_m": Omega_m_planck,
"m_ncdm": 0.0,
"Omega_k": 0.0,
"tau_reio": TAU_REIO,
"n_s": nS_planck,
"sigma8": sigma8_planck,
"w0_fld": -1.0,
"wa_fld": 0.0,
"WhichSpectrum": WHICH_SPECTRUM,
}
def z2a(z):
return 1.0 / (1 + z)
def cosmo_small_to_full_dict(cosmo_min):
"""Return a full cosmology dictionary from a minimal one.
Parameters
----------
cosmo_min : dict
Minimal cosmology dictionary.
Returns
-------
cosmo_full : dict
Full cosmology dictionary.
"""
cosmo_full = {
"h": cosmo_min["h"],
"Omega_r": 0.0,
"Omega_q": 1 - cosmo_min["Omega_m"],
"Omega_b": cosmo_min["Omega_b"],
"Omega_m": cosmo_min["Omega_m"],
"m_ncdm": 0.0,
"Omega_k": 0.0,
"tau_reio": TAU_REIO,
"n_s": cosmo_min["n_s"],
"sigma8": cosmo_min["sigma8"],
"w0_fld": -1.0,
"wa_fld": 0.0,
"k_max": cosmo_min["k_max"],
"WhichSpectrum": cosmo_min["WhichSpectrum"],
}
return cosmo_full

80
src/wip3m/plot_params.py Normal file
View file

@ -0,0 +1,80 @@
#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2025 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------
__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2025"
__license__ = "GPLv3"
"""
Plotting parameters and custom colormaps for the WIP-P3M package.
This module provides custom Matplotlib settings, formatter classes, and
colormaps used for visualising results in the SelfiSys project.
"""
# Global font sizes
GLOBAL_FS = 18
GLOBAL_FS_LARGE = 20
GLOBAL_FS_XLARGE = 22
GLOBAL_FS_SMALL = 16
GLOBAL_FS_TINY = 14
COLOUR_LIST = ["C{}".format(i) for i in range(10)]
def reset_plotting():
import matplotlib as mpl
mpl.rcParams.update(mpl.rcParamsDefault)
def setup_plotting():
"""
Configure Matplotlib plotting settings for consistent appearance.
"""
import matplotlib.pyplot as plt
import importlib.resources
with importlib.resources.open_text("wip3m", "preamble.tex") as f:
preamble = f.read()
# Dictionary with rcParams settings
rcparams = {
"font.family": "serif",
"font.size": GLOBAL_FS, # Base font size
"axes.titlesize": GLOBAL_FS_XLARGE,
"axes.labelsize": GLOBAL_FS_LARGE,
"axes.linewidth": 1.0,
"xtick.labelsize": GLOBAL_FS_SMALL,
"ytick.labelsize": GLOBAL_FS_SMALL,
"xtick.major.width": 1.2,
"ytick.major.width": 1.2,
"xtick.minor.width": 1.0,
"ytick.minor.width": 1.0,
"xtick.direction": "in",
"ytick.direction": "in",
"xtick.major.pad": 5,
"xtick.minor.pad": 5,
"ytick.major.pad": 5,
"ytick.minor.pad": 5,
"legend.fontsize": GLOBAL_FS_SMALL,
"legend.title_fontsize": GLOBAL_FS_LARGE,
"figure.titlesize": GLOBAL_FS_XLARGE,
"figure.dpi": 300,
"grid.color": "gray",
"grid.linestyle": "dotted",
"grid.linewidth": 0.6,
"lines.linewidth": 2,
"lines.markersize": 8,
"text.usetex": True,
"text.latex.preamble": preamble,
}
# Update rcParams
plt.rcParams.update(rcparams)

368
src/wip3m/plot_utils.py Normal file
View file

@ -0,0 +1,368 @@
#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2025 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------
__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2025"
__license__ = "GPLv3"
"""Plotting utilities for the WIP-P3M package."""
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.colors import TwoSlopeNorm
from mpl_toolkits.axes_grid1 import make_axes_locatable
import cmocean.cm as cm
from wip3m.plot_params import *
# Configure global plotting settings
setup_plotting()
fs = GLOBAL_FS_SMALL
fs_titles = GLOBAL_FS_LARGE
cols = COLOUR_LIST
cmap = cm.thermal
def plotly_3d(field, size=128, L=None, colormap="RdYlBu", limits="max"):
"""
Create an interactive 3D plot of volume slices using Plotly.
Parameters
----------
field : array-like
3D data field to visualise.
size : int, optional
Size of the field along one dimension. Default is 128.
L : float, optional
Physical size of the field in Mpc/h. Used for axis labels only.
colormap : str, optional
Colour map for visualisation. Default is 'RdYlBu'.
limits : str, optional
Colour scale limits ('max', 'truncate', or 'default'). Default
is 'max'.
Returns
-------
go.Figure
Plotly figure object.
"""
import numpy as np
import plotly.graph_objects as go
volume = field.T
rows, cols = volume[0].shape
# Define colour scale limits
if limits == "max":
maxcol = np.max(np.abs(volume))
mincol = -maxcol
elif limits == "truncate":
maxcol = min(np.max(-volume), np.max(volume))
mincol = -maxcol
else:
maxcol = np.max(volume)
mincol = np.min(volume)
midcol = np.mean(volume)
# Generate frames for the animation
nb_frames = size
frames = [
go.Frame(
data=go.Surface(
z=(size - k) * np.ones((rows, cols)),
surfacecolor=np.flipud(volume[cols - 1 - k]),
cmin=mincol,
cmid=midcol,
cmax=maxcol,
),
name=str(k), # Frames must be named for proper animation
)
for k in range(nb_frames)
]
# Initial plot configuration
fig = go.Figure(
frames=frames,
data=go.Surface(
z=size * np.ones((rows, cols)),
surfacecolor=np.flipud(volume[cols // 2]),
colorscale=colormap,
cmin=mincol,
cmid=midcol,
cmax=maxcol,
colorbar=dict(thickness=20, ticklen=4),
),
)
def frame_args(duration):
"""Helper function to set animation frame arguments."""
return {
"frame": {"duration": duration},
"mode": "immediate",
"fromcurrent": True,
"transition": {"duration": duration, "easing": "linear"},
}
# Add animation slider
sliders = [
{
"pad": {"b": 10, "t": 60},
"len": 0.9,
"x": 0.1,
"y": 0,
"steps": [
{
"args": [[f.name], frame_args(0)],
"label": str(k),
"method": "animate",
}
for k, f in enumerate(fig.frames)
],
}
]
# Configure layout with or without physical size
layout_config = dict(
title="Slices in density field",
width=600,
height=600,
scene=dict(
zaxis=dict(range=[0, size - 1], autorange=False),
xaxis_title="x [Mpc/h]",
yaxis_title="y [Mpc/h]",
zaxis_title="z [Mpc/h]",
aspectratio=dict(x=1, y=1, z=1),
),
updatemenus=[
{
"buttons": [
{"args": [None, frame_args(50)], "label": "▶", "method": "animate"},
{"args": [[None], frame_args(0)], "label": "◼", "method": "animate"},
],
"direction": "left",
"pad": {"r": 10, "t": 70},
"type": "buttons",
"x": 0.1,
"y": 0,
}
],
sliders=sliders,
)
if L is not None:
layout_config["scene"]["xaxis"] = dict(
ticktext=[0, L / 2, L],
tickvals=[0, size / 2, size],
title="x [Mpc/h]",
)
layout_config["scene"]["yaxis"] = dict(
ticktext=[0, L / 2, L],
tickvals=[0, size / 2, size],
title="y [Mpc/h]",
)
layout_config["scene"]["zaxis"]["ticktext"] = [0, L / 2, L]
layout_config["scene"]["zaxis"]["tickvals"] = [0, size / 2, size]
fig.update_layout(**layout_config)
return fig
def matplotlib_to_plotly(cmap, n=255):
"""Convert a matplotlib colormap to a Plotly colorscale."""
colorscale = []
for i in range(n):
norm = i / (n - 1)
r, g, b, _ = cmap(norm)
colorscale.append([norm, f'rgb({int(r*255)}, {int(g*255)}, {int(b*255)})'])
return colorscale
thermal_plotly = matplotlib_to_plotly(cm.thermal)
def clear_large_plot(fig):
"""
Clear a figure to free up memory.
Parameters
----------
fig : matplotlib.figure.Figure
The figure to clear.
"""
from IPython.display import clear_output
del fig
clear_output()
def load_force_diagnostic(filename):
"""
Load force diagnostic data.
Parameters
----------
filename : str
Path to the CSV file written by `run_force_subtraction_test`
from p3m.c.
Returns
-------
r : ndarray
Bin-centred distances.
fmag : ndarray
Total force magnitudes |f_after - f_before|.
data : recarray
Full structured array with all columns.
"""
data = np.genfromtxt(filename, delimiter=",", names=True)
# Append the magnitude of force difference to the data
fmag = np.sqrt(data["fx"]**2 + data["fy"]**2 + data["fz"]**2)
data = np.lib.recfunctions.append_fields(data, "fmag", fmag, usemask=False, asrecarray=True)
return data["distance"], data["fmag"], data
def plot_force_distance(r, fmag, f_max=1e-1, a=None, title=None, save_path=None):
"""
Plot total force magnitude vs distance and a theoretical 1/ profile.
Parameters
----------
r : array_like
Distance values (bin centers or actual distances).
fmag : array_like
Corresponding force magnitudes.
f_max : float, optional
Maximum force magnitude for including points in the plot (default: 1e-1).
a : float or None, optional
If provided, the theoretical prefactor for the 1/ profile.
title : str, optional
Plot title.
save_path : str or None, optional
If provided, path to save the figure as a PDF.
"""
from scipy.optimize import curve_fit
# Select points to include in the plot
mask = fmag < f_max
rs = r[mask]
fmag_select = fmag[mask]
fig, ax = plt.subplots()
ax.scatter(rs, fmag_select, s=10, alpha=0.8, label="Particle/hole pairs", color="tab:blue")
# If a is provided, plot the theoretical curve
if a is not None:
def inverse_square(r, a):
return a / r**2
r = np.linspace(np.min(rs), np.max(rs), 300)
f_th = inverse_square(r, a)
ax.plot(r, f_th, "r-", label=r"$\propto 1/r^2$")
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlabel(r"Distance $r$ [Mpc/$h$]")
ax.set_ylabel(r"Force magnitude [code units]")
if title is not None:
ax.set_title(title)
ax.legend()
ax.grid(True, which="both", ls=":", lw=0.5)
plt.tight_layout()
if save_path:
plt.savefig(save_path)
print(f"Figure saved to: {save_path}")
plt.show()
def plot_force_distance_comparison(rr, ff, ll, L, Np, Npm, ss=None, a=None, title=None, save_path=None):
"""
Plot force magnitude vs distance.
Parameters
----------
rr : list of array_like
List of distance arrays.
ff : list of array_like
List of force magnitude arrays corresponding to each distance array.
ll : list of str
List of labels for each dataset.
ss : list of str or None, optional
List of symbols for each dataset (default: None).
L : float
Physical size of the field in Mpc/h.
Np : int
Number of particles per dimension.
Npm : int
Number of PM cells per dimension.
a : float or None, optional
If provided, the theoretical prefactor for the 1/ profile.
title : str, optional
Plot title.
save_path : str or None, optional
If provided, path to save the figure as a PDF.
"""
fig, ax = plt.subplots()
colours = plt.rcParams["axes.prop_cycle"].by_key()["color"]
handles1 = []
if ss is None:
ss = ["o"] * len(rr)
for i, (r, f, label, symbol) in enumerate(zip(rr, ff, ll, ss)):
scatter = ax.scatter(r, f, alpha=0.8, s=20, label=label,
color=colours[i % len(colours)], marker=symbol)
handles1.append(scatter)
loc1="lower right"
# Theoretical curve
theory_line = None
if a is not None and len(rr) > 0:
loc1="upper right"
def inverse_square(r, a):
return a / r**2
r_min = min(np.min(r) for r in rr)
r_max = max(np.max(r) for r in rr)
r_plot = np.linspace(r_min, r_max, 300)
f_th = inverse_square(r_plot, a)
theory_line, = ax.plot(r_plot, f_th, "k-", label=r"theory $\propto 1/r^2$")
handles1.append(theory_line)
# Characteristic vertical reference scales
nyquist = 2 * L / Npm
epsilon = 0.03 * L / Np
xs = 1.25 * L / Npm
xr = 4.5 * xs
line1 = ax.axvline(x=nyquist, color="black", linestyle="-", lw=1, label="Nyquist")
line2 = ax.axvline(x=2*epsilon, color="gray", linestyle="--", lw=2, label=r"Particle length $2\epsilon$")
line3 = ax.axvline(x=xs, color="gray", linestyle="-.", lw=2, label=r"Split scale $x_s$")
line4 = ax.axvline(x=xr, color="gray", linestyle=":", lw=2, label=r"Short-range reach $x_r$")
print(f"Nyquist: {nyquist:.2f} Mpc/h")
print(f"Particle length: {2*epsilon:.2f} Mpc/h")
print(f"Split scale: {xs:.2f} Mpc/h")
print(f"Short-range reach: {xr:.2f} Mpc/h")
# Set log-log axes and labels
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlabel(r"Distance $r$ [Mpc/$h$]")
ax.set_ylabel(r"Force magnitude [code units]")
if title is not None:
ax.set_title(title)
# Legend for data
legend1 = ax.legend(handles=handles1, loc=loc1, frameon=True, fontsize=GLOBAL_FS_TINY)
ax.add_artist(legend1)
# Legend for vertical lines
handles2 = [line1, line2, line3, line4]
fig.subplots_adjust(bottom=0.3)
legend2 = fig.legend(handles=handles2, loc='lower center', ncol=2, frameon=False)
ax.grid(True, which="both", ls=":", lw=0.5)
if save_path:
plt.savefig(save_path)
print(f"Figure saved to: {save_path}")
plt.show()

14
src/wip3m/preamble.tex Normal file
View file

@ -0,0 +1,14 @@
% ----------------------------------------------------------------------
% Copyright (C) 2025 Tristan Hoellinger
% Distributed under the GNU General Public License v3.0 (GPLv3).
% See the LICENSE file in the root directory for details.
% SPDX-License-Identifier: GPL-3.0-or-later
% ----------------------------------------------------------------------
% Author: Tristan Hoellinger
% Version: 0.1.0
% Date: 2025
% License: GPLv3
\usepackage{amsmath,amsfonts,amssymb,amsthm}
\usepackage{upgreek}

424
src/wip3m/tools.py Normal file
View file

@ -0,0 +1,424 @@
#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2025 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------
__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2025"
__license__ = "GPLv3"
"""
Various tools for the WIP3M project.
"""
import os
import gc
import numpy as np
from wip3m.logger import getCustomLogger
logger = getCustomLogger(__name__)
def none_bool_str(value):
"""Convert a string to None, bool, or str.
Parameters
----------
value : str
String to convert.
Returns
-------
None, bool, or str
Converted value.
"""
if value == "None" or value == None:
return None
elif value == "True":
return True
elif value == "False":
return False
return value
def get_k_max(L, size):
"""
Compute the maximum wavenumber for a given box size.
Parameters
----------
L : float
Size of the box in Mpc/h.
size : int
Number of grid cells along each dimension.
Returns
-------
float
Maximum wavenumber in h/Mpc.
"""
from numpy import pi, sqrt
# If kx = ky = kz = k_Nyquist, then |k| = sqrt(3) * k_Nyquist
return int(1e3 * sqrt(3) * pi * size / L + 1) * 1e-3
def joinstrs(list_of_strs):
"""Join a list of strings into a single string.
Parameters
----------
list_of_strs : list of str
List of strings to join.
Returns
-------
str
Concatenated string.
"""
return "".join([str(x) for x in list_of_strs if x is not None])
def generate_sim_params(params_dict, ICs, workdir, outdir, file_ext=None, force=False):
"""Write the parameter file.
Parameters
----------
params_dict : dict
Dictionary containing the parameters for the simulation.
ICs : str
Path to the initial conditions.
workdir : str
Directory where to store the parameter file.
outdir : str
Directory where to store the simulation outputs.
file_ext : str, optional
Prefix for the output files.
Returns
-------
sbmy_path : str
Path to the parameter file generated.
"""
from os.path import isfile
from pysbmy import param_file
from pysbmy.timestepping import StandardTimeStepping
method = params_dict["method"]
path = workdir + file_ext + "_" if file_ext else workdir
simpath = outdir + file_ext + "_" if file_ext else outdir
sbmy_path = joinstrs([path, "example_", method, ".sbmy"])
# Parameters shared by all methods for this run
Particles = params_dict["Np"]
Mesh = params_dict["N"]
BoxSize = params_dict["L"]
corner0 = params_dict["corner0"]
corner1 = params_dict["corner1"]
corner2 = params_dict["corner2"]
h = params_dict["h"]
Omega_m = params_dict["Omega_m"]
Omega_b = params_dict["Omega_b"]
n_s = params_dict["n_s"]
sigma8 = params_dict["sigma8"]
# Generate the time-stepping distribution
if method in ["pm", "cola", "spm", "p3m"]:
ts_filename = path + "ts_" + method + ".h5"
logger.info("Time-stepping distribution file: %s", ts_filename)
if not isfile(ts_filename) or force:
TimeStepDistribution = params_dict["TimeStepDistribution"]
ai = params_dict["ai"]
af = params_dict["af"]
nsteps = params_dict["nsteps"]
snapshots = np.full((nsteps), False)
TS = StandardTimeStepping(ai, af, snapshots, TimeStepDistribution)
TS.write(ts_filename)
else:
logger.info("Time-stepping distribution file already exists: %s", ts_filename)
StandardTimeStepping.read(ts_filename).plot(savepath=path + "ts_" + method + ".png")
elif method in ["lpt"]:
pass
else:
raise ValueError("Method not supported: {}".format(method))
# Write the parameter file
logger.info("Generating parameter file...")
if params_dict["method"] == "lpt":
S = param_file(
OutputRngStateLPT=simpath + "dummy.rng",
# Basic setup:
Particles=Particles,
Mesh=Mesh,
BoxSize=BoxSize,
corner0=corner0,
corner1=corner1,
corner2=corner2,
# Initial conditions:
ICsMode=params_dict["ICsMode"],
InputWhiteNoise=params_dict["InputWhiteNoise"], # None or str
WriteInitialConditions=1,
OutputInitialConditions=ICs,
# Power spectrum:
InputPowerSpectrum=params_dict["InputPowerSpectrum"],
# Final conditions for LPT:
OutputLPTSnapshot=simpath + "lpt_particles.gadget3",
OutputLPTDensity=simpath + "lpt_density.h5",
#############################
## Cosmological parameters ##
#############################
h=h,
Omega_m=Omega_m,
Omega_b=Omega_b,
n_s=n_s,
sigma8=sigma8,
Omega_q=1.0 - Omega_m,
Omega_k=0.0,
w0_fld=-1.0,
wa_fld=0.0,
)
if params_dict["method"] == "pm":
S = param_file(
# Basic setup:
Particles=Particles,
Mesh=Mesh,
BoxSize=BoxSize,
corner0=corner0,
corner1=corner1,
corner2=corner2,
# Initial conditions:
ICsMode=2,
InputInitialConditions=ICs,
# Final conditions for LPT:
RedshiftLPT=params_dict["RedshiftLPT"],
WriteLPTSnapshot=0,
WriteLPTDensity=0,
####################
## Module PM/COLA ##
####################
ModulePMCOLA=1,
EvolutionMode=1,
ParticleMesh=params_dict["Npm"],
TimeStepDistribution=ts_filename,
# Final snapshot:
RedshiftFCs=params_dict["RedshiftFCs"],
WriteFinalSnapshot=1,
OutputFinalSnapshot=simpath + "pm_snapshot.gadget3",
WriteFinalDensity=1,
OutputFinalDensity=simpath + "final_density_pm.h5",
RunForceDiagnostic=params_dict["RunForceDiagnostic"],
nPairsForceDiagnostic=params_dict["nPairsForceDiagnostic"],
nBinsForceDiagnostic=params_dict["nBinsForceDiagnostic"],
maxTrialsForceDiagnostic=params_dict["maxTrialsForceDiagnostic"],
OutputForceDiagnostic=params_dict["OutputForceDiagnostic"],
#############################
## Cosmological parameters ##
#############################
h=h,
Omega_m=Omega_m,
Omega_b=Omega_b,
n_s=n_s,
sigma8=sigma8,
Omega_q=1.0 - Omega_m,
Omega_k=0.0,
w0_fld=-1.0,
wa_fld=0.0,
)
if params_dict["method"] == "cola":
S = param_file(
# Basic setup:
Particles=Particles,
Mesh=Mesh,
BoxSize=BoxSize,
corner0=corner0,
corner1=corner1,
corner2=corner2,
# Initial conditions:
ICsMode=2,
InputInitialConditions=ICs,
# Final conditions for LPT:
RedshiftLPT=params_dict["RedshiftLPT"],
WriteLPTSnapshot=0,
WriteLPTDensity=0,
####################
## Module PM/COLA ##
####################
ModulePMCOLA=1,
EvolutionMode=2,
ParticleMesh=params_dict["Npm"],
TimeStepDistribution=ts_filename,
# Final snapshot:
RedshiftFCs=params_dict["RedshiftFCs"],
WriteFinalSnapshot=1,
OutputFinalSnapshot=simpath + "cola_snapshot.gadget3",
WriteFinalDensity=1,
OutputFinalDensity=simpath + "final_density_cola.h5",
RunForceDiagnostic=params_dict["RunForceDiagnostic"],
nPairsForceDiagnostic=params_dict["nPairsForceDiagnostic"],
nBinsForceDiagnostic=params_dict["nBinsForceDiagnostic"],
maxTrialsForceDiagnostic=params_dict["maxTrialsForceDiagnostic"],
OutputForceDiagnostic=params_dict["OutputForceDiagnostic"],
#############################
## Cosmological parameters ##
#############################
h=h,
Omega_m=Omega_m,
Omega_b=Omega_b,
n_s=n_s,
sigma8=sigma8,
Omega_q=1.0 - Omega_m,
Omega_k=0.0,
w0_fld=-1.0,
wa_fld=0.0,
)
elif params_dict["method"] == "p3m" or params_dict["method"] == "spm":
S = param_file(
# Basic setup:
Particles=Particles,
Mesh=Mesh,
BoxSize=BoxSize,
corner0=corner0,
corner1=corner1,
corner2=corner2,
# Initial conditions:
ICsMode=2,
InputInitialConditions=ICs,
# Final conditions for LPT:
RedshiftLPT=params_dict["RedshiftLPT"],
WriteLPTSnapshot=0,
WriteLPTDensity=0,
####################
## Module PM/COLA ##
####################
ModulePMCOLA=1,
EvolutionMode=params_dict["EvolutionMode"],
ParticleMesh=params_dict["Npm"],
TimeStepDistribution=ts_filename,
# Final snapshot:
RedshiftFCs=params_dict["RedshiftFCs"],
WriteFinalSnapshot=1,
OutputFinalSnapshot=simpath + "{}_snapshot.gadget3".format(method),
WriteFinalDensity=1,
OutputFinalDensity=simpath + "final_density_{}.h5".format(method),
n_Tiles=params_dict["n_Tiles"],
RunForceDiagnostic=params_dict["RunForceDiagnostic"],
nPairsForceDiagnostic=params_dict["nPairsForceDiagnostic"],
nBinsForceDiagnostic=params_dict["nBinsForceDiagnostic"],
maxTrialsForceDiagnostic=params_dict["maxTrialsForceDiagnostic"],
OutputForceDiagnostic=params_dict["OutputForceDiagnostic"],
#############################
## Cosmological parameters ##
#############################
h=h,
Omega_m=Omega_m,
Omega_b=Omega_b,
n_s=n_s,
sigma8=sigma8,
Omega_q=1.0 - Omega_m,
Omega_k=0.0,
w0_fld=-1.0,
wa_fld=0.0,
)
if not isfile(sbmy_path) or force:
S.write(sbmy_path)
logger.info("Parameter file written to %s", sbmy_path)
else:
logger.info("Parameter file already exists at %s", sbmy_path)
return sbmy_path
def read_field(*args):
"""
Read a field from a file.
Parameters
----------
args : tuple
Arguments to pass to the read_field function from pysbmy.
Returns
-------
field : str
The field read from the file.
"""
from io import BytesIO
from wip3m.low_level import stdout_redirector
from pysbmy.field import read_field as _read_field
with BytesIO() as f:
with stdout_redirector(f):
return _read_field(*args)
def generate_white_noise_Field(
L,
size,
corner,
seedphase,
fname_whitenoise,
seedname_whitenoise,
force_phase=False,
):
"""
Generate a white noise realisation in physical space and write it to
disk.
Parameters
----------
L : float
Size of the simulation box (in Mpc/h).
size : int
Number of grid points along each axis.
corner : float
Position of the corner (in Mpc/h).
seedphase : int or list of int
User-provided seed to generate the initial white noise.
fname_whitenoise : str
File path to write the white noise realisation.
seedname_whitenoise : str
File path to write the seed state of the RNG.
force_phase : bool, optional
If True, forces regeneration of the random phases. Default is
False.
Raises
------
OSError
If file writing fails or directory paths are invalid.
RuntimeError
For unexpected issues.
"""
if not os.path.exists(fname_whitenoise) or force_phase:
from pysbmy.field import BaseField
try:
logger.debug("Generating white noise for L=%.2f, size=%d", L, size)
rng = np.random.default_rng(seedphase)
logger.debug("Saving RNG state to %s", seedname_whitenoise)
np.save(seedname_whitenoise, rng.bit_generator.state)
with open(seedname_whitenoise + ".txt", "w") as f:
f.write(str(rng.bit_generator.state))
data = rng.standard_normal(size=size**3)
wn = BaseField(L, L, L, corner, corner, corner, 1, size, size, size, data)
del data
wn.write(fname_whitenoise)
logger.debug("White noise field written to %s", fname_whitenoise)
del wn
except OSError as e:
logger.error("Writing white noise failed at '%s': %s", fname_whitenoise, str(e))
raise
except Exception as e:
logger.critical("Unexpected error in generate_white_noise_Field: %s", str(e))
raise RuntimeError("generate_white_noise_Field failed.") from e
finally:
gc.collect()