Joint kNN-CDF calculation (#36)

* Add joint kNN CDF

* add jointKNN calculation

* change sub script

* Update readme

* update sub

* Small changes

* comments

* update nb

* Update submission script
This commit is contained in:
Richard Stiskalek 2023-04-01 11:16:11 +01:00 committed by GitHub
parent cb67e326c4
commit 522ee709c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 4569 additions and 57 deletions

View file

@ -7,9 +7,10 @@
## Project Clustering ## Project Clustering
- [ ] Add uncertainty to the kNN-CDF autocorrelation. - [ ] Add uncertainty to the kNN-CDF autocorrelation?
- [ ] Add the joint kNN-CDF calculation. - [ ] Add kNN-CDF differences.
- [ ] Make kNN-CDF more memory friendly if generating many randoms. - [x] Add the joint kNN-CDF calculation.
- [x] Make kNN-CDF more memory friendly if generating many randoms.
## Project Environmental Dependence ## Project Environmental Dependence

View file

@ -124,6 +124,62 @@ class kNN_CDF:
cdf[cdf > 0.5] = 1 - cdf[cdf > 0.5] cdf[cdf > 0.5] = 1 - cdf[cdf > 0.5]
return cdf return cdf
@staticmethod
def clipped_cdf(cdf):
"""
Clip the CDF, setting values where the CDF is either 0 or after the
first occurence of 1 to `numpy.nan`.
Parameters
----------
cdf : 2- or 3-dimensional array
CDF to be clipped.
Returns
-------
clipped_cdf : 2- or 3-dimensional array
The clipped CDF.
"""
cdf = numpy.copy(cdf)
if cdf.ndim == 2:
cdf = cdf.reshape(1, *cdf.shape)
nknns, nneighbours, __ = cdf.shape
for i in range(nknns):
for k in range(nneighbours):
ns = numpy.where(cdf[i, k, :] == 1.)[0]
if ns.size > 1:
cdf[i, k, ns[1]:] = numpy.nan
cdf[cdf == 0] = numpy.nan
cdf = cdf[0, ...] if nknns == 1 else cdf # Reshape if necessary
return cdf
@staticmethod
def joint_to_corr(cdf0, cdf1, joint_cdf):
"""
Calculate the correlation function from the joint kNN-CDFs.
Parameters
----------
cdf0 : 2-dimensional array
CDF evaluated at `rs` of the first kNN.
cdf1 : 2-dimensional array
CDF evaluated at `rs` of the second kNN.
joint_cdf : 2-dimensional array
Joint CDF evaluated at `rs`.
Returns
-------
corr : 2-dimensional array
Correlation function evaluated at `rs`.
"""
assert cdf0.ndim == cdf1.ndim == joint_cdf.ndim == 2
corr = numpy.zeros_like(joint_cdf)
for k in range(joint_cdf.shape[0]):
corr[k, :] = joint_cdf[k, :] - cdf0[k, :] * cdf1[k, :]
return corr
def brute_cdf(self, knn, nneighbours, Rmax, nsamples, rmin, rmax, neval, def brute_cdf(self, knn, nneighbours, Rmax, nsamples, rmin, rmax, neval,
random_state=42, dtype=numpy.float32): random_state=42, dtype=numpy.float32):
""" """
@ -173,9 +229,102 @@ class kNN_CDF:
cdf = numpy.asanyarray(cdf) cdf = numpy.asanyarray(cdf)
return rs, cdf return rs, cdf
def joint(self, knn0, knn1, nneighbours, Rmax, nsamples, rmin, rmax,
          neval, batch_size=None, random_state=42,
          dtype=numpy.float32):
    """
    Calculate the joint CDF for two kNNs of CSiBORG halo catalogues.

    Parameters
    ----------
    knn0 : `sklearn.neighbors.NearestNeighbors` instance
        kNN of the first CSiBORG halo catalogue.
    knn1 : `sklearn.neighbors.NearestNeighbors` instance
        kNN of the second CSiBORG halo catalogue.
    nneighbours : int
        Maximum number of neighbours to use for the kNN-CDF calculation.
    Rmax : float
        Maximum radius of the sphere in which to sample random points for
        the knn-CDF calculation. This should match the CSiBORG catalogues.
    nsamples : int
        Number of random points to sample for the knn-CDF calculation.
    rmin : float
        Minimum distance to evaluate the CDF.
    rmax : float
        Maximum distance to evaluate the CDF.
    neval : int
        Number of points to evaluate the CDF.
    batch_size : int, optional
        Number of random points to sample in each batch. By default equal
        to `nsamples`, however recommended to be smaller to avoid
        requesting too much memory.
    random_state : int, optional
        Random state for the random number generator.
    dtype : numpy dtype, optional
        Calculation data type. By default `numpy.float32`.

    Returns
    -------
    rs : 1-dimensional array
        Distances at which the CDF is evaluated.
    cdf0 : 2-dimensional array
        CDF evaluated at `rs` of the first kNN.
    cdf1 : 2-dimensional array
        CDF evaluated at `rs` of the second kNN.
    joint_cdf : 2-dimensional array
        Joint CDF evaluated at `rs`.
    """
    batch_size = nsamples if batch_size is None else batch_size
    assert nsamples >= batch_size
    # NOTE: any remainder of `nsamples` not divisible by `batch_size` is
    # silently dropped; only `nbatches * batch_size` points are sampled.
    nbatches = nsamples // batch_size

    bins = numpy.logspace(numpy.log10(rmin), numpy.log10(rmax), neval)
    joint_cdf = numpy.zeros((nneighbours, neval - 1), dtype=dtype)
    cdf0 = numpy.zeros_like(joint_cdf)
    cdf1 = numpy.zeros_like(joint_cdf)

    for j in range(nbatches):
        # Offset the random state so each batch draws fresh points.
        rand = self.rvs_in_sphere(batch_size, Rmax,
                                  random_state=random_state + j)
        dist0, __ = knn0.kneighbors(rand, nneighbours)
        dist1, __ = knn1.kneighbors(rand, nneighbours)

        for k in range(nneighbours):
            # A point has its k-th neighbour within `r` in BOTH
            # catalogues iff the larger of the two distances is < `r`.
            maxdist = numpy.maximum(dist0[:, k], dist1[:, k])
            # Joint CDF counts
            _counts, __, __ = binned_statistic(
                maxdist, maxdist, bins=bins, statistic="count",
                range=(rmin, rmax))
            joint_cdf[k, :] += _counts
            # First catalogue's CDF counts
            _counts, __, __ = binned_statistic(
                dist0[:, k], dist0[:, k], bins=bins, statistic="count",
                range=(rmin, rmax))
            cdf0[k, :] += _counts
            # Second catalogue's CDF counts
            _counts, __, __ = binned_statistic(
                dist1[:, k], dist1[:, k], bins=bins, statistic="count",
                range=(rmin, rmax))
            cdf1[k, :] += _counts

    # Cumulative sums turn binned counts into unnormalised CDFs; then
    # normalise each neighbour's CDF by its final value.
    joint_cdf = numpy.cumsum(joint_cdf, axis=-1)
    cdf0 = numpy.cumsum(cdf0, axis=-1)
    cdf1 = numpy.cumsum(cdf1, axis=-1)
    for k in range(nneighbours):
        joint_cdf[k, :] /= joint_cdf[k, -1]
        cdf0[k, :] /= cdf0[k, -1]
        cdf1[k, :] /= cdf1[k, -1]

    rs = (bins[1:] + bins[:-1]) / 2  # Bin centers
    return rs, cdf0, cdf1, joint_cdf
def __call__(self, *knns, nneighbours, Rmax, nsamples, rmin, rmax, neval, def __call__(self, *knns, nneighbours, Rmax, nsamples, rmin, rmax, neval,
batch_size=None, verbose=True, random_state=42, batch_size=None, verbose=True, random_state=42,
left_nan=True, right_nan=True, dtype=numpy.float32): dtype=numpy.float32):
""" """
Calculate the CDF for a set of kNNs of CSiBORG halo catalogues. Calculate the CDF for a set of kNNs of CSiBORG halo catalogues.
@ -204,12 +353,6 @@ class kNN_CDF:
Verbosity flag. Verbosity flag.
random_state : int, optional random_state : int, optional
Random state for the random number generator. Random state for the random number generator.
left_nan : bool, optional
Whether to set values where the CDF is 0 to `numpy.nan`. By
default `True`.
right_nan : bool, optional
Whether to set values where the CDF is 1 to `numpy.nan` after its
first occurence to 1. By default `True`.
dtype : numpy dtype, optional dtype : numpy dtype, optional
Calculation data type. By default `numpy.float32`. Calculation data type. By default `numpy.float32`.
@ -222,38 +365,28 @@ class kNN_CDF:
""" """
batch_size = nsamples if batch_size is None else batch_size batch_size = nsamples if batch_size is None else batch_size
assert nsamples >= batch_size assert nsamples >= batch_size
nbatches = nsamples // batch_size # Number of batches nbatches = nsamples // batch_size
# Preallocate the bins and the CDF array # Preallocate the bins and the CDF array
bins = numpy.logspace(numpy.log10(rmin), numpy.log10(rmax), neval) bins = numpy.logspace(numpy.log10(rmin), numpy.log10(rmax), neval)
cdfs = numpy.zeros((len(knns), nneighbours, neval - 1), dtype=dtype) cdfs = numpy.zeros((len(knns), nneighbours, neval - 1), dtype=dtype)
for i, knn in enumerate(tqdm(knns) if verbose else knns): for i, knn in enumerate(tqdm(knns) if verbose else knns):
# Loop over batches. This is to avoid generating large mocks
# requiring a lot of memory. Add counts to the CDF array
for j in range(nbatches): for j in range(nbatches):
rand = self.rvs_in_sphere(batch_size, Rmax, rand = self.rvs_in_sphere(batch_size, Rmax,
random_state=random_state + j) random_state=random_state + j)
dist, __ = knn.kneighbors(rand, nneighbours) dist, __ = knn.kneighbors(rand, nneighbours)
for k in range(nneighbours): # Count for each neighbour for k in range(nneighbours): # Count for each neighbour
_counts, __, __ = binned_statistic( _counts, __, __ = binned_statistic(
dist[:, k], dist[:, k], bins=bins, statistic="count", dist[:, k], dist[:, k], bins=bins, statistic="count",
range=(rmin, rmax)) range=(rmin, rmax))
cdfs[i, k, :] += _counts cdfs[i, k, :] += _counts
rs = (bins[1:] + bins[:-1]) / 2 # Bin centers
cdfs = numpy.cumsum(cdfs, axis=-1) # Cumulative sum, i.e. the CDF cdfs = numpy.cumsum(cdfs, axis=-1) # Cumulative sum, i.e. the CDF
for i in range(len(knns)): for i in range(len(knns)):
for k in range(nneighbours): for k in range(nneighbours):
cdfs[i, k, :] /= cdfs[i, k, -1] cdfs[i, k, :] /= cdfs[i, k, -1]
# Set to NaN values after the first point where the CDF is 1
if right_nan:
ns = numpy.where(cdfs[i, k, :] == 1.)[0]
if ns.size > 1:
cdfs[i, k, ns[1]:] = numpy.nan
# Set to NaN values where the CDF is 0
if left_nan:
cdfs[cdfs == 0] = numpy.nan
rs = (bins[1:] + bins[:-1]) / 2 # Bin centers
cdfs = cdfs[0, ...] if len(knns) == 1 else cdfs cdfs = cdfs[0, ...] if len(knns) == 1 else cdfs
return rs, cdfs return rs, cdfs

File diff suppressed because one or more lines are too long

View file

@ -17,6 +17,7 @@ from os.path import join
from argparse import ArgumentParser from argparse import ArgumentParser
from copy import deepcopy from copy import deepcopy
from datetime import datetime from datetime import datetime
from itertools import combinations
from mpi4py import MPI from mpi4py import MPI
from TaskmasterMPI import master_process, worker_process from TaskmasterMPI import master_process, worker_process
from sklearn.neighbors import NearestNeighbors from sklearn.neighbors import NearestNeighbors
@ -59,7 +60,8 @@ ics = [7444, 7468, 7492, 7516, 7540, 7564, 7588, 7612, 7636, 7660, 7684,
9556, 9580, 9604, 9628, 9652, 9676, 9700, 9724, 9748, 9772, 9796, 9556, 9580, 9604, 9628, 9652, 9676, 9700, 9724, 9748, 9772, 9796,
9820, 9844] 9820, 9844]
dumpdir = "/mnt/extraspace/rstiskalek/csiborg/knn" dumpdir = "/mnt/extraspace/rstiskalek/csiborg/knn"
fout = join(dumpdir, "knncdf_{}.p") fout_auto = join(dumpdir, "auto", "knncdf_{}.p")
fout_cross = join(dumpdir, "cross", "knncdf_{}_{}.p")
############################################################################### ###############################################################################
@ -68,7 +70,7 @@ fout = join(dumpdir, "knncdf_{}.p")
knncdf = csiborgtools.match.kNN_CDF() knncdf = csiborgtools.match.kNN_CDF()
def do_task(ic): def do_auto(ic):
out = {} out = {}
cat = csiborgtools.read.HaloCatalogue(ic, max_dist=Rmax) cat = csiborgtools.read.HaloCatalogue(ic, max_dist=Rmax)
@ -83,7 +85,39 @@ def do_task(ic):
out.update({"cdf_{}".format(i): cdf}) out.update({"cdf_{}".format(i): cdf})
out.update({"rs": rs, "mass_threshold": mass_threshold}) out.update({"rs": rs, "mass_threshold": mass_threshold})
joblib.dump(out, fout.format(ic)) joblib.dump(out, fout_auto.format(ic))
def do_cross(ics):
    """
    Compute the joint kNN-CDF correlation between a pair of CSiBORG
    catalogues (given by the pair of IC indices `ics`) for every mass
    threshold and dump the result to disk.
    """
    out = {}
    cat1 = csiborgtools.read.HaloCatalogue(ics[0], max_dist=Rmax)
    cat2 = csiborgtools.read.HaloCatalogue(ics[1], max_dist=Rmax)

    for i, mmin in enumerate(mass_threshold):
        # Fit one kNN per catalogue on haloes above the mass threshold.
        fitted = []
        for cat in (cat1, cat2):
            knn = NearestNeighbors()
            knn.fit(cat.positions[cat["totpartmass"] > mmin, ...])
            fitted.append(knn)

        rs, cdf0, cdf1, joint_cdf = knncdf.joint(
            fitted[0], fitted[1], nneighbours=args.nneighbours, Rmax=Rmax,
            rmin=args.rmin, rmax=args.rmax, nsamples=args.nsamples,
            neval=args.neval, batch_size=args.batch_size,
            random_state=args.seed)
        out["corr_{}".format(i)] = knncdf.joint_to_corr(cdf0, cdf1, joint_cdf)

    out["rs"] = rs
    out["mass_threshold"] = mass_threshold
    joblib.dump(out, fout_cross.format(*ics))
