Source code for slsim.Sources.SourcePopulation.galaxies

import numpy as np
import numpy.random as random
from slsim.Lenses.selection import object_cut
from slsim.Util import param_util
from slsim.Sources.SourcePopulation.source_pop_base import SourcePopBase
from astropy.table import Column, vstack
from slsim.Util.param_util import (
    average_angular_size,
    axis_ratio,
    eccentricity,
    downsample_galaxies,
    galaxy_size_redshift_evolution,
    galaxy_size,
)
from astropy import units as u
from slsim.Sources.source import Source
import os


# TODO: Use type to determine galaxy_list type
[docs] class Galaxies(SourcePopBase): """Class describing elliptical galaxies.""" def __init__( self, galaxy_list, kwargs_cut, cosmo, sky_area, list_type="astropy_table", catalog_type=None, downsample_to_dc2=False, source_size="Bernardi", extended_source_type="single_sersic", extended_source_kwargs={}, ): """ :param galaxy_list: An astropy table with galaxy parameters. :type galaxy_list: astropy Table object. :param kwargs_cut: cuts in parameters: band, band_mag, z_min, z_max :type kwargs_cut: dict :param cosmo: astropy.cosmology instance :param sky_area: Sky area over which galaxies are sampled. Must be in units of solid angle. :type sky_area: `~astropy.units.Quantity` :param list_type: format of the source catalog file. Currently, it supports a single astropy table or a list of astropy tables. :param catalog_type: type of the catalog. If someone wants to use scotch catalog, they need to specify it. :type catalog_type: str. eg: "scotch" or None :param downsample_to_dc2: Boolean. If True, downsamples the given galaxy population at redshift greater than 1.5 to DC2 galaxy population. :param source_size: If "Bernardi", computes galaxy size using g-band magnitude otherwise rescales skypy source size to Shibuya et al. (2015): https://iopscience.iop.org/article/10.1088/0067-0049/219/2/15/pdf :param extended_source_type: Keyword to specify type of the extended source. Supported extended source types are "single_sersic", "double_sersic", "interpolated". :type extended_source_type: str :param extended_source_kwargs: dictionary of keyword arguments for ExtendedSource. Please see documentation of ExtendedSource() class as well as specific extended source classes. """ super().__init__(cosmo=cosmo, sky_area=sky_area) self.extendedsource_kwargs = extended_source_kwargs self.light_profile = extended_source_type if downsample_to_dc2 is True: samp1, samp2, samp3, samp4, samp5, samp6 = down_sample_to_dc2( galaxy_pop=galaxy_list, sky_area=sky_area ) samp_low = galaxy_list[galaxy_list["z"] <= 2] galaxy_list = vstack([samp_low, samp1, samp2, samp3, samp4, samp5, samp6]) """slsim_sample_3_35, slsim_sample_35_4, slsim_sample_4_45, slsim_sample_45_5])""" self.n = len(galaxy_list) # add missing keywords in astropy.Table object if list_type == "astropy_table": galaxy_list = convert_to_slsim_convention( galaxy_catalog=galaxy_list, light_profile=self.light_profile, input_catalog_type=catalog_type, source_size=source_size, cosmo=cosmo, ) column_names_update = galaxy_list.colnames if self.light_profile in ["single_sersic", "catalog_source"]: if "e1" not in column_names_update or "e2" not in column_names_update: galaxy_list["e1"] = -np.ones(self.n) galaxy_list["e2"] = -np.ones(self.n) if "n_sersic" not in column_names_update: galaxy_list["n_sersic"] = -np.ones(self.n) if self.light_profile == "double_sersic": # these are the name convention for double sersic profiles. if ( "n_sersic_0" not in column_names_update or "n_sersic_1" not in column_names_update ): galaxy_list["n_sersic_0"] = -np.ones(self.n) galaxy_list["n_sersic_1"] = -np.ones(self.n) if ( "e0_1" not in column_names_update or "e0_2" not in column_names_update ): galaxy_list["e0_1"] = -np.ones(self.n) galaxy_list["e0_2"] = -np.ones(self.n) if ( "e1_1" not in column_names_update or "e1_2" not in column_names_update ): galaxy_list["e1_1"] = -np.ones(self.n) galaxy_list["e1_2"] = -np.ones(self.n) if ( "angular_size_0" not in column_names_update or "angular_size_1" not in column_names_update ): galaxy_list["angular_size_0"] = -np.ones(self.n) galaxy_list["angular_size_1"] = -np.ones(self.n) else: column_names = galaxy_list[0].colnames if "ellipticity" not in column_names: raise ValueError("ellipticity is missing in galaxy_list columns.") if "e1" not in column_names or "e2" not in column_names: for table in galaxy_list: new_column_length = len(table) new_column_1 = Column([-1.0] * new_column_length, name="e1") new_column_2 = Column([-1.0] * new_column_length, name="e2") table.add_columns([new_column_1, new_column_2]) if "n_sersic" not in column_names: for table in galaxy_list: new_column_length = len(table) new_column = Column([-1] * new_column_length, name="n_sersic") table.add_column(new_column) # make cuts self._galaxy_select = object_cut(galaxy_list, list_type=list_type, **kwargs_cut) self._num_select = len(self._galaxy_select) self.list_type = list_type self._full_galaxy_list = galaxy_list @property def source_number(self): """Number of sources registered (within given area on the sky) :return: number of sources """ return self.n @property def source_number_selected(self): """Number of sources selected (within given area on the sky) :return: number of sources passing the selection criteria """ return self._num_select
[docs] def draw_source_dict(self, z_max=None, z_min=None, galaxy_index=None): """Choose source at random. :param z_max: maximum redshift limit for the galaxy to be drawn. If no galaxy is found for this limit, None will be returned. :param z_min: minimum redshift limit for the galaxy to be drawn. If no galaxy is found for this limit, None will be returned. :param galaxy_index: index of galaxy to pic (if provided) :return: dictionary of source """ if galaxy_index is not None: galaxy = self._full_galaxy_list[galaxy_index] elif z_max is not None or z_min is not None: if z_max is None: z_max = 1100 if z_min is None: z_min = 0 filtered_galaxies = self._galaxy_select[ (self._galaxy_select["z"] < z_max) & (z_min < self._galaxy_select["z"]) ] if len(filtered_galaxies) == 0: return None else: index = random.randint(0, len(filtered_galaxies)) galaxy = filtered_galaxies[index] else: index = random.randint(0, self._num_select) galaxy = self._galaxy_select[index] if "a_rot" in galaxy.colnames: phi_rot = galaxy["a_rot"] else: phi_rot = None if self.light_profile in ["single_sersic", "catalog_source"]: if "ellipticity" in galaxy.colnames: if galaxy["e1"] == -1 or galaxy["e2"] == -1: e1, e2 = galaxy_projected_eccentricity( float(galaxy["ellipticity"]), rotation_angle=phi_rot ) galaxy["e1"] = e1 galaxy["e2"] = e2 else: raise ValueError("ellipticity is missing in galaxy_list columns.") if galaxy["n_sersic"] == -1: galaxy["n_sersic"] = 1 # TODO make a better estimate with scatter elif self.light_profile == "double_sersic": if galaxy["e0_1"] == -1 or galaxy["e0_2"] == -1: if "ellipticity0" in galaxy.colnames: e0_1, e0_2 = galaxy_projected_eccentricity( float(galaxy["ellipticity0"]), rotation_angle=phi_rot ) galaxy["e0_1"] = e0_1 galaxy["e0_2"] = e0_2 elif "a0" in galaxy.colnames and "b0" in galaxy.colnames: axis_ratio_0 = axis_ratio(a=galaxy["a0"], b=galaxy["b0"]) ellip_0 = eccentricity(q=axis_ratio_0) e0_1, e0_2 = galaxy_projected_eccentricity( float(ellip_0), rotation_angle=phi_rot ) galaxy["e0_1"] = e0_1 galaxy["e0_2"] = e0_2 else: raise ValueError( "ellipticity or semi-major and semi-minor axis are missing for" " the first light profile in galaxy_list columns" ) if galaxy["e1_1"] == -1 or galaxy["e1_2"] == -1: if "ellipticity1" in galaxy.colnames: e1_1, e1_2 = galaxy_projected_eccentricity( float(galaxy["ellipticity1"]), rotation_angle=phi_rot ) galaxy["e1_1"] = e1_1 galaxy["e1_2"] = e1_2 elif "a1" in galaxy.colnames and "b1" in galaxy.colnames: axis_ratio_1 = axis_ratio(a=galaxy["a1"], b=galaxy["b1"]) ellip_1 = eccentricity(q=axis_ratio_1) e1_1, e1_2 = galaxy_projected_eccentricity(float(ellip_1)) galaxy["e1_1"] = e1_1 galaxy["e1_2"] = e1_2 else: raise ValueError( "ellipticity or semi-major and semi-minor axis are missing for" " the second light profile in galaxy_list columns" ) if galaxy["angular_size_0"] == -1 or galaxy["angular_size_1"] == -1: if "a0" in galaxy.colnames and "b0" in galaxy.colnames: galaxy["angular_size_0"] = average_angular_size( a=galaxy["a0"], b=galaxy["b0"] ) else: raise ValueError( "semi-major and semi-minor axis are missing for the first light" " profile in galaxy_list columns" ) if "a1" in galaxy.colnames and "b1" in galaxy.colnames: galaxy["angular_size_1"] = average_angular_size( a=galaxy["a1"], b=galaxy["b1"] ) else: raise ValueError( "semi-major and semi-minor axis are missing for the second" " light profile in galaxy_list columns" ) if galaxy["n_sersic_0"] == -1 or galaxy["n_sersic_1"] == -1: galaxy["n_sersic_0"] = 1 galaxy["n_sersic_1"] = 4 else: raise ValueError( "Provided number of light profiles is not supported. It should be" " either 'single or 'double' " ) return galaxy
[docs] def draw_source(self, z_max=None, z_min=None, galaxy_index=None): """Choose source at random. :param z_max: maximum redshift limit for the galaxy to be drawn. If no galaxy is found for this limit, None will be returned. :param z_min: minimum redshift limit for the galaxy to be drawn. If no galaxy is found for this limit, None will be returned. :param galaxy_index: index of galaxy to pic (if provided) :return: instance of Source class """ galaxy = self.draw_source_dict( z_max=z_max, z_min=z_min, galaxy_index=galaxy_index ) if galaxy is None: return None source_class = Source( cosmo=self._cosmo, extended_source_type=self.light_profile, **galaxy, **self.extendedsource_kwargs ) return source_class
[docs] def draw_galaxies(self, area, z_max=None): """Draw galaxies within a specified area and redshift limit. :param area (astropy.units.Quantity): Area in which to draw galaxies (in square arcseconds). :param z_max: Maximum redshift for the galaxies. If None, no redshift cut is applied. :return: List of drawn galaxy instances. """ total_sources = self.source_number_selected pop_sky_area_arcsec2 = self.sky_area.to_value("arcsec2") area_arcsec2 = area.to_value("arcsec2") mean_sources = (total_sources / pop_sky_area_arcsec2) * area_arcsec2 # draw from Poisson Distribution number_of_sources = np.random.poisson(lam=mean_sources) galaxies_list = [] for _ in range(number_of_sources): galaxy = self.draw_source(z_max=z_max) if galaxy is not None: galaxy.update_center(area=area_arcsec2) galaxies_list.append(galaxy) return galaxies_list
[docs] def galaxy_projected_eccentricity(ellipticity, rotation_angle=None): """Projected eccentricity of elliptical galaxies as a function of other deflector parameters. :param ellipticity: eccentricity amplitude :type ellipticity: float [0,1) :param rotation_angle: rotation angle of the major axis of elliptical galaxy in radian. The reference of this rotation angle is +Ra axis i.e towards the East direction and it goes from East to North. If it is not provided, it will be drawn randomly. :return: e1, e2 eccentricity components """ if rotation_angle is None: phi = np.random.uniform(0, np.pi) else: phi = rotation_angle e = param_util.epsilon2e(ellipticity) e1 = e * np.cos(2 * phi) e2 = e * np.sin(2 * phi) return e1, e2
[docs] def convert_to_slsim_convention( galaxy_catalog, light_profile, input_catalog_type="skypy", source_size=None, cosmo=None, ): """This function converts scotch/catalog to slsim conventions. In slsim, sersic index are either n_sersic or (n_sersic_0 and n_sersic_1). Ellipticity are either ellipticity or (ellipticity0 and ellipticity1). These kewords can be read by Galaxies class. This function is written to convert scotch catalog to slsim convension and to change unit of angular size in skypy source catalog to arcsec. :param galaxy_catalog: An astropy table of galaxy catalog in other conventions. :type galaxy_catalog: astropy Table object. :param light_profile: keyword for number of sersic profile to use in source light model. accepted keywords: "single_sersic", "double_sersic". :param input_catalog_type: type of the catalog. If someone wants to use scotch catalog or skypy catalog, they need to specify it. :type input_catalog_type: str. eg: "scotch" or "skypy". :param source_size: Keyword for source size convention. If "Bernardi", computes galaxy size using g-band magnitude otherwise rescales skypy source size to Shibuya et al.(2015): ht tps://iopscience.iop.org/article/10.1088/0067-0049/219/2/15/pdf :param cosmo: astropy.cosmology instance :return: galaxy catalog in slsim convension. """ galaxy_catalog = galaxy_catalog.copy() column_names = galaxy_catalog.colnames for col_name in column_names: if "_host" in col_name: # Remove '_host' from the column name new_col_name = col_name.replace("_host", "") # Rename the column galaxy_catalog.rename_column(col_name, new_col_name) if light_profile == "double_sersic": if "n0" in column_names or "n1" in column_names: galaxy_catalog.rename_column("n0", "n_sersic_0") galaxy_catalog.rename_column("n1", "n_sersic_1") if "e0" in column_names or "e1" in column_names: galaxy_catalog.rename_column("e0", "ellipticity0") galaxy_catalog.rename_column("e1", "ellipticity1") if "angular_size0" in column_names or "angular_size1" in column_names: galaxy_catalog.rename_column("angular_size0", "angular_size_0") galaxy_catalog.rename_column("angular_size1", "angular_size_1") if light_profile == "single_sersic": if "e" in column_names: galaxy_catalog.rename_column("e", "ellipticity") if input_catalog_type == "scotch": galaxy_catalog["a_rot"] = np.deg2rad(galaxy_catalog["a_rot"]) if input_catalog_type == "skypy": if source_size == "Bernardi": # compute angular size from g-band magnitude. source_size = galaxy_size( galaxy_catalog["mag_g"], galaxy_catalog["z"], cosmo ) physical_size = source_size[0] * u.kpc angular_size = source_size[1].value * u.arcsec else: # rescales skypy source size to Shibuya et al. (2015). # compute the rescaled physical size. The resulted value is devided by 2.5 to # match the best-fit model given in https://iopscience.iop.org/article/10.1088/0067-0049/219/2/15/pdf rescaled_physical_size = ( galaxy_catalog["physical_size"] * galaxy_size_redshift_evolution(galaxy_catalog["z"]) / 2.5 ) # compute the rescaled angular size rescaled_angular_size = ( rescaled_physical_size ) / cosmo.angular_diameter_distance(galaxy_catalog["z"]).to(u.kpc) physical_size = rescaled_physical_size angular_size = (rescaled_angular_size * u.rad).to(u.arcsec) galaxy_catalog["physical_size"] = physical_size galaxy_catalog["angular_size"] = angular_size return galaxy_catalog
[docs] def down_sample_to_dc2(galaxy_pop, sky_area): """Downsamples given galaxy pop above redshift 1.5 to DC2 galaxy population. :param galaxy_pop: Astropy table of galaxy population. :param sky_area: Sky area over which galaxies are sampled. Must be in units of solid angle and it should be astropy unit object. :return: Astropy tables of downsampled galaxy population in different bins. Redshift bins for returned populations are: (2-2.5), (2.5-3), (3-3.5), (3.5-4), (4-4.5), (4.5-5) """ path = os.path.dirname(__file__) new_path = path[: path.rfind("slsim/")] module_path = os.path.dirname(new_path) # path1 = os.path.join( # module_path, "data/DC2_data/dc2_galaxy_count_1.5_2.npy" # ) path2 = os.path.join(module_path, "data/DC2_data/dc2_galaxy_count_2_2.5.npy") path3 = os.path.join(module_path, "data/DC2_data/dc2_galaxy_count_2.5_3.npy") # DC2 galaxy counts in 3 different redshift bins: (2-2.5), (2.5-3). Beyond 3 # , we use the same count as 3rd bin because DC2 only reach up to redshift 3. # dN1 = np.load(path1) dN2 = int(sky_area.value) * np.load(path2) dN3 = int(sky_area.value) * np.load(path3) # M_min1=21.531229 # M_max1=29.999994 # dM1=0.2920263882341056 M_min2 = 22.084414 M_max2 = 29.999998 dM2 = 0.2729511918692753 M_min3 = 22.654068 M_max3 = 29.999996 dM3 = 0.25330786869443694 # slsim_sample_15_2=downsample_galaxies(galaxy_pop, dN1, dM1, M_min1, # M_max1, 1.5, 2) slsim_sample_2_25 = downsample_galaxies( galaxy_pop, dN2, dM2, M_min2, M_max2, 2, 2.5 ) slsim_sample_25_3 = downsample_galaxies( galaxy_pop, dN3, dM3, M_min3, M_max3, 2.5, 3 ) slsim_sample_3_35 = downsample_galaxies( galaxy_pop, dN3, dM3, M_min3, M_max3, 3, 3.5 ) slsim_sample_35_4 = downsample_galaxies( galaxy_pop, dN3, dM3, M_min3, M_max3, 3.5, 4 ) slsim_sample_4_45 = downsample_galaxies( galaxy_pop, dN3, dM3, M_min3, M_max3, 4, 4.5 ) slsim_sample_45_5 = downsample_galaxies( galaxy_pop, dN3, dM3, M_min3, M_max3, 4.5, 5 ) return ( slsim_sample_2_25, slsim_sample_25_3, slsim_sample_3_35, slsim_sample_35_4, slsim_sample_4_45, slsim_sample_45_5, )