Source code for slsim.LsstSciencePipeline.util_lsst

import numpy as np
from astropy.table import Column
from slsim.ImageSimulation.image_simulation import lens_image_series, lens_image
from slsim.Util.param_util import (
    fits_append_table,
    convert_mjd_to_days,
    transient_event_time_mjd,
    flux_error_to_magnitude_error,
    magnitude_to_amplitude,
)
import os


[docs] def variable_lens_injection( lens_class, band, num_pix, transform_pix2angle, exposure_data ): """Injects variable lens to the dp0 time series data. :param lens_class: Lens() object :param band: imaging band :param num_pix: number of pixels per axis :param transform_pix2angle: transformation matrix (2x2) of pixels into coordinate displacements :param exposure_data: An astropy table of exposure data. It must contain calexp images or generated noisy background image (column name should be "time_series_images", these images are single exposure images of the same part of the sky at different time), magnitude zero point (column name should be "zero_point", these are zero point magnitudes for each single exposure images in time series image) , psf kernel for each exposure (column name should be "psf_kernel", these are pixel psf kernel for each single exposure images in time series image), exposure time (column name should be "expo_time", these are exposure time for each single exposure images in time series images), observation time (column name should be "obs_time", these are observation time in days for each single exposure images in time series images) :return: Astropy table of injected lenses and exposure information of dp0 data """ ## chose transient starting point randomly. start_point_mjd_time = transient_event_time_mjd( min(exposure_data["obs_time"]), max(exposure_data["obs_time"]) ) ## Convert mjd observation time to days. We should do this because lightcurves are # in the unit of days. observation_time = convert_mjd_to_days( exposure_data["obs_time"], start_point_mjd_time ) lens_images = lens_image_series( lens_class, band=band, mag_zero_point=exposure_data["zero_point"], num_pix=num_pix, psf_kernel=exposure_data["psf_kernel"], transform_pix2angle=transform_pix2angle, exposure_time=exposure_data["expo_time"], t_obs=observation_time, ) final_image = [] for i in range(len(exposure_data["obs_time"])): final_image.append(exposure_data["time_series_images"][i] + lens_images[i]) lens_col = Column(name="lens", data=lens_images) final_image_col = Column(name="injected_lens", data=final_image) if "lens" in exposure_data.colnames: exposure_data.replace_column("lens", lens_col) else: exposure_data.add_column(lens_col) if "injected_lens" in exposure_data.colnames: exposure_data.replace_column("injected_lens", final_image_col) else: exposure_data.add_column(final_image_col) return exposure_data
[docs] def multiple_variable_lens_injection( lens_class_list, band, num_pix, transform_matrices_list, exposure_data_list, output_file=None, ): """Injects multiple variable lenses to multiple dp0 time series data. :param lens_class_list: list of Lens() object :param band: imaging band :param num_pix: number of pixels per axis :param transform_matrices_list: list of transformation matrix (2x2) of pixels into coordinate displacements for each exposure :param exposure_data: list of astropy table of exposure data for each set of time series images. It must contain calexp images or generated noisy background image (column name should be "time_series_images", these images are single exposure images of the same part of the sky at different time), magnitude zero point (column name should be "zero_point", these are zero point magnitudes for each single exposure images in time series image) , psf kernel for each exposure (column name should be "psf_kernel", these are pixel psf kernel for each single exposure images in time series image), exposure time (column name should be "expo_time", these are exposure time for each single exposure images in time series images), observation time (column name should be "obs_time", these are observation time in days for each single exposure images in time series images) :param output_file: path to the output FITS file where data will be saved :return: list of astropy table of injected lenses and exposure information of dp0 data for each time series lenses. If output_file path is provided, it saves list of these astropy table in fits file with the given name. """ final_images_catalog = [] for lens_class, transform_matrices, expo_data in zip( lens_class_list, transform_matrices_list, exposure_data_list ): variable_injected_image = variable_lens_injection( lens_class, band=band, num_pix=num_pix, transform_pix2angle=transform_matrices, exposure_data=expo_data, ) if output_file is None: final_images_catalog.append(variable_injected_image) else: first_table = not os.path.exists(output_file) if first_table: variable_injected_image.write(output_file, overwrite=True) first_table = False else: fits_append_table(output_file, variable_injected_image) if len(final_images_catalog) > 1: return final_images_catalog return None
[docs] def opsim_variable_lens_injection( lens_class, bands, num_pix, transform_pix2angle, exposure_data ): """Injects variable lens to the OpSim time series data (1 object). :param lens_class: Lens() object :param bands: list of imaging bands of interest :param num_pix: number of pixels per axis :param transform_pix2angle: transformation matrix (2x2) of pixels into coordinate displacements :param exposure_data: An astropy table of exposure data. One entry of table_list_data generated from the opsim_time_series_images_data function. It must contain the rms of background noise fluctuations (column name should be "bkg_noise"), psf kernel for each exposure (column name should be "psf_kernel", these are pixel psf kernel for each single exposure images in time series image), observation time (column name should be "obs_time", these are observation time in days for each single exposure images in time series images), exposure time (column name should be "expo_time", these are exposure time for each single exposure images in time series images), magnitude zero point (column name should be "zero_point", these are zero point magnitudes for each single exposure images in time series image), coordinates of the object (column name should be "calexp_center"), these are the coordinates in (ra, dec), and the band in which the observation is taken (column name should be "band"). :return: Astropy table of injected lenses and exposure information of dp0 data. """ ## chose transient starting point randomly. start_point_mjd_time = transient_event_time_mjd( min(exposure_data["obs_time"]), max(exposure_data["obs_time"]) ) final_image = [] for obs in range(len(exposure_data["obs_time"])): exposure_data_obs = exposure_data[obs] observation_time = convert_mjd_to_days( exposure_data_obs["obs_time"], start_point_mjd_time ) if exposure_data_obs["band"] not in bands: continue if "bkg_noise" in exposure_data_obs.keys(): std_gaussian_noise = exposure_data_obs["bkg_noise"] else: std_gaussian_noise = None lens_images = lens_image( lens_class, band=exposure_data_obs["band"], mag_zero_point=exposure_data_obs["zero_point"], num_pix=num_pix, psf_kernel=exposure_data_obs["psf_kernel"], transform_pix2angle=transform_pix2angle, exposure_time=exposure_data_obs["expo_time"], t_obs=observation_time, std_gaussian_noise=std_gaussian_noise, ) final_image.append(lens_images) lens_col = Column(name="lens", data=final_image) final_image_col = Column(name="injected_lens", data=final_image) # Create a new Table with only the bands of interest expo_bands = np.array([b for b in exposure_data["band"]]) mask = np.isin(expo_bands, bands) exposure_data_new = exposure_data[mask] # if len(exposure_data_new) > 0: exposure_data_new.add_columns([lens_col, final_image_col]) return exposure_data_new
[docs] def transient_data_with_cadence( lens_class, exposure_data, noise=True, symmetric=False, pix_scale=0.2, random_seed=None, ): """Puts lensed transient into the provided cadence. For LSST, this will be cadence from opsim. :param lens_class: Lens() object :param exposure_data: An astropy table of exposure data. One entry of table_list_data generated from the opsim_time_series_images_data function. It must contain the rms of background noise fluctuations (column name should be "bkg_noise"), psf kernel for each exposure (column name should be "psf_kernel", these are pixel psf kernel for each single exposure images in time series image), observation time (column name should be "obs_time", these are observation time in days for each single exposure images in time series images), exposure time (column name should be "expo_time", these are exposure time for each single exposure images in time series images), magnitude zero point (column name should be "zero_point", these are zero point magnitudes for each single exposure images in time series image), coordinates of the object (column name should be "calexp_center"), these are the coordinates in (ra, dec), and the band in which the observation is taken (column name should be "band"). :param noise: Boolean. If True, a gaussian noise is added to the lightcurve flux. :param symmetric: Boolean. If True, a symmetric error on magnitude is provided. :param pixscale: pixel scale of the observing instrument. :return: Astropy table of lightcurve and exposure information of dp0 data. The table contains: Observation time in days, lens id, magnitude of each image and associated errors, lens image. If the lens system produces fewer than four images, the missing magnitudes and errors are filled with -1. """ copied_exposure_data = exposure_data.copy() min_mjd = min(copied_exposure_data["obs_time"]) max_mjd = max(copied_exposure_data["obs_time"]) min_lc_time, max_lc_time = ( min(lens_class.source(0)._source._lightcurve_time), max(lens_class.source(0)._source._lightcurve_time), ) start_mjd_time = transient_event_time_mjd(min_mjd, max_mjd, random_seed=random_seed) copied_exposure_data["obs_time_in_days"] = convert_mjd_to_days( copied_exposure_data["obs_time"], start_mjd_time ) copied_exposure_data = copied_exposure_data[ (copied_exposure_data["obs_time_in_days"] >= min_lc_time - 50) & (copied_exposure_data["obs_time_in_days"] <= max_lc_time) ] ra_dec = copied_exposure_data["calexp_center"][0] lens_id = lens_class.generate_id(ra=ra_dec[0].degree, dec=ra_dec[1].degree) num_images = lens_class.image_number[0] mag_images = {f"mag_image_{i+1}": [] for i in range(num_images)} mag_errors = { f"mag_error_image_{i+1}_{err}": [] for i in range(num_images) for err in ["low", "high"] } for exposure in copied_exposure_data: obs_time = exposure["obs_time_in_days"] magnitudes = lens_class.point_source_magnitude( band=exposure["band"], lensed=True, time=obs_time )[0] # Compute noise bkg_noise, fwhm = exposure["bkg_noise"], exposure["psf_fwhm"] N_pix = np.pi * (2 * fwhm) ** 2 / (pix_scale**2) sigma_noise_total = bkg_noise / np.sqrt(N_pix) for i in range(num_images): amplitude = magnitude_to_amplitude(magnitudes[i], exposure["zero_point"]) total_counts = amplitude * exposure["expo_time"] poisson_noise = np.sqrt(total_counts) flux_err = np.sqrt(sigma_noise_total**2 + poisson_noise**2) mag_realiz, mag_err_low, mag_err_high = flux_error_to_magnitude_error( amplitude, flux_err, exposure["zero_point"], noise=noise, symmetric=symmetric, ) mag_images[f"mag_image_{i+1}"].append(mag_realiz) mag_errors[f"mag_error_image_{i+1}_low"].append(mag_err_low) mag_errors[f"mag_error_image_{i+1}_high"].append(mag_err_high) # Fill missing values for systems with <4 images for i in range(num_images, 4): mag_images[f"mag_image_{i+1}"] = [-1.0] * len(copied_exposure_data) mag_errors[f"mag_error_image_{i+1}_low"] = [-1.0] * len(copied_exposure_data) mag_errors[f"mag_error_image_{i+1}_high"] = [-1.0] * len(copied_exposure_data) for key in list(mag_images.keys()) + list(mag_errors.keys()): mag_images_or_errors = mag_images if key in mag_images else mag_errors mag_images_or_errors[key] = np.array( mag_images_or_errors[key], dtype=float ).reshape(-1) # Create and add columns to the table copied_exposure_data.add_columns( [Column(name="lens_id", data=[lens_id] * len(copied_exposure_data))] + [Column(name=name, data=data) for name, data in mag_images.items()] + [Column(name=name, data=data) for name, data in mag_errors.items()] ) return copied_exposure_data
[docs] def extract_lightcurves_in_different_bands(transient_data_table): """Extract lightcurves and images in different bands from the given catalog. This a function written to read data table from transient_data_with_cadence function above. :param transient_data_table: An astropy table containing lightcurves in a certain cadence. This table must contain magnitude and corresponding errors. The column name for magnitude should be "mag_image_n", and column names for the error should be "mag_error_image_n_low" and "mag_error_image_n_high", where n could be 1, 2, 3, or 4. :return: A dictionary of magnitudes, associated errors, observation times, structured by band. """ table = transient_data_table # Extract unique bands bands = np.unique(table["band"]) # Initialize dictionaries to hold magnitudes, errors, observation times, and # optionally image lists magnitudes = {f"mag_image_{i}": {band: [] for band in bands} for i in range(1, 5)} errors_low = { f"mag_error_image_{i}_low": {band: [] for band in bands} for i in range(1, 5) } errors_high = { f"mag_error_image_{i}_high": {band: [] for band in bands} for i in range(1, 5) } obs_time = {band: [] for band in bands} # Populate dictionaries with magnitudes, errors, and optionally image lists # corresponding to each band for band in bands: # Filter rows that correspond to the current band band_data = table[table["band"] == band] obs_time[band] = band_data["obs_time"].tolist() for i in range(1, 5): mag_col = f"mag_image_{i}" err_low_col = f"mag_error_image_{i}_low" err_high_col = f"mag_error_image_{i}_high" if mag_col in band_data.colnames and np.any(band_data[mag_col] != -1): # Only proceed with the column if the magnitude is not -1 magnitudes[mag_col][band] = band_data[mag_col].tolist() errors_low[err_low_col][band] = band_data[err_low_col].tolist() errors_high[err_high_col][band] = band_data[err_high_col].tolist() result = { "magnitudes": magnitudes, "errors_low": errors_low, "errors_high": errors_high, "obs_time": obs_time, } return result