"""
Main tools to drive exposure level processing.
Functions
---------
wcs_from_config
Extracts a WCS from the configuration file.
initializationstep
Creates and initializes L2 data.
saturation_check
Flags saturated pixels in a 3D cube.
subtract_dark_current
Subtracts dark current in a 3D cube.
repackage_wcs
Packages a WCS so that it can be handed to romanisim.
calibrateimage
L1->L2 driver.
"""
import sys
import warnings
from copy import deepcopy
import asdf
# not actually doing a simulation but needed to pass around the WCS types
import galsim
import numpy as np
import yaml
from astropy import units as u
from astropy.io import fits
from roman_datamodels.dqflags import pixel
from romanisim import image as rimage
from romanisim import persistence as rip
from romanisim import wcs as riwcs
# stcal imports
from stcal.saturation.saturation import flag_saturated_pixels
from .. import pars
from ..utils import (
coordutils,
fitting,
flatutils,
ipc_linearity,
maskhandling,
processlog,
reference_subtraction,
sky,
)
# local imports
from . import oututils
### function definitions below here
[docs]
def wcs_from_config(config):
"""
Gets a WCS object from the configuration.
Currently supports FITS headers imported from a simulation.
Parameters
----------
config : dict
Configuration dictionary (usually imported from YAML).
Returns
-------
astropy.io.fits.header.Header
The WCS as a FITS header.
"""
if "FITSWCS" in config:
with open(config["FITSWCS"]) as f:
return fits.Header.fromstring(f.read())
# if no WCS was found, just return None (we'll deal with this later)
return None
[docs]
def initializationstep(config, caldir, mylog):
"""
Initialization step.
Parameters
----------
config : dict
Configuration dictionary (usually imported from YAML).
caldir : dict
Locations of calibration files.
mylog : romanimpreprocess.utils.processlog.ProcessLog
Processing log.
Returns
-------
data : np.array
3D array, science cube loaded from L1
rdq : np.array
3D array, flags (ramp data quality)
pdq : np.array
2D array, flags (pixel data quality)
meta : dict
Other metadata (right now: frame_time and read_pattern)
l1meta : dict
Metadata stright from the L1 file (copy of ASDF subtree)
amp33 : np.array
3D array, reference output loaded from L1
"""
with asdf.open(config["IN"]) as f:
data = np.copy(f["roman"]["data"].astype(np.float32))
amp33 = np.copy(f["roman"]["amp33"].astype(np.float32))
rdq = np.zeros(np.shape(data), dtype=np.uint32)
# guide windows
# in DCL testing the rows containing the guide windows were affected
# we expect also that the pixels with IPC coupling to the guide window
# are affected at some level so I'm flagging those too.
guide_star = f["roman"]["meta"]["guide_star"]
xstart = int(guide_star["window_xstart"])
xstop = int(guide_star["window_xstop"])
ystart = int(guide_star["window_ystart"])
ystop = int(guide_star["window_ystop"])
mylog.append(f"guide window: x={xstart:d}:{xstop:d}, y={ystart:d}:{ystop:d}\n")
# if the metadata contain a real window, mask that row
if xstart >= 0 and ystart >= 0 and xstop <= pars.nside and ystop <= pars.nside:
rdq[:, :, xstart:xstop] |= pixel.GW_AFFECTED_DATA
# now flag potential IPC
if xstart > pars.nborder:
xstart -= 1
if xstop < pars.nside - pars.nborder:
xstop += 1
if ystart > pars.nborder:
ystart -= 1
if ystop < pars.nside - pars.nborder:
ystop += 1
rdq[:, ystart:ystop, xstart:xstop] |= pixel.GW_AFFECTED_DATA
# pull out metadata that we want later
meta = {
"frame_time": f["roman"]["meta"]["exposure"]["frame_time"],
"read_pattern": f["roman"]["meta"]["exposure"]["read_pattern"],
}
# more information
meta["ngrp"] = len(meta["read_pattern"])
meta["tbar"] = np.zeros(meta["ngrp"], dtype=np.float32)
meta["tau"] = np.zeros(meta["ngrp"], dtype=np.float32)
meta["N"] = np.zeros(meta["ngrp"], dtype=np.int16)
for i in range(meta["ngrp"]):
# N_i, tbar_i, and tau_i as defined in Casertano et al. 2022
meta["N"][i] = len(meta["read_pattern"][i])
t0 = meta["read_pattern"][i][0]
meta["tbar"][i] = (t0 + (meta["N"][i] - 1) / 2.0) * meta["frame_time"]
meta["tau"][i] = (t0 + (meta["N"][i] - 1) * (2 * meta["N"][i] - 1) / (6.0 * meta["N"][i])) * meta[
"frame_time"
]
l1meta = deepcopy(f["roman"]["meta"])
# mask
if "mask" in caldir:
with asdf.open(caldir["mask"]) as m:
rdq |= m["roman"]["dq"][None, :, :]
# pixel dq
pdq = np.bitwise_or.reduce(rdq, axis=0)
return data, rdq, pdq, meta, l1meta, amp33
[docs]
def saturation_check(data, read_pattern, rdq, pdq, caldir, mylog):
"""
Flags saturated pixels (in both 3D and 2D arrays).
Performs a saturation check on the data cube (`data`) using the calibration files in `caldir`.
Information is appended to `mylog`. The flags `rdq` and `pdq` are updated in place.
This function serves as a wrapper for ``flag_saturated_pixels`` (imported from ``stcal``).
Parameters
----------
data : np.array
3D raw data cube (uint16, shape ngroup,4096,4096).
read_pattern : list of list of int
MultiAccum table.
rdq : np.array
3D ramp data quality (uint32, shape ngroup,4096,4096).
pdq : np.array
2D pixel data quality (uint32, shape 4096,4096).
caldir : dict
Locations of calibration files.
mylog : romanimpreprocess.utils.processlog.ProcessLog
Processing log.
Returns
-------
None
"""
# passing the 0th frame will lead to division by zero, so we avoid this
# start the saturation check with the s th frame
s = 0
if read_pattern[0] == [0]:
s = 1
with asdf.open(caldir["saturation"]) as f:
flag_saturated_pixels(
data[None, s:, :, :], # flag_saturated_pixels expects a 4D array with integrations as the 0-axis
rdq[None, s:, :, :], # ramp data quality, with only 1 integration, expanded to 4D
pdq, # 2D pixel, passed through
f["roman"]["data"], # saturation threshold, 2D
np.copy(f["roman"]["dq"]), # saturation quality flags
2**16 - 1, # maximum of ADC output -- 16 bits
pixel, # this is the Roman data quality flag array
n_pix_grow_sat=1, # also flag 1 pixel around each saturated one
zframe=None,
read_pattern=read_pattern[s:], # again, this is a list of list of ints
bias=None,
)
# backs up 1 frame to be safe since if the non-linearity curve is sharp enough
# the existing algorithm can fail on a large group
# important to run this in ascending order
for i in range(len(read_pattern) - 1):
if len(read_pattern[i]) > 1:
rdq[i, :, :] |= rdq[i + 1, :, :] & pixel.SATURATED
[docs]
def subtract_dark_current(data, rdq, pdq, caldir, meta, mylog):
"""
Subtracts dark current from a linearized image.
The `data`, `rdq`, and `pdq` fields are updated in place.
Parameters
----------
data : np.array
3D data cube (in DN_lin, shape ngroup,4096,4096)
rdq : np.array
3D ramp data quality (uint32, shape ngroup,4096,4096)
pdq : np.array
2D pixel data quality (uint32, shape 4096,4096)
caldir : dict
Locations of calibration files.
meta : dict
Metadata dictionary (from L1 ASDF tree).
mylog : romanimpreprocess.utils.processlog.ProcessLog
Processing log.
Returns
-------
np.array
The 2D image of subtracted dark current in DN/s.
"""
with asdf.open(caldir["dark"]) as f:
dcsub = np.copy(f["roman"]["dark_slope"])
ngrp = meta["ngrp"]
for j in range(ngrp):
data[j, :, :] -= meta["tbar"][j] * dcsub
return dcsub
[docs]
def repackage_wcs(thewcs):
"""
Packages a WCS to feed to romanisim.
Right now supports FITS-standard headers from a simulation.
Since this for compatibility in ramp-fitting routines, can use this
and overwrite the WCS in the L2 ASDF tree with a full-accuracy gwcs
at a later stage.
Parameters
----------
thewcs : astropy.io.fits.Header or galsim.CelestialWCS
Input WCS.
Returns
-------
class
Packaged WCS, 2 layers deep for compatibility with romanisim.
"""
# make WCS --- a few ways of doing this
while True:
wcsobj = None
class Blank:
pass
# first try a FITS header
if isinstance(thewcs, fits.Header):
wcsobj = Blank()
wcsobj.header = Blank()
wcsobj.header.header = thewcs
break
# should work if this is a GalSim WCS
try:
header = fits.Header()
thewcs.writeToFitsHeader(header, galsim.BoundsI(0, pars.nside_active, 0, pars.nside_active))
# offset to FITS convention -- this is undone later
header["CRPIX1"] += 1
header["CRPIX2"] += 1
wcsobj = Blank()
wcsobj.header = Blank()
wcsobj.header.header = header
warnings.warn("Use of GalSim WCS in calibrate is not fully working yet!")
break
except Exception as e:
wcsobj = None
raise Exception("Unrecognized WCS") from e
return wcsobj
[docs]
def calibrateimage(config, verbose=True):
"""
Main routine to run the specified calibrations from a config file.
Parameters
----------
config : dict
Configuration (likely unpacked from a YAML file).
verbose : bool, optional
Whether to print lots of intermediate stuff to the terminal.
Returns
-------
None
"""
# setup
mylog = processlog.ProcessLog()
# get an initial WCS (if provided)
# in some simulations we may need to give this if the input stars themselves are simulated
thewcs = wcs_from_config(config)
caldir = config["CALDIR"]
# initialize a data cube and data quality
data, rdq, pdq, meta, l1meta, amp33 = initializationstep(config, caldir, mylog)
(ngrp, ny, nx) = np.shape(data)
nb = meta["nborder"] = pars.nborder
mylog.append("Initialized data\n")
# saturation check
saturation_check(data, meta["read_pattern"], rdq, pdq, caldir, mylog)
mylog.append("Saturation check complete\n")
# reference pixel correction -- right now using a 5-pixel filter of the left & right ref pixels
# and the top & bottom pixel subtraction functions from Laliotis et al. (2024)
# **This is a placeholder until:
# - amp33 to be implemented (currently the simulation leaves it blank)
# - improved reference pixel correction from GSFC group should be available
with asdf.open(caldir["dark"]) as f:
# rsub = np.zeros((ngrp, pars.nside), dtype=np.float32)
for j in range(ngrp):
image = np.zeros((pars.nside, pars.nside_augmented), dtype=np.float32)
image[:, : pars.nside] = data[j, :, :] - f["roman"]["data"][j, :, :]
with asdf.open(caldir["read"]) as fr:
if "amp33" in fr["roman"]:
image[:, -pars.channelwidth :] = amp33[j, :, :] - fr["roman"]["amp33"]["med"]
image[:, -pars.channelwidth :] -= np.median(image[:, -pars.channelwidth :])
image = reference_subtraction.ref_subtraction_row(image, use_ref_channel=True)
image = reference_subtraction.ref_subtraction_channel(image, use_ref_channel=True)
data[j, :, :] = image[:, : pars.nside] + f["roman"]["data"][j, :, :]
# bias correction
if "biascorr" in caldir:
with asdf.open(caldir["biascorr"]) as f:
data[:, nb:-nb, nb:-nb] -= f["roman"]["data"]
mylog.append("Included bias correction\n")
else:
mylog.append("Skipping bias correction\n")
# linearity correxction
# ** right now applies the linearity to a group average, which isn't strictly correct **
# ** will fix this in a future upgrade! **
data, dq_lin = ipc_linearity.multilin(
data,
caldir["linearitylegendre"], # the linearity cube
do_not_flag_first=meta["read_pattern"][0]
== 0, # don't flag the first read for being off scale if it is the reset
attempt_corr=~rdq
& pixel.SATURATED, # don't flag saturated pixels as having a bad linearity correction
)
if len(np.shape(dq_lin)) == 2:
rdq |= dq_lin[None, :, :]
else:
rdq |= dq_lin
del dq_lin # we have everything we need
mylog.append("Linearity correction complete\n")
# now data is in linearized DN, floating point
# subtract out dark current
# dcsub is the dark current that was subtracted --- data is updated in place
subtract_dark_current(data, rdq, pdq, caldir, meta, mylog) # removed dcsub= assignment as it isn't used
mylog.append("Dark current subtracted")
# IPC correction
if "ipc4d" in caldir:
ipc_linearity.correct_cube(data, caldir["ipc4d"], mylog, gain_file=caldir["gain"])
else:
mylog.append("skipping IPC correction\n")
# ramp fitting
uopt = {"slope": 0.4, "gain": 1.8, "sigma_read": 6.5}
if "RAMP_OPT_PARS" in config:
uopt = config["RAMP_OPT_PARS"]
u_ = float(uopt["slope"]) / float(uopt["gain"]) / float(uopt["sigma_read"]) ** 2
meta["K"] = fitting.construct_weights(u_, meta, exclude_first=True)
mylog.append(f"\n\nRamp fit optimized for u = {u_:11.5E} s**-1\n")
mylog.append("weights = {}\n".format(meta["K"]))
if "JUMP_DETECT_PARS" in config:
meta["jump_detect_pars"] = config["JUMP_DETECT_PARS"]
slope, slope_err_read, slope_err_poisson = fitting.ramp_fit(
data, rdq, pdq, meta, caldir, mylog, exclude_first=True
)
# apply flat field
flat = flatutils.get_flat(caldir, meta, pdq)
# this is the ratio of the true pixel area to the reference area (0.11 arcsec)^2
AreaFactor = (
coordutils.pixelarea(riwcs.convert_wcs_to_gwcs(repackage_wcs(thewcs)), N=np.shape(slope)[-1])
/ pars.Omega_ideal
)
flat = (flat / AreaFactor).astype(np.float32)
mylog.append("acquired flat field\n")
for p in [1, 2, 5, 10, 25, 50, 75, 90, 95, 98, 99]:
mylog.append(f" {p:2d}%ile = {np.percentile(flat,p):6.4f},")
mylog.append("\n")
slope /= flat
slope_err_read /= flat
slope_err_poisson /= flat
# need the median gain to send to a file
with asdf.open(caldir["gain"]) as g_:
medgain = np.median(g_["roman"]["data"])
mylog.append(f"median gain = {medgain:8.5f} e/DN\n")
# blank persistence object right now
persistence = rip.Persistence()
# sky information
slope_withsky = np.copy(slope) # version before sky subtraction
m = maskhandling.PixelMask1.build(pdq)
medsky, _ = sky.smooth_mode(sky.binkxk(np.where(np.logical_not(m), slope, np.nan), 4))
# if the configuration asks for simple subtraction, do it
if "SKYORDER" in config:
skyorder = int(config["SKYORDER"])
skycoefs, skymodel = sky.medfit(slope[nb:-nb, nb:-nb], order=skyorder)
slope[nb:-nb, nb:-nb] -= skymodel
del skymodel
else:
skycoefs = np.array([]).astype(np.float32)
skyorder = -1 # not used
im2, extras2 = rimage.make_asdf(
slope[nb:-nb, nb:-nb] * u.DN / u.s,
(slope_err_read[nb:-nb, nb:-nb] * u.DN / u.s) ** 2,
(slope_err_poisson[nb:-nb, nb:-nb] * u.DN / u.s) ** 2,
metadata=l1meta,
persistence=persistence,
dq=pdq[nb:-nb, nb:-nb],
imwcs=repackage_wcs(thewcs),
gain=medgain,
)
oututils.add_in_ref_data(im2, config["IN"], rdq, pdq)
# update the metadata
# oututils.update_flags(im2, "gen_cal_image") # <-- this doesn't work with updated roman_datamodels,
# but it isn't essential
oututils.add_in_provenance(im2, "gen_cal_image")
# process information specific to this code
processinfo = {
"medsky": medsky,
"medgain": medgain,
"skyorder": skyorder,
"skycoefs": skycoefs,
"ramp_opt_pars": uopt,
"meta": meta,
"weights": meta["K"],
"config": config,
"log": mylog.output,
"exclude_first": True,
}
# this is for getting the ramp data so we know which range was used
# (max 127 groups)
if "SLICEOUT" in config:
if config["SLICEOUT"]:
if ngrp >= 128:
raise ValueError("too many groups")
endslice = np.zeros((pars.nside_active, pars.nside_active), dtype=np.int8) - 1
nb = pars.nborder
for iend in range(1, ngrp):
endslice = np.where(
rdq[iend, nb:-nb, nb:-nb] & ~rdq[iend - 1, nb:-nb, nb:-nb] & pixel.SATURATED != 0,
iend - 1,
endslice,
)
processinfo["endslice"] = endslice
# Write file
with asdf.AsdfFile() as af2:
af2.tree = {"roman": im2, "processinfo": processinfo}
af2.tree["roman"]["data_withsky"] = slope_withsky[nb:-nb, nb:-nb] * u.DN / u.s
with open(config["OUT"], "wb") as f:
af2.write_to(f)
if "FITSOUT" in config:
if config["FITSOUT"]:
good = ~maskhandling.PixelMask1.build(im2["dq"]) # this is one choice
# note we accept saturated pixels in this step
fits.HDUList(
[
fits.PrimaryHDU(im2["data"]),
fits.ImageHDU(im2["dq"]),
fits.ImageHDU(np.where(good, im2["data"], -1000)),
]
).writeto(config["OUT"][:-5] + "_asdf_to.fits", overwrite=True)
print(mylog.output)
return
if __name__ == "__main__":
with open(sys.argv[1]) as f:
[docs]
config = yaml.safe_load(f)
calibrateimage(config)