import numpy as np
import numpy.random as random
from slsim.Util import param_util
from slsim.Lenses.selection import object_cut
from slsim.Deflectors.MassLightConnection.richness2mass import mass_richness_relation
from slsim.Halos.halo_population import gene_e_ang_halo, concent_m_w_scatter
from colossus.cosmology import cosmology as colossus_cosmo
from slsim.Deflectors.MassLightConnection.velocity_dispersion import (
vel_disp_abundance_matching,
)
from slsim.Deflectors.DeflectorPopulation.elliptical_lens_galaxies import (
elliptical_projected_eccentricity,
)
from slsim.Deflectors.MassLightConnection.velocity_dispersion import vel_disp_nfw
from slsim.Deflectors.DeflectorPopulation.deflectors_base import DeflectorsBase
from slsim.Deflectors.deflector import Deflector
from lenstronomy.Util.param_util import phi_q2_ellipticity
from astropy import units as u
from astropy.table import hstack
from scipy.spatial.distance import cdist
[docs]
class ClusterDeflectors(DeflectorsBase):
"""Class describing cluster lens model with a NFW profile for the dark
matter halo and EPL profile for the subhalos (cluster members). It makes
use of a group/cluster catalog and a group/cluster member catalog (e.g.
redMaPPer).
This class is called by setting deflector_type == "cluster-catalog"
in LensPop.
"""
def __init__(
self,
cluster_list,
members_list,
galaxy_list,
kwargs_cut,
kwargs_mass2light,
cosmo,
sky_area,
catalog_type="skypy",
richness_fn="Abdullah2022",
kwargs_draw_members=None,
assign_galaxy_redshift=False,
cored_profile=False,
):
"""
:param cluster_list: list of dictionary with redshift and richness
(or mass) from a group/cluster catalog.
Mandatory keys: 'cluster_id' 'z', 'richness' or 'halo_mass'
:type cluster_list: ~astropy.table.Table
:param members_list: list of dictionary with positions and magnitudes of
group/cluster members.
Mandatory keys: 'cluster_id', 'ra', 'dec', 'mag_{band}'
:type members_list: ~astropy.table.Table
:param galaxy_list: list of dictionary with lens parameters of
SLSim galaxies to be assigned as deflectors to each member.
:type galaxy_list: ~astropy.table.Table
:param kwargs_cut: cuts in parameters: band, band_mag, z_min, z_max
:type kwargs_cut: dict
:param kwargs_mass2light: mass-to-light relation
:type kwargs_mass2light: dict
:param cosmo: astropy.cosmology instance
:type cosmo: ~astropy.cosmology
:param sky_area: Sky area over which galaxy_list is sampled. Must be in units of
solid angle.
:type sky_area: `~astropy.units.Quantity`
:param richness_fn: richness-mass relation to assign a mass to each cluster
:type richness_fn: str
:param kwargs_draw_members: kwargs for draw_members method
:type kwargs_draw_members: dict or None
:param catalog_type: type of input catalog (skypy or cosmoDC2)
:type catalog_type: str
:param assign_galaxy_redshift: if True, assign the redshift of the
galaxy to the member galaxy instead of the cluster redshift
:type assign_galaxy_redshift: bool
:param cored_profile: flag for adding cored density profile
:type cored_profile: boolean
"""
galaxy_list = param_util.catalog_with_angular_size_in_arcsec(
galaxy_catalog=galaxy_list, input_catalog_type=catalog_type
)
super().__init__(
deflector_table=cluster_list,
kwargs_cut=kwargs_cut,
cosmo=cosmo,
sky_area=sky_area,
)
self.deflector_profile = "NFW_CLUSTER"
self.cored_profile = cored_profile
self.richness_fn = richness_fn
if kwargs_draw_members is None:
kwargs_draw_members = {}
self.kwargs_draw_members = kwargs_draw_members
self.set_cosmo()
cluster_list = self.preprocess_clusters(cluster_list)
members_list = self.preprocess_members(
cluster_list,
members_list,
galaxy_list,
assign_galaxy_redshift=assign_galaxy_redshift,
)
self._f_vel_disp = vel_disp_abundance_matching(
galaxy_list, z_max=0.5, sky_area=sky_area, cosmo=cosmo
)
self._members_select = object_cut(members_list, **kwargs_cut)
self._cluster_select = cluster_list[
np.isin(cluster_list["cluster_id"], self._members_select["cluster_id"])
]
self._members_select["vel_disp"] = self._f_vel_disp(
np.log10(self._members_select["stellar_mass"])
)
self._kwargs_mass2light = kwargs_mass2light
self._num_select = len(self._cluster_select)
self._cosmo = cosmo
# TODO: random reshuffle of matched list
[docs]
def deflector_number(self):
"""
:return: number of deflectors
"""
number = self._num_select
return number
[docs]
def draw_deflector(self, index=None):
"""
:param index: index of deflector, if not provided, draw randomly from all deflectors
:type index: int or None
:param cored: Boolean flag for cored density profile
:type cored: True for cored, False for cuspy profile
:return: dictionary of complete parameterization of deflector
"""
index = random.randint(0, self._num_select - 1)
deflector = self.draw_cluster(index)
members = self.draw_members(deflector["cluster_id"], **self.kwargs_draw_members)
deflector["subhalos"] = members
deflector["cored_profile"] = self.cored_profile
deflector_class = Deflector(deflector_type=self.deflector_profile, **deflector)
return deflector_class
[docs]
def get_deflector(self, cluster_id, cored=False):
"""
:param cluster_id: identifier of the cluster
:type cluster_id: int
:type index: int or None
:param cored: Boolean flag for cored density profile
:type cored: True for cored, False for cuspy profile
:return: dictionary of complete parameterization of deflector for the given cluster_id
"""
indices = np.where(self._cluster_select["cluster_id"] == cluster_id)[0]
index = indices[0] # Take the first match
# Draw the cluster using the found index
deflector = self.draw_cluster(index)
# Draw the members for this cluster
members = self.draw_members(deflector["cluster_id"], **self.kwargs_draw_members)
deflector["subhalos"] = members
deflector["cored_profile"] = self.cored_profile
# Create and return the deflector class
deflector_class = Deflector(deflector_type=self.deflector_profile, **deflector)
return deflector_class
[docs]
def draw_cluster(self, index):
"""
:param index: index of cluster in catalog
:type index: int
:return: dictionary of NFW parameters for the cluster halo
"""
cluster = self._cluster_select[index]
if cluster["halo_mass"] == -1:
cluster["halo_mass"] = mass_richness_relation(
cluster["richness"], self.richness_fn
)
if cluster["concentration"] == -1:
cluster["concentration"] = concent_m_w_scatter(
np.array([cluster["halo_mass"]]), cluster["z"], sig=0.33
)[0]
if cluster["vel_disp"] == -1:
cluster["vel_disp"] = vel_disp_nfw(
cluster["halo_mass"], cluster["concentration"], self.cosmo, cluster["z"]
)
if cluster["e1_mass"] == -1 or cluster["e2_mass"] == -1:
e, phi = gene_e_ang_halo(np.array([cluster["halo_mass"]]))
e1, e2 = phi_q2_ellipticity(np.deg2rad(phi[0]), 1 - e[0])
cluster["e1_mass"] = e1
cluster["e2_mass"] = e2
return dict(cluster)
[docs]
def draw_members(self, cluster_id, center_scatter=0.2, max_dist=80, bcg_band="r"):
"""
:param cluster_id: identifier of the cluster
:type cluster_id: int
:param center_scatter: scatter in center of the BCG in arcsec
:type center_scatter: float
:param max_dist: maximum distance from the BCG in arcsec
:type max_dist: float
bcg_band: band to use to identify the BCG
:type bcg_band: str
:return: astropy table with EPL+Sersic parameters of each member
"""
members = self._members_select[cluster_id == self._members_select["cluster_id"]]
members["vel_disp"] = np.where(
members["vel_disp"] == -1,
param_util.vel_disp_from_m_star(members["stellar_mass"]),
members["vel_disp"],
)
for i in range(len(members)):
if members[i]["e1_light"] == -1 or members[i]["e2_light"] == -1:
e1_light, e2_light, e1_mass, e2_mass = (
elliptical_projected_eccentricity(
**members[i], **self._kwargs_mass2light
)
)
members[i]["e1_light"] = e1_light
members[i]["e2_light"] = e2_light
members[i]["e1_mass"] = e1_mass
members[i]["e2_mass"] = e2_mass
members["n_sersic"] = np.where(
members["n_sersic"] == -1, 4, members["n_sersic"]
)
bcg_id = np.argmin(members[f"mag_{bcg_band}"])
bcg_ra, bcg_dec = members["ra"][bcg_id], members["dec"][bcg_id]
center_ra, center_dec = (
np.random.normal(bcg_ra, center_scatter / 3600),
np.random.normal(bcg_dec, center_scatter / 3600),
)
center_x = (members["ra"] - center_ra) * 3600 * np.cos(center_dec / 180 * np.pi)
center_y = (members["dec"] - center_dec) * 3600
members["center_x"] = np.where(
members["center_x"] == -1, center_x, members["center_x"]
)
members["center_y"] = np.where(
members["center_y"] == -1, center_y, members["center_y"]
)
center_dist = np.sqrt(members["center_x"] ** 2 + members["center_y"] ** 2)
members = members[center_dist < max_dist]
return members
[docs]
@staticmethod
def assign_similar_galaxy(
members_list,
galaxy_list,
cosmo=None,
bands=("g", "r", "i", "z", "Y"),
max_gals=10000,
assign_galaxy_redshift=False,
):
"""Assigns a similar galaxy to each member of a group/cluster member
catalog by comparing their magnitudes and redshifts.
:param members_list: astropy table with columns 'mag_{band}',
'z'
:type members_list: astropy.table.Table
:param galaxy_list: astropy table with columns 'mag_{band}', 'z'
:type galaxy_list: astropy.table.Table
:param cosmo: astropy.cosmology instance
:type cosmo: astropy.cosmology
:param bands: list of bands to compare
:type bands: list
:param max_gals: maximum number of galaxies to compare to
:type max_gals: int
:param assign_galaxy_redshift: if True, assign the redshift of
the galaxy to the member galaxy instead of the cluster
redshift
:type assign_galaxy_redshift: bool
:return: astropy table with the same number of rows as
members_list and columns from both members_list and
galaxy_list
:rtype: astropy.table.Table
"""
# shuffle galaxy list and select a subset
if len(galaxy_list) > max_gals:
indices = np.random.choice(len(galaxy_list), max_gals, replace=False)
galaxy_list = galaxy_list[indices]
mag_cols = [f"mag_{b}" for b in bands if f"mag_{b}" in members_list.columns]
if not mag_cols:
raise ValueError("No magnitude columns found in members_list")
mag_members = [members_list[mag] for mag in mag_cols]
mag_galaxies = [galaxy_list[mag] for mag in mag_cols]
dist_mod_members = -5 * np.log10(
cosmo.luminosity_distance(members_list["z"]) / (10 * u.pc)
)
dist_mod_galaxies = -5 * np.log10(
cosmo.luminosity_distance(galaxy_list["z"]) / (10 * u.pc)
)
distance = cdist(
np.stack([*mag_members, dist_mod_members], axis=1),
np.stack([*mag_galaxies, dist_mod_galaxies], axis=1),
metric="euclidean",
)
nearest_neighbors_indices = distance.argmin(axis=1)
similar_galaxies = galaxy_list[nearest_neighbors_indices]
if assign_galaxy_redshift:
# Use galaxy redshift instead of member redshift
include_cols_members = [
col
for col in members_list.columns
if col not in mag_cols + ["z"] # Exclude both mags AND redshift
]
include_cols_galaxies = [
col
for col in galaxy_list.columns # Keep ALL galaxy columns including 'z'
]
else:
# Original behavior - use member redshift
include_cols_members = [
col for col in members_list.columns if col not in mag_cols
]
include_cols_galaxies = [
col for col in galaxy_list.columns if col not in ["z"]
]
return hstack(
[
members_list[include_cols_members],
similar_galaxies[include_cols_galaxies],
]
)
[docs]
@staticmethod
def preprocess_clusters(cluster_list):
n_clusters = len(cluster_list)
column_names = cluster_list.columns
if "cluster_id" not in column_names:
raise ValueError("cluster_id is mandatory in cluster catalog")
if "z" not in column_names:
raise ValueError("redshift is mandatory in cluster catalog")
if "halo_mass" not in column_names:
if "richness" not in column_names:
raise ValueError(
"richness or halo_mass is mandatory in cluster catalog"
)
cluster_list["halo_mass"] = -np.ones(n_clusters)
if "concentration" not in column_names:
cluster_list["concentration"] = -np.ones(n_clusters)
if "vel_disp" not in column_names:
cluster_list["vel_disp"] = -np.ones(n_clusters)
if "e1_mass" not in column_names or "e2_mass" not in column_names:
cluster_list["e1_mass"] = -np.ones(n_clusters)
cluster_list["e2_mass"] = -np.ones(n_clusters)
return cluster_list
[docs]
def preprocess_members(
self, cluster_list, members_list, galaxy_list, assign_galaxy_redshift=False
):
n_clusters = len(cluster_list)
n_members = len(members_list)
column_names = members_list.columns
if "z" not in column_names:
members_list["z"] = -np.ones(n_members)
# assign the redshift of the cluster to its members
for i in range(n_clusters):
z = cluster_list["z"][i]
members_list["z"][
members_list["cluster_id"] == cluster_list["cluster_id"][i]
] = z
# use center_x and center_y if available, otherwise use ra and dec
if "center_x" not in column_names or "center_y" not in column_names:
members_list["center_x"] = -np.ones(n_members)
members_list["center_y"] = -np.ones(n_members)
if "ra" not in column_names or "dec" not in column_names:
raise ValueError(
"ra and dec or center_x and center_y "
"are mandatory in members catalog"
)
else:
if "ra" not in column_names or "dec" not in column_names:
members_list["ra"] = -np.ones(n_members)
members_list["dec"] = -np.ones(n_members)
# assign a similar SLSim galaxy to each member
members_list = self.assign_similar_galaxy(
members_list,
galaxy_list,
cosmo=self.cosmo,
assign_galaxy_redshift=assign_galaxy_redshift,
)
# update column names
column_names = members_list.colnames
if "vel_disp" not in column_names:
members_list["vel_disp"] = -np.ones(n_members)
if "e1_light" not in column_names or "e2_light" not in column_names:
members_list["e1_light"] = -np.ones(n_members)
members_list["e2_light"] = -np.ones(n_members)
if "e1_mass" not in column_names or "e2_mass" not in column_names:
members_list["e1_mass"] = -np.ones(n_members)
members_list["e2_mass"] = -np.ones(n_members)
if "n_sersic" not in column_names:
members_list["n_sersic"] = -np.ones(n_members)
if "gamma_pl" not in column_names:
members_list["gamma_pl"] = np.ones(n_members) * 2
return members_list
[docs]
def set_cosmo(self):
"""Set the cosmology in colossus to match the astropy.cosmology
instance."""
params = dict(
flat=(self.cosmo.Ok0 == 0.0),
H0=self.cosmo.H0.value,
Om0=self.cosmo.Om0,
Ode0=self.cosmo.Ode0,
Ob0=(
self.cosmo.Ob0
if (self.cosmo.Ob0 is not None) and (self.cosmo.Ob0 != 0)
else 0.04897
),
Tcmb0=self.cosmo.Tcmb0.value if self.cosmo.Tcmb0.value > 0 else 2.7255,
Neff=self.cosmo.Neff,
sigma8=0.8102,
ns=0.9660499,
)
colossus_cosmo.setCosmology(cosmo_name="halo_cosmo", **params)