Source code for romanimpreprocess.utils.ipc_linearity

"""
IPC and linearity utilities.

Classes
-------
IL
    IPC+Inverse linearity class.

Functions
---------
ipc_fwd
    Carries out an IPC operation on the image.
ipc_rev
    Inverse IPC operation, to the given order.
correct_cube
    IPC corrects a full data cube (data) in place.
_lin
    Helper function to evaluate Legendre-based function.
linearity
    Performs a linearity correction.
multilin
    Performs a linearity correction, but with multiple groups.
invlinearity
    Calculates the inverse linearity. (This is most likely to be used in simulations.)

"""

import sys

import asdf
import numpy as np
from roman_datamodels.dqflags import pixel

## IPC utilities ##


[docs] def ipc_fwd(image, kernel, gain=None): """ Carries out an IPC operation on the image. Parameters ---------- image : np.array 2D numpy array of size (ny,nx). kernel : np.array 4D numpy array of size (3,3,ny,nx). gain : np.array or None, optional If not None, the input gain map (in e/DN). Returns ------- output : np.array 2D numpy array, same shape as `image`, IPC-convolved. See Also -------- ipc_rev : Inverse function. Notes ----- The output image, in psuedocode, is:: output[y,x] = sum_{dy,dx} input[y-dy,x-dx] kernel[1+dy,1+dx,y-dy,x-dx] This function natively works in electrons, but if gain is provided then works in DN (does g^-1 K g). """ im = image if gain is not None: im = gain * image # start with the center image output = im * kernel[1, 1, :, :] # nearest neighbors # dy=1, dx=0 output[1:, :] += im[:-1, :] * kernel[2, 1, :-1, :] # dy=-1, dx=0 output[:-1, :] += im[1:, :] * kernel[0, 1, 1:, :] # dy=0, dx=1 output[:, 1:] += im[:, :-1] * kernel[1, 2, :, :-1] # dy=0, dx=-1 output[:, :-1] += im[:, 1:] * kernel[1, 0, :, 1:] # diagonals # dy=1, dx=1 output[1:, 1:] += im[:-1, :-1] * kernel[2, 2, :-1, :-1] # dy=1, dx=-1 output[1:, :-1] += im[:-1, 1:] * kernel[2, 0, :-1, 1:] # dy=-1, dx=1 output[:-1, 1:] += im[1:, :-1] * kernel[0, 2, 1:, :-1] # dy=-1, dx=-1 output[:-1, :-1] += im[1:, 1:] * kernel[0, 0, 1:, 1:] if gain is not None: output /= gain return output
[docs] def ipc_rev(image, kernel, order=2, gain=None): """ Inverse IPC operation, to the given order. Grows the footprint of each pixel to ``(2*order+1,2*order+1)``. If `gain `is provided, then does g^-1 K^-1 g image instead of K^-1 image. (Equivalent: operate on DN instead of e.) Parameters ---------- image : np.array 2D numpy array of size (ny,nx). kernel : np.array 4D numpy array of size (3,3,ny,nx). order : int, optional The order of IPC inversion; default is 2nd order (correct to next-to-next-to-neighbor pixel). Error is of order ``alpha**(order+1)``. gain : np.array or None, optional If not None, the input gain map (in e/DN). Returns ------- output : np.array 2D numpy array, same shape as `image`, IPC-convolved. See Also -------- ipc_fwd : Forward function. """ image2 = image if gain is not None: image2 = gain * image output = np.copy(image2) for _ in range(order): output = output + image2 - ipc_fwd(output, kernel) if gain is not None: output /= gain return output
[docs] def correct_cube(data, ipc_file, mylog, gain_file=None): """ IPC corrects a full data cube (data) in place. Operates in electrons if `gain_file` is None or missing, but operates in DN if `gain_file` is provided. Parameters ---------- data : np.array 3D data cube, shape (ngrp, ny, nx). ipc_file : str Location of ASDF ipc4d calibration reference file. mylog : romanimpreprocess.utils.processlog.ProcessLog Processing log. gain_file : str or None, optional If not none, the ASDF gain calibration reference file. """ if ipc_file is None: if mylog is not None: mylog.append("No IPC file specified, skipping ...\n") return with asdf.open(ipc_file) as F: kernel = F["roman"]["data"] if mylog is not None: mylog.append( f"IPC kernel center range --> {np.amin(kernel[1,1,:,:]):f},{np.amax(kernel[1,1,:,:]):f}\n" ) (ngrp, ny, nx) = np.shape(data) nb = (8192 + (nx - np.shape(kernel)[-1]) // 2) % 16 if mylog is not None: mylog.append(f" ..., {ngrp:d} groups, excluding {nb:d} border pixels\n") if gain_file is None: g = 1.0 else: with asdf.open(gain_file) as G: g = np.copy(G["roman"]["data"][nb : ny - nb, nb : nx - nb]) for i in range(ngrp): data[i, nb : ny - nb, nb : nx - nb] = ipc_rev(data[i, nb : ny - nb, nb : nx - nb] * g, kernel) / g
## LINEARITY UTILITIES ##
[docs] def _lin(z, coefs, linextrap=True): """ Helper function to evaluate Legendre-based function. Parameters ---------- z : np.array Rescaled signal (modified DN), shape (ny,nx). coefs : np.array Legendre polynomial coefficients, shape (p_order+1,ny,nx). linextrap : bool, optional Linearly extrapolate beyond end of range? (Default = True is better behaved.) Returns ------- phi : np.array The linearized signal, `` sum_l coefs_l P_p(z)``, shape (ny,nx) exflag : np.array of bool Extrapolated beyond |z|=1?, shape (ny,nx) """ exflag = np.abs(z) > 1 # are we extrapolating? phi = np.copy(coefs[0, :, :]) poly_prev = np.ones_like(phi) poly = np.copy(z) for L in range(1, np.shape(coefs)[0]): if linextrap: phi += coefs[L, :, :] * np.where( exflag, np.sign(z) ** L * (1 + L * (L + 1) / 2.0 * (np.abs(z) - 1)), poly ) else: phi += coefs[L, :, :] * poly # Legendre polynomial recursion relation poly_next = (2 * L + 1) / (L + 1) * z * poly - L / (L + 1) * poly_prev poly_prev = poly poly = poly_next return phi, exflag
[docs] def linearity(S, linearity_file, origin=(0, 0)): """ Performs a linearity correction. Parameters ---------- S : np.array 2D data array in DN_raw. linearity_file : str ASDF file with linearity data. origin : (int, int), optional The (x,y) position of the lower-left corner of S in the convention of the file. Returns ------- Slin : np.array 2D data array in DN_lin. Same shape as `S`. dq : np.array of uint32 2D flag array. Same shape as `S`. Notes ----- The coordinate convention is that if you have a block `S` that corresponds to region ``[128:132,256:260]``, then you would give ``origin=(256,128)``. """ (dy, dx) = np.shape(S) ymin = origin[1] ymax = ymin + dy xmin = origin[0] xmax = xmin + dx with asdf.open(linearity_file) as F: Smin = F["roman"]["Smin"][ymin:ymax, xmin:xmax] Smax = F["roman"]["Smax"][ymin:ymax, xmin:xmax] phi, exflag = _lin(-1 + 2 * (S - Smin) / (Smax - Smin), F["roman"]["data"][:, ymin:ymax, xmin:xmax]) dq = np.copy(F["roman"]["dq"][ymin:ymax, xmin:xmax]) dq |= np.where(exflag, pixel.NO_LIN_CORR, 0).astype(np.uint32) # flag with bad linearity correction return phi, dq
[docs] def multilin(S, linearity_file, origin=(0, 0), do_not_flag_first=True, attempt_corr=None): """ Performs a linearity correction, but with multiple groups. Parameters ---------- S : np.array Input data as a 3D numpy array, shape (ngrp, ny, nx). linearity_file : str ASDF calibration reference file with linearity data. origin : (int, int), optional The (x,y) position of the lower-left corner of `S` in the convention of the file. do_not_flag_first : bool, optional Don't flag the first read if it is out of range (True by default for reset-read frames that we won't use anyway). attempt_corr : np.array of bool, optional If provided, an array of the same shape as `S` that is True if we want to try the correction and False otherwise (the idea is that we want to be able to *not* flag a pixel that is saturated). Default is to attempt to correct everything. Returns ------- Slin : np.array Linearized data, shape (ngrp,ny,nx), in DN_lin. dq : np.array of uint32 Flag array (2D). Notes ----- The coordinate convention is that if you have a block `S` that corresponds to region ``[128:132,256:260]``, then you would give ``origin=(256,128)``. This is in 2D, so `origin` has only 2 entries (no offset is given on the time axis). ``Slin=0`` corresponding to the bias level ``Sref`` in the calibration reference file. """ (ngrp, dy, dx) = np.shape(S) ymin = origin[1] ymax = ymin + dy xmin = origin[0] xmax = xmin + dx # accept everything if attempt_corr not provided if attempt_corr is None: attempt_corr = np.ones((ngrp, dy, dx), dtype=bool) phi = np.zeros(np.shape(S), dtype=np.float32) with asdf.open(linearity_file) as F: Smin = F["roman"]["Smin"][ymin:ymax, xmin:xmax] Smax = F["roman"]["Smax"][ymin:ymax, xmin:xmax] Sref = F["roman"]["Sref"][ymin:ymax, xmin:xmax] dq = np.copy(F["roman"]["dq"][ymin:ymax, xmin:xmax]) for j in range(ngrp): z = -1 + 2 * (S[j, :, :] - Smin) / (Smax - Smin) if j == 0 and do_not_flag_first: z = np.clip(z, -1, 1) phi[j, :, :], exflag = _lin(z, F["roman"]["data"][:, ymin:ymax, xmin:xmax]) phi[j, :, :] = np.where( dq & (pixel.NO_LIN_CORR | pixel.REFERENCE_PIXEL) == 0, phi[j, :, :], S[j, :, :] - Sref ) # flag reads with bad linearity correction if not (j == 0 and do_not_flag_first): dq |= np.where(np.logical_and(exflag, attempt_corr[j, :, :]), pixel.NO_LIN_CORR, 0).astype( np.uint32 ) return phi, dq
[docs] def invlinearity(Slin, linearity_file, origin=(0, 0)): """ Calculates the inverse linearity. (This is most likely to be used in simulations.) Parameters ---------- Slin : np.array 2D input data, shape (ny,nx), in DN_lin. linearity_file : str ASDF calibration reference file with linearity data. origin : (int, int), optional The (x,y) position of the lower-left corner of `S` in the convention of the calibration file. Returns ------- S : np.array 2D array, same shape as `Slin`, in DN_raw. exflag : np.array of bool Extrapolation flag. Notes ----- This function works by bisection. It is the slowest step in the simulation -> Level 1 workflow, so we plan to implement a more advanced algorithm in the future. """ (dy, dx) = np.shape(Slin) ymin = origin[1] ymax = ymin + dy xmin = origin[0] xmax = xmin + dx with asdf.open(linearity_file) as F: z = np.zeros_like(Slin) # binary search, robust over the range -1 < z < +1 # (which should encapsulate anything; also automatically saturates) for j in range(1, 25): phi, exflag = _lin(z, F["roman"]["data"][:, ymin:ymax, xmin:xmax], linextrap=False) # linextrap=False saves some time z += np.where(phi < Slin, 1 / 2**j, -1 / 2**j) Smin = F["roman"]["Smin"][ymin:ymax, xmin:xmax] Smax = F["roman"]["Smax"][ymin:ymax, xmin:xmax] S = Smin + (Smax - Smin) / 2.0 * (1 + z) return S, exflag
"""IPC + inverse linearity forward modeling tools"""
[docs] class IL: """ IPC+Inverse linearity class. This exists to wrap the IPC and inverse-linearity operations in a way that is consistent with ``romanisim``. Parameters ---------- linearity_file : str The ASDF linearity calibration reference file. gain_file : str The ASDF gain calibration reference file. ipc_file : str or None The ASDF ipc4d calibration reference file. If None, skips the IPC. start_e : np.array or float, optional If provided, starts with some number of electrons (start_e, number or array) in the well. Useful for reset noise. Methods ------- __init__ Constructor set_dq Sets the 3D data quality flags. apply Converts a linearized signal to a non-linear, IPC-convolved signal. """
[docs] def __init__(self, linearity_file, gain_file, ipc_file, start_e=0.0):
[docs] self.linearity_file = linearity_file
[docs] self.gain_file = gain_file
[docs] self.ipc_file = ipc_file
[docs] self.start_e = start_e
# need the .dq attribute with asdf.open(self.linearity_file) as f: self._dq = np.copy(f["roman"]["dq"])
[docs] def set_dq(self, ngroup=1, nborder=4): """ Sets the 3D data quality flags. This is so that the data quality flags can be propagated. Parameters ---------- ngroup : int, optional Number of groups to initialize. nborder : int, optional Number of border reference pixels. Returns ------- None """ (ny, nx) = np.shape(self._dq) self.dq = np.zeros((ngroup, ny - 2 * nborder, nx - 2 * nborder), dtype=np.uint32) self.dq[:, :, :] = self._dq[None, nborder : ny - nborder, nborder : nx - nborder]
[docs] def apply(self, counts, electrons=False, electrons_out=False): """ Converts a linearized signal to a non-linear signal. Parameters ---------- counts : np.array 2D array of counts. electrons : bool, optional Is input in electrons (True) or DN_lin (False, default). electrons_out : bool, optional Is output in electrons (True) or DN_raw (False, default). """ print("apply", electrons, electrons_out, np.shape(counts)) # print(counts[:6, :6]) sys.stdout.flush() # this uses a 4d IPC file if self.ipc_file is not None: with asdf.open(self.ipc_file) as f: counts_conv = ipc_fwd(counts + self.start_e, f["roman"]["data"]) else: counts_conv = counts + self.start_e # gain factors for in and out (nyc, nxc) = np.shape(counts) g_in = 1.0 g_out = 1.0 if electrons or electrons_out: with asdf.open(self.gain_file) as f: # extract the gain (with reference pixels clipped if needed) g = f["roman"]["data"] (nyg, nxg) = np.shape(g) if nyg > nyc: nb = (nyg - nyc) // 2 g = g[nb:-nb, nb:-nb] if electrons: g_in = g if electrons_out: g_out = g # what to strip off the counts array nb = (8192 - nyc // 2) % 16 S, _ = invlinearity(counts_conv / g_in, self.linearity_file, origin=(nb, nb)) if not electrons_out: return S # below here, we know electrons_out is on. with asdf.open(self.linearity_file) as F: return g_out * (S - F["roman"]["Sref"][nb : nb + nyc, nb : nb + nxc])