Source code for slsim.Sources.SourceVariability.accretion_disk_reprocessing

import numpy as np
import astropy.constants as const
import astropy.units as u
from astropy import cosmology
from scipy import signal, interpolate
from slsim.Util.astro_util import (
    calculate_gravitational_radius,
    calculate_accretion_disk_response_function,
    downsample_passband,
    bring_passband_to_source_plane,
    convert_passband_to_nm,
    calculate_accretion_disk_emission,
)
from slsim.Sources.SourceVariability.light_curve_interpolation import (
    LightCurveInterpolation,
)
from speclite.filters import (
    load_filter,
)
from slsim.Util.param_util import (
    magnitude_to_amplitude,
    amplitude_to_magnitude,
)


[docs] class AccretionDiskReprocessing(object): def __init__(self, reprocessing_model, **kwargs_agn_model): """Initialize the accretion disk reprocessing object. :param reprocessing_model: keyword for the reprocessing model to be used. Only supports "lamppost" now. :param kwargs_agn_model: keyword arguments for the variability model. Note that these have default values if they are not input. For the lamppost model, the kwargs are: ('r_out'), ('r_resolution'), ('inclination_angle'), ('black_hole_mass_exponent'), ('black_hole_spin'), ('corona_height'), and ('eddington_ratio'). :type kwargs_agn_model: dict """ self.reprocessing_model = reprocessing_model self.kwargs_model = kwargs_agn_model if self.reprocessing_model not in ["lamppost"]: raise ValueError( "Given model is not supported. Currently supported model is lamppost." ) if self.reprocessing_model == "lamppost": self._model = lamppost_model default_lamppost_kwargs = { "r_out": 1000, "corona_height": 10, "r_resolution": 500, "inclination_angle": 0, "black_hole_mass_exponent": 8.0, "black_hole_spin": 0.0, "eddington_ratio": 0.1, } for jj in default_lamppost_kwargs: if jj not in self.kwargs_model: print( "keyword " + jj + " is not defined, using default value of: " + str(default_lamppost_kwargs[jj]) ) self.kwargs_model[jj] = default_lamppost_kwargs[jj] self.time_array = None self.magnitude_array = None if "redshift" in self.kwargs_model: self.redshift = self.kwargs_model["redshift"] else: self.redshift = 0
[docs] def define_new_response_function(self, rest_frame_wavelength_in_nanometers): """Define a response function of the agn accretion disk to the flaring corona in the lamppost geometry. :param rest_frame_wavelength_in_nanometers: The rest frame wavelength (not the observer's frame!) in [nanometers]. :return: An array representing the response function of the accretion disk with time lag spacing of [R_g / c]. """ return self._model(rest_frame_wavelength_in_nanometers, **self.kwargs_model)
[docs] def define_passband_response_function( self, passband, redshift=0, delta_wavelength=10, passband_wavelength_unit=u.angstrom, ): """Calculates the response function of the agn accretion disk to the flaring corona in the lamppost geometry for an input passband. :param passband: Str or List representing passband data. Either from speclite or a user defined passband represented as a list of lists or arrays. The first must be wavelengths, and the second must be the throughput of signature: [wavelength, throughput]. :param redshift: Float value of source redshift. Used to convert wavelengths of the passband into emitted wavelengths. :param delta_wavelength: Desired wavelength spacing in passband in [nanometers]. The passband will be resampled to allow for faster calculations. :param passband_wavelength_unit: Astropy unit representing the wavelength units used to define the original passband. Speclite filters typically use angstroms. :return: An array representing the response function of the accretion disk with time lag spacing of [R_g / c]. """ passband_in_nm = convert_passband_to_nm( passband, wavelength_unit_input=passband_wavelength_unit ) passband_in_source_plane = bring_passband_to_source_plane( passband_in_nm, redshift ) passband_to_use = downsample_passband( passband_in_source_plane, delta_wavelength, wavelength_unit_input=u.nm, wavelength_unit_output=u.nm, ) if len(passband_to_use[0]) > 20: print("Warning, over 20 wavelengths to calculate.") total_weighting = np.sum(passband_to_use[1]) total_response_function = ( self.define_new_response_function(passband_to_use[0][0]) * passband_to_use[1][0] / total_weighting ) for jj in range(len(passband_to_use[0]) - 1): if passband_to_use[1][1 + jj] > 0: response_function = ( self.define_new_response_function(passband_to_use[0][1 + jj]) * passband_to_use[1][1 + jj] / total_weighting ) total_response_function += response_function return total_response_function
[docs] def define_intrinsic_signal(self, time_array=None, magnitude_array=None): """Multi-purpose method to define an intrinsic signal of the AccretionDiskReprocessing() class. Passing in the time_array and magnitude_array arguments will write the signal to the object, while a call with no arguments will return the stored signal. :param time_array: The times which the light curve is sampled at, in [days]. :param magnitude_array: The amplitudes of the signal at each time in time_array. :return: The time_array and magnitude_array associated with the AccretionDiskReprocessing() object's intrinsic (driving) signal. """ if time_array is None and magnitude_array is None: return self.time_array, self.magnitude_array if (time_array is None) != (magnitude_array is None): raise ValueError( "You must provide both the time_array and the magnitude_array." ) if len(time_array) != len(magnitude_array): raise ValueError( "Input time_array and magnitude_array must be of equal length." ) self.time_array = time_array self.magnitude_array = magnitude_array return self.time_array, self.magnitude_array
[docs] def reprocess_signal( self, rest_frame_wavelength_in_nanometers=None, response_function_time_lags=None, response_function_amplitudes=None, ): """Multi-purpose method to calculate the response of the accretion disk to a reprocessing of the intrinsic signal. Passing the rest_frame_wavelength_in_nanometers argument will calculate a response function using define_new_response_function(). Passing in a response function (e.g. from an external source) to the response_function arguments will perform the convolution of the stored intrinsic signal with the chosen response function. :param rest_frame_wavelength_in_nanometers: Int representing the rest frame (not the observer's frame!) wavelength to calculate the response function at, in [nanometers]. :param response_function_time_lags: An optional array representing the time_array associated with the response function with units [days]. Time lags are defined in the rest frame (not the observer's frame!). If None and response_function_amplitudes is given, the time lags will be assumed to be in units [Rg / c]. :param response_function_amplitudes: An array representing the response function at each time lag. The amplitudes may use arbitrary units. :return: The magnitude_array of the reprocessed signal. Note that this is calculated in the rest frame, not the observer's frame! """ if self.time_array is None or self.magnitude_array is None: raise ValueError( "Please provide the intrinsic signal first, using define_intrinsic_signal()." ) gravitational_radius_in_days = ( calculate_gravitational_radius( self.kwargs_model["black_hole_mass_exponent"] ) / const.c.to(u.m / u.day) ).value if rest_frame_wavelength_in_nanometers is not None: if response_function_amplitudes is not None: raise ValueError( "Please provide only a wavelength or only a response function. Not both!" ) response_function = self.define_new_response_function( rest_frame_wavelength_in_nanometers ) length_in_days = int(len(response_function) * gravitational_radius_in_days) time_lag_axis = np.linspace(0, length_in_days, len(response_function)) if rest_frame_wavelength_in_nanometers is None: if response_function_amplitudes is None: raise ValueError("Please provide a wavelength or a response function.") response_function = response_function_amplitudes if response_function_time_lags is not None: if len(response_function_time_lags) != len( response_function_amplitudes ): raise ValueError( "The time lag array and response function array must match in length." ) time_lag_axis = response_function_time_lags length_in_days = int(time_lag_axis[-1] - time_lag_axis[0]) else: length_in_days = int( len(response_function) * gravitational_radius_in_days ) time_lag_axis = np.linspace(0, length_in_days, len(response_function)) if length_in_days == 0: length_in_days += 1 time_lag_axis = np.linspace(0, length_in_days, len(response_function)) interpolation_of_response_function = interpolate.interp1d( time_lag_axis, response_function, bounds_error=False, fill_value=0 ) tau_axis = np.linspace(0, int(time_lag_axis[-1]), int(time_lag_axis[-1]) + 1) interpolated_response_function = interpolation_of_response_function(tau_axis) light_curve = { "MJD": np.array(self.time_array), "ps_mag_intrinsic": np.array(self.magnitude_array), } signal_time_axis = np.linspace( int(self.time_array[0]), int(self.time_array[-1]), int(self.time_array[-1]) - int(self.time_array[0]) + 1, ) intrinsic_signal = LightCurveInterpolation(light_curve) interpolated_signal = intrinsic_signal.magnitude(signal_time_axis) reprocessed_signal = signal.convolve( interpolated_signal, (interpolated_response_function), mode="full" ) # bring the reprocessed signal to the observer frame interpolation_of_reprocessed_signal = interpolate.interp1d( signal_time_axis, reprocessed_signal[: len(signal_time_axis)], bounds_error=False, fill_value=0, ) redshifted_time_axis = signal_time_axis / (1 + self.redshift) reprocessed_signal_in_observed_frame = interpolation_of_reprocessed_signal( redshifted_time_axis ) normalization = np.nansum(interpolated_response_function) if normalization == 0: reprocessed_signal_in_observed_frame = intrinsic_signal.magnitude( redshifted_time_axis ) else: reprocessed_signal_in_observed_frame /= normalization return reprocessed_signal_in_observed_frame[: len(self.time_array)]
[docs] def determine_agn_luminosity_from_known_luminosity( self, known_band, known_magnitude, redshift, mag_zero_point, cosmo=cosmology.FlatLambdaCDM(H0=70, Om0=0.3), band=None, observer_frame_wavelength_in_nm=None, ): """Takes in the known luminosity of the AGN and defines the expected magnitude at other bands or wavelengths based on black body radiation. Speclite bands will be calculated at their effective wavelength. :param i_band_magnitude: Float representing magnitude of i band :param redshift: Float representing redshift of AGN :param cosmo: Astropy cosmology object used to calculate distances :param bands: Float representing a speclite filter :param wavelengths: Float representing wavlength in nm. """ if isinstance(known_band, str): filter_response = load_filter(known_band) else: raise ValueError("only speclite filters are implimented") effective_wavelength = (filter_response.effective_wavelength).to(u.nm) source_plane_wavelength = effective_wavelength / (1 + redshift) luminosity_distance = cosmo.luminosity_distance(redshift) obs_plane_flux = magnitude_to_amplitude(known_magnitude, mag_zero_point) source_plane_flux = luminosity_distance**2 * obs_plane_flux theoretical_flux = calculate_accretion_disk_emission( self.kwargs_model["r_out"], self.kwargs_model["r_resolution"], self.kwargs_model["inclination_angle"], source_plane_wavelength, self.kwargs_model["black_hole_mass_exponent"], self.kwargs_model["black_hole_spin"], self.kwargs_model["eddington_ratio"], ) # normalize flux flux_adjustment_ratio = source_plane_flux / theoretical_flux if band is not None: band_wavelength = (load_filter(band).effective_wavelength).to(u.nm) / ( 1 + redshift ) cur_theoretical_flux = ( calculate_accretion_disk_emission( self.kwargs_model["r_out"], self.kwargs_model["r_resolution"], self.kwargs_model["inclination_angle"], band_wavelength, self.kwargs_model["black_hole_mass_exponent"], self.kwargs_model["black_hole_spin"], self.kwargs_model["eddington_ratio"], ) * flux_adjustment_ratio ) cur_obs_flux = cur_theoretical_flux / luminosity_distance**2 output_magnitude = amplitude_to_magnitude(cur_obs_flux, mag_zero_point) elif observer_frame_wavelength_in_nm is not None: source_plane_wavelength = observer_frame_wavelength_in_nm / (1 + redshift) cur_theoretical_flux = ( calculate_accretion_disk_emission( self.kwargs_model["r_out"], self.kwargs_model["r_resolution"], self.kwargs_model["inclination_angle"], source_plane_wavelength, self.kwargs_model["black_hole_mass_exponent"], self.kwargs_model["black_hole_spin"], self.kwargs_model["eddington_ratio"], ) * flux_adjustment_ratio ) cur_obs_flux = cur_theoretical_flux / luminosity_distance**2 output_magnitude = amplitude_to_magnitude(cur_obs_flux, mag_zero_point) else: raise ValueError("Please define a band or wavelength") return output_magnitude.value
[docs] def lamppost_model( rest_frame_wavelength_in_nanometers, r_out=1000, r_resolution=500, inclination_angle=0, black_hole_mass_exponent=8.0, black_hole_spin=0.0, corona_height=10, eddington_ratio=0.1, ): """Uses astro_util.calculate_accretion_disk_response_function() to define the response of the accretion disk due to a driving signal in the lamppost geometry. :param rest_frame_wavelength_in_nanometers: The wavelength to calculate the response function at, in [nanometers]. This is in the frame of the accretion disk, not the frame of the observer. :param r_out: Maximum radius of the accretion disk in gravitational radii [R_g = GM/c^2], typically 10^3 to 10^5. :param r_resolution: Number of gridpoints to use between 0 and r_out. Final map will be calculated at (2*r_resolution) x (2*r_resolution). :param inclination_angle: The tilt of the accretion disk with respect to the observer [degrees] :param black_hole_mass_exponent: The log of the black hole mass normalized by the mass of the sun; black_hole_mass_exponent = log_10(black_hole_mass / mass_sun). Typical AGN have an exponent ranging from 6 to 10. :param black_hole_spin: The dimensionless spin parameter of the black hole, where the spinless case represents the Schwarzschild black hole. Maximum values of +/- 1. :param corona_height: The height of the corona lamppost in gravitational radii [R_g]. Typical choices range from 0 to 100. :param eddington_ratio: The desired Eddington ratio defined as a fraction of bolometric luminosity / Eddington luminosity. :return: The response function of the accretion disk with respect to variability in the corona. """ return calculate_accretion_disk_response_function( r_out=r_out, r_resolution=r_resolution, inclination_angle=inclination_angle, rest_frame_wavelength_in_nanometers=rest_frame_wavelength_in_nanometers, black_hole_mass_exponent=black_hole_mass_exponent, black_hole_spin=black_hole_spin, corona_height=corona_height, eddington_ratio=eddington_ratio, )