Source code for romanimpreprocess.utils.ipc_linearity

"""
IPC and linearity utilities.

Classes
-------
IL
    IPC+Inverse linearity class.

Functions
---------
ipc_fwd
    Carries out an IPC operation on the image.
ipc_rev
    Inverse IPC operation, to the given order.
correct_cube
    IPC corrects a full data cube (data) in place.
_lin
    Helper function to evaluate Legendre-based function.
linearity
    Performs a linearity correction.
multilin
    Performs a linearity correction, but with multiple groups.
invlinearity
    Calculates the inverse linearity. (This is most likely to be used in simulations.)
test__lin
    Simple test function for _lin.
test_lin_ilin
    Forward-backward test for linearity routines.
test_IL
    Some tests for the inverse linearity class.

"""

import sys

import asdf
import numpy as np
from roman_datamodels.dqflags import pixel

## IPC utilities ##


[docs] def ipc_fwd(image, kernel, gain=None): """ Carries out an IPC operation on the image. Parameters ---------- image : np.array 2D numpy array of size (ny,nx). kernel : np.array 4D numpy array of size (3,3,ny,nx). gain : np.array or None, optional If not None, the input gain map (in e/DN). Returns ------- output : np.array 2D numpy array, same shape as `image`, IPC-convolved. See Also -------- ipc_rev : Inverse function. Notes ----- The output image, in psuedocode, is:: output[y,x] = sum_{dy,dx} input[y-dy,x-dx] kernel[1+dy,1+dx,y-dy,x-dx] This function natively works in electrons, but if gain is provided then works in DN (does g^-1 K g). """ im = image if gain is not None: im = gain * image # start with the center image output = im * kernel[1, 1, :, :] # nearest neighbors # dy=1, dx=0 output[1:, :] += im[:-1, :] * kernel[2, 1, :-1, :] # dy=-1, dx=0 output[:-1, :] += im[1:, :] * kernel[0, 1, 1:, :] # dy=0, dx=1 output[:, 1:] += im[:, :-1] * kernel[1, 2, :, :-1] # dy=0, dx=-1 output[:, :-1] += im[:, 1:] * kernel[1, 0, :, 1:] # diagonals # dy=1, dx=1 output[1:, 1:] += im[:-1, :-1] * kernel[2, 2, :-1, :-1] # dy=1, dx=-1 output[1:, :-1] += im[:-1, 1:] * kernel[2, 0, :-1, 1:] # dy=-1, dx=1 output[:-1, 1:] += im[1:, :-1] * kernel[0, 2, 1:, :-1] # dy=-1, dx=-1 output[:-1, :-1] += im[1:, 1:] * kernel[0, 0, 1:, 1:] if gain is not None: output /= gain return output
[docs] def ipc_rev(image, kernel, order=2, gain=None): """ Inverse IPC operation, to the given order. Grows the footprint of each pixel to ``(2*order+1,2*order+1)``. If `gain `is provided, then does g^-1 K^-1 g image instead of K^-1 image. (Equivalent: operate on DN instead of e.) Parameters ---------- image : np.array 2D numpy array of size (ny,nx). kernel : np.array 4D numpy array of size (3,3,ny,nx). order : int, optional The order of IPC inversion; default is 2nd order (correct to next-to-next-to-neighbor pixel). Error is of order ``alpha**(order+1)``. gain : np.array or None, optional If not None, the input gain map (in e/DN). Returns ------- output : np.array 2D numpy array, same shape as `image`, IPC-convolved. See Also -------- ipc_fwd : Forward function. """ image2 = image if gain is not None: image2 = gain * image output = np.copy(image2) for _ in range(order): output = output + image2 - ipc_fwd(output, kernel) if gain is not None: output /= gain return output
[docs] def correct_cube(data, ipc_file, mylog, gain_file=None): """ IPC corrects a full data cube (data) in place. Operates in electrons if `gain_file` is None or missing, but operates in DN if `gain_file` is provided. Parameters ---------- data : np.array 3D data cube, shape (ngrp, ny, nx). ipc_file : str Location of ASDF ipc4d calibration reference file. mylog : romanimpreprocess.utils.processlog.ProcessLog Processing log. gain_file : str or None, optional If not none, the ASDF gain calibration reference file. """ if ipc_file is None: if mylog is not None: mylog.append("No IPC file specified, skipping ...\n") return with asdf.open(ipc_file) as F: kernel = F["roman"]["data"] if mylog is not None: mylog.append( f"IPC kernel center range --> {np.amin(kernel[1,1,:,:]):f},{np.amax(kernel[1,1,:,:]):f}\n" ) (ngrp, ny, nx) = np.shape(data) nb = (8192 + (nx - np.shape(kernel)[-1]) // 2) % 16 if mylog is not None: mylog.append(f" ..., {ngrp:d} groups, excluding {nb:d} border pixels\n") if gain_file is None: g = 1.0 else: with asdf.open(gain_file) as G: g = np.copy(G["roman"]["data"][nb : ny - nb, nb : nx - nb]) for i in range(ngrp): data[i, nb : ny - nb, nb : nx - nb] = ipc_rev(data[i, nb : ny - nb, nb : nx - nb] * g, kernel) / g
## LINEARITY UTILITIES ##
[docs] def _lin(z, coefs, linextrap=True): """ Helper function to evaluate Legendre-based function. Parameters ---------- z : np.array Rescaled signal (modified DN), shape (ny,nx). coefs : np.array Legendre polynomial coefficients, shape (p_order+1,ny,nx). linextrap : bool, optional Linearly extrapolate beyond end of range? (Default = True is better behaved.) Returns ------- phi : np.array The linearized signal, `` sum_l coefs_l P_p(z)``, shape (ny,nx) exflag : np.array of bool Extrapolated beyond |z|=1?, shape (ny,nx) """ exflag = np.abs(z) > 1 # are we extrapolating? phi = np.copy(coefs[0, :, :]) poly_prev = np.ones_like(phi) poly = np.copy(z) for L in range(1, np.shape(coefs)[0]): if linextrap: phi += coefs[L, :, :] * np.where( exflag, np.sign(z) ** L * (1 + L * (L + 1) / 2.0 * (np.abs(z) - 1)), poly ) else: phi += coefs[L, :, :] * poly # Legendre polynomial recursion relation poly_next = (2 * L + 1) / (L + 1) * z * poly - L / (L + 1) * poly_prev poly_prev = poly poly = poly_next return phi, exflag
[docs] def linearity(S, linearity_file, origin=(0, 0)): """ Performs a linearity correction. Parameters ---------- S : np.array 2D data array in DN_raw. linearity_file : str ASDF file with linearity data. origin : (int, int), optional The (x,y) position of the lower-left corner of S in the convention of the file. Returns ------- Slin : np.array 2D data array in DN_lin. Same shape as `S`. dq : np.array of uint32 2D flag array. Same shape as `S`. Notes ----- The coordinate convention is that if you have a block `S` that corresponds to region ``[128:132,256:260]``, then you would give ``origin=(256,128)``. """ (dy, dx) = np.shape(S) ymin = origin[1] ymax = ymin + dy xmin = origin[0] xmax = xmin + dx with asdf.open(linearity_file) as F: Smin = F["roman"]["Smin"][ymin:ymax, xmin:xmax] Smax = F["roman"]["Smax"][ymin:ymax, xmin:xmax] phi, exflag = _lin(-1 + 2 * (S - Smin) / (Smax - Smin), F["roman"]["data"][:, ymin:ymax, xmin:xmax]) dq = np.copy(F["roman"]["dq"][ymin:ymax, xmin:xmax]) dq |= np.where(exflag, pixel.NO_LIN_CORR, 0).astype(np.uint32) # flag with bad linearity correction return phi, dq
[docs] def multilin(S, linearity_file, origin=(0, 0), do_not_flag_first=True, attempt_corr=None): """ Performs a linearity correction, but with multiple groups. Parameters ---------- S : np.array Input data as a 3D numpy array, shape (ngrp, ny, nx). linearity_file : str ASDF calibration reference file with linearity data. origin : (int, int), optional The (x,y) position of the lower-left corner of `S` in the convention of the file. do_not_flag_first : bool, optional Don't flag the first read if it is out of range (True by default for reset-read frames that we won't use anyway). attempt_corr : np.array of bool, optional If provided, an array of the same shape as `S` that is True if we want to try the correction and False otherwise (the idea is that we want to be able to *not* flag a pixel that is saturated). Default is to attempt to correct everything. Returns ------- Slin : np.array Linearized data, shape (ngrp,ny,nx), in DN_lin. dq : np.array of uint32 Flag array (2D). Notes ----- The coordinate convention is that if you have a block `S` that corresponds to region ``[128:132,256:260]``, then you would give ``origin=(256,128)``. This is in 2D, so `origin` has only 2 entries (no offset is given on the time axis). ``Slin=0`` corresponding to the bias level ``Sref`` in the calibration reference file. """ (ngrp, dy, dx) = np.shape(S) ymin = origin[1] ymax = ymin + dy xmin = origin[0] xmax = xmin + dx # accept everything if attempt_corr not provided if attempt_corr is None: attempt_corr = np.ones((ngrp, dy, dx), dtype=bool) phi = np.zeros(np.shape(S), dtype=np.float32) with asdf.open(linearity_file) as F: Smin = F["roman"]["Smin"][ymin:ymax, xmin:xmax] Smax = F["roman"]["Smax"][ymin:ymax, xmin:xmax] Sref = F["roman"]["Sref"][ymin:ymax, xmin:xmax] dq = np.copy(F["roman"]["dq"][ymin:ymax, xmin:xmax]) for j in range(ngrp): z = -1 + 2 * (S[j, :, :] - Smin) / (Smax - Smin) if j == 0 and do_not_flag_first: z = np.clip(z, -1, 1) phi[j, :, :], exflag = _lin(z, F["roman"]["data"][:, ymin:ymax, xmin:xmax]) phi[j, :, :] = np.where( dq & (pixel.NO_LIN_CORR | pixel.REFERENCE_PIXEL) == 0, phi[j, :, :], S[j, :, :] - Sref ) # flag reads with bad linearity correction if not (j == 0 and do_not_flag_first): dq |= np.where(np.logical_and(exflag, attempt_corr[j, :, :]), pixel.NO_LIN_CORR, 0).astype( np.uint32 ) return phi, dq
[docs] def invlinearity(Slin, linearity_file, origin=(0, 0)): """ Calculates the inverse linearity. (This is most likely to be used in simulations.) Parameters ---------- Slin : np.array 2D input data, shape (ny,nx), in DN_lin. linearity_file : str ASDF calibration reference file with linearity data. origin : (int, int), optional The (x,y) position of the lower-left corner of `S` in the convention of the calibration file. Returns ------- S : np.array 2D array, same shape as `Slin`, in DN_raw. exflag : np.array of bool Extrapolation flag. Notes ----- This function works by bisection. It is the slowest step in the simulation -> Level 1 workflow, so we plan to implement a more advanced algorithm in the future. """ (dy, dx) = np.shape(Slin) ymin = origin[1] ymax = ymin + dy xmin = origin[0] xmax = xmin + dx with asdf.open(linearity_file) as F: z = np.zeros_like(Slin) # binary search, robust over the range -1 < z < +1 # (which should encapsulate anything; also automatically saturates) for j in range(1, 25): phi, exflag = _lin(z, F["roman"]["data"][:, ymin:ymax, xmin:xmax], linextrap=False) # linextrap=False saves some time z += np.where(phi < Slin, 1 / 2**j, -1 / 2**j) Smin = F["roman"]["Smin"][ymin:ymax, xmin:xmax] Smax = F["roman"]["Smax"][ymin:ymax, xmin:xmax] S = Smin + (Smax - Smin) / 2.0 * (1 + z) return S, exflag
"""IPC + inverse linearity forward modeling tools"""
[docs] class IL: """ IPC+Inverse linearity class. This exists to wrap the IPC and inverse-linearity operations in a way that is consistent with ``romanisim``. Parameters ---------- linearity_file : str The ASDF linearity calibration reference file. gain_file : str The ASDF gain calibration reference file. ipc_file : str or None The ASDF ipc4d calibration reference file. If None, skips the IPC. start_e : np.array or float, optional If provided, starts with some number of electrons (start_e, number or array) in the well. Useful for reset noise. Methods ------- __init__ Constructor set_dq Sets the 3D data quality flags. apply Converts a linearized signal to a non-linear, IPC-convolved signal. """
[docs] def __init__(self, linearity_file, gain_file, ipc_file, start_e=0.0):
[docs] self.linearity_file = linearity_file
[docs] self.gain_file = gain_file
[docs] self.ipc_file = ipc_file
[docs] self.start_e = start_e
# need the .dq attribute with asdf.open(self.linearity_file) as f: self._dq = np.copy(f["roman"]["dq"])
[docs] def set_dq(self, ngroup=1, nborder=4): """ Sets the 3D data quality flags. This is so that the data quality flags can be propagated. Parameters ---------- ngroup : int, optional Number of groups to initialize. nborder : int, optional Number of border reference pixels. Returns ------- None """ (ny, nx) = np.shape(self._dq) self.dq = np.zeros((ngroup, ny - 2 * nborder, nx - 2 * nborder), dtype=np.uint32) self.dq[:, :, :] = self._dq[None, nborder : ny - nborder, nborder : nx - nborder]
[docs] def apply(self, counts, electrons=False, electrons_out=False): """ Converts a linearized signal to a non-linear signal. Parameters ---------- counts : np.array 2D array of counts. electrons : bool, optional Is input in electrons (True) or DN_lin (False, default). electrons_out : bool, optional Is output in electrons (True) or DN_raw (False, default). """ print("apply", electrons, electrons_out, np.shape(counts)) # print(counts[:6, :6]) sys.stdout.flush() # this uses a 4d IPC file if self.ipc_file is not None: with asdf.open(self.ipc_file) as f: counts_conv = ipc_fwd(counts + self.start_e, f["roman"]["data"]) else: counts_conv = counts + self.start_e # gain factors for in and out (nyc, nxc) = np.shape(counts) g_in = 1.0 g_out = 1.0 if electrons or electrons_out: with asdf.open(self.gain_file) as f: # extract the gain (with reference pixels clipped if needed) g = f["roman"]["data"] (nyg, nxg) = np.shape(g) if nyg > nyc: nb = (nyg - nyc) // 2 g = g[nb:-nb, nb:-nb] if electrons: g_in = g if electrons_out: g_out = g # what to strip off the counts array nb = (8192 - nyc // 2) % 16 S, _ = invlinearity(counts_conv / g_in, self.linearity_file, origin=(nb, nb)) if not electrons_out: return S # below here, we know electrons_out is on. with asdf.open(self.linearity_file) as F: return g_out * (S - F["roman"]["Sref"][nb : nb + nyc, nb : nb + nxc])
[docs] def test__lin(): """Simple test function for _lin.""" z = np.linspace(-1.5, 1.5, 31).reshape((1, 31)) coefs = np.zeros((4, 1, 31)) coefs[3, :, :] = 1.0 phi, _ = _lin(z, coefs) print(phi)
[docs] def test_lin_ilin(linearity_file): """ Forward-backward test for linearity routines. Parameters ---------- linearity_file : str ASDF linearity calibration reference file. Returns ------- None """ ymin = 260 ymax = 262 xmin = 140 xmax = 143 dy = ymax - ymin dx = xmax - xmin with asdf.open(linearity_file) as F: print("Smin", F["roman"]["Smin"][ymin:ymax, xmin:xmax]) print("Smax", F["roman"]["Smax"][ymin:ymax, xmin:xmax]) S = F["roman"]["Sref"][ymin:ymax, xmin:xmax] + np.linspace(0, dx * dy - 1, dx * dy).reshape((dy, dx)) Slin, dq = linearity(S, linearity_file, origin=(xmin, ymin)) Sfwd, exflag = invlinearity(Slin, linearity_file, origin=(xmin, ymin)) print("coefs:") with asdf.open(linearity_file) as F: print(F["roman"]["data"][:, ymin:ymax, xmin:xmax]) print("signal [DN_raw]:") print(S) print("inverted signal [DN_lin]:") print(Slin) print("recovered signal [DN_raw]:") print(Sfwd) print("flags") print(dq, exflag)
[docs] def test_IL(linearity_file, gain_file, ipc_file): """ Some tests for the inverse linearity class. Parameters ---------- linearity_file : str ASDF linearity calibration reference file. gain_file : str ASDF gain calibration reference file. ipc_file : str ASDF ipc4d calibration reference file. Returns ------- None """ ILTEST = IL(linearity_file, gain_file, ipc_file) n = 4088 NE = np.zeros((n, n), dtype=np.float32) ymin = 260 ymax = 262 xmin = 140 xmax = 143 print(ILTEST.apply(NE, electrons=True, electrons_out=False)[ymin:ymax, xmin:xmax]) NE[::3, ::3] = 2.0e3 print(ILTEST.apply(NE, electrons=True, electrons_out=False)[ymin:ymax, xmin:xmax])
if __name__ == "__main__": # some tests if len(sys.argv) < 3: print("call with python linearity.py <linearity_file> <gain_file> [<ipc_file>].") exit() test__lin() test_lin_ilin(sys.argv[1]) if len(sys.argv) < 4: test_IL(sys.argv[1], sys.argv[2], None) else: test_IL(sys.argv[1], sys.argv[2], sys.argv[3])