Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add airmass dependent Bandpasses #444

Merged
merged 24 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion config/imsim-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ image:

det_name: "$det_name"

bandpass: { type: RubinBandpass, band: "$band" }
bandpass:
type: RubinBandpass
band: $band
# You can optionally add an airmass here to incorporate into the bandpass. If you leave
# this out, then the fiducial bandpass with airmass = 1.2 will be used.
airmass: { type: OpsimData, field: airmass }
camera: "@output.camera"
det_name: $det_name

wcs:
type: Batoid
Expand Down
184 changes: 171 additions & 13 deletions imsim/bandpass.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,71 @@

import numpy as np
import os
from pathlib import Path
import galsim
from galsim.config import RegisterBandpassType
from astropy.table import Table

__all__ = ['RubinBandpass']


def RubinBandpass(band, logger=None):
class AtmInterpolator:
""" Interpolate atmospheric transmission curves from throughputs repo.
Linear interpolation of log(transmission) independently for every wavelength
does a good job. Extrapolation is done by assuming a constant slope in
log(transmission).

Parameters
----------
Xs : `np.array`
Airmass values at which the transmission curves are tabulated.
arr : `np.array`
Transmission curves at the airmass values. First index is the airmass,
second index is the wavelength.
"""
def __init__(self, Xs, arr):
self.Xs = Xs
self.arr = arr
with np.errstate(all='ignore'):
self.logarr = np.log(arr)
self.log_extrapolation_slope = (
(self.logarr[-1] - self.logarr[-2])/(self.Xs[-1]-self.Xs[-2])
)

def __call__(self, X):
""" Evaluate atmospheric transmission curve at airmass X.

Parameters
----------
X : `float`
Airmass at which to evaluate the transmission curve.

Returns
-------
out : `np.array`
Transmission curve at the requested airmass.
"""
assert X >= 1.0
idx = np.searchsorted(self.Xs, X, side='right')
if idx == len(self.Xs): # extrapolate
frac = (X - self.Xs[idx-1])
out = np.array(self.logarr[idx-1])
out += frac*self.log_extrapolation_slope
else:
frac = (X - self.Xs[idx-1]) / (self.Xs[idx] - self.Xs[idx-1])
out = (1-frac)*np.array(self.logarr[idx-1])
out += frac*self.logarr[idx]
out = np.exp(out)
out[~np.isfinite(out)] = 0.0 # Clean up exp(log(0)) => 0
return out