###############################################################################
# Autocorrelation calculation #
###############################################################################
if nproc > 1: if nproc > 1:
@ -91,15 +125,34 @@ if nproc > 1:
tasks = deepcopy(ics) tasks = deepcopy(ics)
master_process(tasks, comm, verbose=True) master_process(tasks, comm, verbose=True)
else: else:
worker_process(do_task, comm, verbose=False) worker_process(do_auto, comm, verbose=False)
else: else:
tasks = deepcopy(ics) tasks = deepcopy(ics)
for task in tasks: for task in tasks:
print("{}: completing task `{}`.".format(datetime.now(), task)) print("{}: completing task `{}`.".format(datetime.now(), task))
do_task(task) do_auto(task)
comm.Barrier() comm.Barrier()
###############################################################################
#                       Crosscorrelation calculation                          #
###############################################################################

if nproc > 1:
    if rank == 0:
        tasks = list(combinations(ics, 2))
        master_process(tasks, comm, verbose=True)
    else:
        worker_process(do_cross, comm, verbose=False)
else:
    # BUGFIX: the serial branch must iterate over the same IC *pairs* as
    # the MPI branch -- `do_cross` expects a pair of IC indices, not a
    # single IC as `deepcopy(ics)` would have provided.
    tasks = list(combinations(ics, 2))
    for task in tasks:
        print("{}: completing task `{}`.".format(datetime.now(), task))
        do_cross(task)