def RubinBandpass(
band,
airmass=None,
camera=None,
det_name=None,
logger=None
):
"""Return one of the Rubin bandpasses, specified by the single-letter name.

The zeropoint is automatically set to the AB zeropoint normalization.
Expand All @@ -15,24 +74,113 @@ def RubinBandpass(band, logger=None):
----------
band : `str`
The name of the bandpass. One of u,g,r,i,z,y.
airmass : `float`, optional
The airmass at which to evaluate the bandpass. If None, use the
standard X=1.2 bandpass.
camera : `str`, optional
Name of the camera for which to incorporate the detector-specific
quantum efficiency. If None, use the standard hardware bandpass.
det_name : `str`, optional
Name of the detector for which to incorporate the quantum efficiency. If
None, use the standard hardware bandpass.
logger : logging.Logger
If provided, a logger for logging debug statements.
"""
# This uses the baseline throughput files from lsst.sims
sims_dir = os.getenv("RUBIN_SIM_DATA_DIR")
file_name = os.path.join(sims_dir, "throughputs", "baseline", f"total_{band}.dat")
if not os.path.isfile(file_name):
# If the user doesn't have the RUBIN_SIM_DATA_DIR defined, or if they don't have
# the correct files installed, revert to the GalSim files.
logger = galsim.config.LoggerWrapper(logger)
logger.warning("Warning: Using the old bandpass files from GalSim, not lsst.sims")
file_name = f"LSST_{band}.dat"
bp = galsim.Bandpass(file_name, wave_type='nm')
if sum([camera is not None, det_name is not None]) == 1:
raise ValueError("Must provide both camera and det_name if using one.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interested implementation of this. I generally prefer

if (camera is not None) != (det_name is not None):

Not sure if that's more or less obvious as to intent, but I figured I put it out there.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I like that. Shortened it to if (camera is None) != (det_name is None):.

match camera:
case 'LsstCam':
camera = 'lsstCam'
case 'LsstComCam':
camera = 'comCamSim'
case _:
camera = camera
tp_path = Path(os.getenv("RUBIN_SIM_DATA_DIR")) / "throughputs"
if airmass is None and camera is None:
file_name = tp_path / "baseline" / f"total_{band}.dat"
if not file_name.is_file():
logger = galsim.config.LoggerWrapper(logger)
logger.warning("Warning: Using the old bandpass files from GalSim, not rubin_sim")
file_name = f"LSST_{band}.dat"
bp = galsim.Bandpass(str(file_name), wave_type='nm')
else:
if airmass is None:
airmass = 1.2
# Could be more efficient by only reading in the bracketing airmasses,
# but probably doesn't matter much here.
atmos = {}
for f in (tp_path / "atmos").glob("atmos_??_aerosol.dat"):
X = float(f.name[-14:-12])/10.0
wave, tput = np.genfromtxt(f).T
atmos[X] = wave, tput
Xs = np.array(sorted(atmos.keys()))
arr = np.array([atmos[X][1] for X in Xs])

interpolator = AtmInterpolator(Xs, arr)
tput = interpolator(airmass)
bp_atm = galsim.Bandpass(
galsim.LookupTable(
wave, tput, interpolant='linear'
),
wave_type='nm'
)

if camera is not None:
old_path = Path(os.getenv("OBS_LSST_DATA_DIR"))
old_path = old_path / camera / "transmission_sensor" / det_name.lower()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessarily in the environment? If not, should maybe do something graceful if it isn't. Otherwise I think getenv returns None, and then Path probably takes this to be the current directory, which might lead to something odd happening downstream.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out Path(None) raises a TypeError. I think it's okay to panic here; user asked for a specific detector bandpass but we're unable to comply. Reraised a ValueError with a more helpful message.

det_files = list(old_path.glob("*.ecsv"))
if len(det_files) != 1:
raise ValueError(f"Expected 1 detector file, found {len(det_files)}")
det_file = det_files[0]
table = Table.read(det_file)
# Average over amplifiers
amps = np.unique(table['amp_name'])
det_wave = table[table['amp_name'] == amps[0]]['wavelength']
det_tput = table[table['amp_name'] == amps[0]]['efficiency']/100
for amp in amps[1:]:
assert np.all(det_wave == table[table['amp_name'] == amp]['wavelength'])
det_tput += table[table['amp_name'] == amp]['efficiency']/100
det_tput /= len(amps)
bp_det = galsim.Bandpass(
galsim.LookupTable(
det_wave, det_tput, interpolant='linear'
),
wave_type='nm'
)

# Get the rest of the hardware throughput
optics_wave, optics_tput = np.genfromtxt(
tp_path / "baseline" / f"filter_{band}.dat"
).T
for f in ["m1.dat", "m2.dat", "m3.dat", "lens1.dat", "lens2.dat", "lens3.dat"]:
wave, tput = np.genfromtxt(
tp_path / "baseline" / f
).T
assert np.all(wave == optics_wave)
optics_tput *= tput
bp_optics = galsim.Bandpass(
galsim.LookupTable(
optics_wave, optics_tput, interpolant='linear'
),
wave_type='nm'
)
bp_hardware = bp_det * bp_optics
else:
file_name = tp_path / "baseline" / f"hardware_{band}.dat"
wave, hardware_tput = np.genfromtxt(file_name).T
bp_hardware = galsim.Bandpass(
galsim.LookupTable(
wave, hardware_tput, interpolant='linear'
),
wave_type='nm'
)
bp = bp_atm * bp_hardware
bp = bp.truncate(relative_throughput=1.e-3)
bp = bp.thin()
bp = bp.withZeropoint('AB')
return bp


class RubinBandpassBuilder(galsim.config.BandpassBuilder):
"""A class for building a RubinBandpass in the config file
"""
Expand All @@ -48,9 +196,19 @@ def buildBandpass(self, config, base, logger):
the constructed Bandpass object.
"""
req = { 'band' : str }
kwargs, safe = galsim.config.GetAllParams(config, base, req=req)
opt = {
'airmass' : float,
'camera' : str,
'det_name' : str
}
kwargs, safe = galsim.config.GetAllParams(
config, base, req=req, opt=opt
)
kwargs['logger'] = logger
bp = RubinBandpass(**kwargs)

# Also, store the kwargs=None version in the base config.
base['fiducial_bandpass'] = RubinBandpass(band=kwargs['band'], logger=logger)
logger.debug('bandpass = %s', bp)
return bp, safe

Expand Down
37 changes: 35 additions & 2 deletions imsim/photon_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
from functools import lru_cache
import numpy as np

import galsim
import batoid
from galsim import PhotonArray, PhotonOp, GaussianDeviate
from galsim import Bandpass, PhotonArray, PhotonOp, GaussianDeviate
from galsim.config import RegisterPhotonOpType, PhotonOpBuilder, GetAllParams
from galsim.config import BuildBandpass, GalSimConfigError
from galsim.celestial import CelestialCoord
from galsim.config.util import get_cls_params
from coord import Angle
Expand Down Expand Up @@ -481,3 +481,36 @@ def ray_vector_to_photon_array(
out.dxdz, out.dydz = (out.dxdz.ravel(), out.dydz.ravel())
out.flux[ray_vector.vignetted] = 0.0
return out


class BandpassRatio(PhotonOp):
"""Photon operator that reweights photon fluxes to effect
a specified bandpass from photons initially sampled from
a different bandpass.
"""
def __init__(
self,
target_bandpass: Bandpass,
initial_bandpass: Bandpass
):
self.target = target_bandpass
self.initial = initial_bandpass
self.ratio = self.target / self.initial

def applyTo(self, photon_array, local_wcs=None, rng=None):
photon_array.flux *= self.ratio(photon_array.wavelength)


class BandpassRatioBuilder(PhotonOpBuilder):
def buildPhotonOp(self, config, base, logger):
if 'target_bandpass' not in config:
raise GalSimConfigError("target_bandpass is required for BandpassRatio")
if 'initial_bandpass' not in config:
raise GalSimConfigError("initial_bandpass is required for BandpassRatio")
kwargs = {}
kwargs['target_bandpass'] = BuildBandpass(config, 'target_bandpass', base, logger)[0]
kwargs['initial_bandpass'] = BuildBandpass(config, 'initial_bandpass', base, logger)[0]
return BandpassRatio(**kwargs)


RegisterPhotonOpType('BandpassRatio', BandpassRatioBuilder())
Loading
Loading