comm.Barrier()
if rank == 0: if rank == 0:
print("{}: all finished.".format(datetime.now())) print("{}: all finished.".format(datetime.now()))
quit() # Force quit the script quit() # Force quit the script

View file

@ -1,21 +1,17 @@
nthreads=30 nthreads=151
memory=7 memory=4
queue="berg" queue="cmb"
env="/mnt/zfsusers/rstiskalek/csiborgtools/venv_galomatch/bin/python" env="/mnt/zfsusers/rstiskalek/csiborgtools/venv_galomatch/bin/python"
file="run_knn.py" file="run_knn.py"
rmin=0.01 rmin=0.01
rmax=100 rmax=100
nneighbours=16 nneighbours=8
nsamples=1000000000 nsamples=100000000
batch_size=10000000 batch_size=1000000
neval=10000 neval=10000
# 1000,000,0 pythoncm="$env $file --rmin $rmin --rmax $rmax --nneighbours $nneighbours --nsamples $nsamples --batch_size $batch_size --neval $neval"
# 10000000 # 1e7
# 1000000000
pythoncm="$env $file --rmin $rmin --rmax $rmax --nneighbours $nneighbours --nsamples $nsamples --neval $neval"
# echo $pythoncm # echo $pythoncm
# $pythoncm # $pythoncm