Source code for romanimpreprocess.from_sim.sim_to_isim

"""
Functions to convert external simulated images to Roman L1/L2-like format.

This works entirely at the single exposure level. Some parts wrap romanisim.

Functions
---------
hdu_sip_hflip
    Flips an SCA in the 3n row in Detector coordinates to align with Science coordinates.
hdu_sip_vflip
    Flips an SCA in the 3n+1 or 3n+2 rows in Detector coordinates to align with Science coordinates.
make_l1_fullcal
    Makes an L1 image using an OpenUniverse input and the calibration data.
    (Merges with romanisim routines.)
noise_1f_frame
    Generates 1/f noise.
fill_in_refdata_and_1f
    Fills in reference pixels and reference output, as well as 1/f noise.
simpletest
    Quick look tool for Level 1 to Level 2 conversion (not for production).
runconfig
    Configuration-driven function to convert a simulation to Level 1 format.

Classes
-------
Image2D
    2D image (may be from simulation).
Image2D_from_L1
    2D image from Level 1 data (useful for 'shortcut' workflow, for most applications use L1_to_L2).

"""

import copy
import inspect
import re
import sys
import warnings

import asdf
import galsim
import numpy as np
import roman_datamodels
import yaml
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.io import fits
from romanisim import __version__ as rstversion
from romanisim import image as rimage
from romanisim import l1 as rstl1
from romanisim import persistence as rip
from romanisim import ris_make_utils as ris
from romanisim import util, wcs
from romanisim.models import parameters

from .. import pars
from ..utils import typefix
from ..utils.coordutils import pixelarea

# local imports
from ..utils.ipc_linearity import IL, ipc_rev


[docs] def hdu_sip_hflip(data, header): """ Horizontal flip of SCA and WCS. Assumes SIP convention. This function operates on the data and WCS header in place. Parameters ---------- data : np.array 2D image of an SCA. header : astropy.io.fits.header.Header Header containing the WCS. Returns ------- None See Also -------- hdu_sip_vflip : Similar, but for vertical flip instead. """ (ny, nx) = np.shape(data) data[:, :] = data[:, ::-1] # flipping the data is the easy part # now flip the WCS header["CRPIX1"] = nx + 1 - header["CRPIX1"] header["CD1_1"] = -header["CD1_1"] header["CD2_1"] = -header["CD2_1"] try: # if there is a SIP table, we flip it. # for A: the even p's need a sign flip to reverse the direction of the SIP u-axis # for B: the odd p's need a sign flip to reverse the direction of the SIP u-axis a_order = int(header["A_ORDER"]) b_order = int(header["B_ORDER"]) for p in range(0, a_order + 1, 2): for q in range(a_order + 1 - p): keyword = f"A_{p:1d}_{q:1d}" if keyword in header: header[keyword] = -float(header[keyword]) for p in range(1, b_order + 1, 2): for q in range(b_order + 1 - p): keyword = f"B_{p:1d}_{q:1d}" if keyword in header: header[keyword] = -float(header[keyword]) except (ValueError, KeyError): print("Exception in SIP table, skipping ...")
[docs] def hdu_sip_vflip(data, header): """ Vertical flip of SCA and WCS. Assumes SIP convention. This function operates on the data and WCS header in place. Parameters ---------- data : np.array 2D image of an SCA. header : astropy.io.fits.header.Header Header containing the WCS. Returns ------- None See Also -------- hdu_sip_hflip : Similar, but for horizontal flip instead. """ (ny, nx) = np.shape(data) data[:, :] = data[::-1, :] # flipping the data is the easy part # now flip the WCS header["CRPIX2"] = ny + 1 - header["CRPIX2"] header["CD1_2"] = -header["CD1_2"] header["CD2_2"] = -header["CD2_2"] try: # if there is a SIP table, we flip it. # for A: the odd q's need a sign flip to reverse the direction of the SIP v-axis # for B: the even q's need a sign flip to reverse the direction of the SIP v-axis a_order = int(header["A_ORDER"]) b_order = int(header["B_ORDER"]) for q in range(1, a_order + 1, 2): for p in range(a_order + 1 - q): keyword = f"A_{p:1d}_{q:1d}" if keyword in header: header[keyword] = -float(header[keyword]) for q in range(0, b_order + 1, 2): for p in range(b_order + 1 - q): keyword = f"B_{p:1d}_{q:1d}" if keyword in header: header[keyword] = -float(header[keyword]) except (ValueError, KeyError): print("Exception in SIP table, skipping ...")
[docs] def make_l1_fullcal(counts, read_pattern, caldir, rng=None, persistence=None, tstart=None): """ Make an L1 image with the full calibration information. This carries out similar steps to romanisim.l1.make_l1, but provides us a bit more control over the settings. Parameters ---------- counts : galsim.Image Contains mean number electrons per pixel per exposure. read_pattern : list of list of int MultiAccum table. caldir : dict Dictionary of the reference files. rng : galsim.BaseDeviate, optional The random number generator. persistence : romanisim.persistence.Persistence, optional Persistence object, not used yet. tstart : float, optional Start time to feed to romanisim. Returns ------- l1 : np.array 3D image array dq : np.array 3D quality array """ # generate the reset noise (will complain if rng is None!) resetnoise = np.zeros_like(counts.array) nb = (8192 - np.shape(resetnoise)[-1] // 2) % 256 # get border size galsim.GaussianDeviate(rng).generate(resetnoise) with asdf.open(caldir["read"]) as f: resetnoise *= f["roman"]["resetnoise"][nb:-nb, nb:-nb] with asdf.open(caldir["gain"]) as f: resetnoise *= f["roman"]["data"][nb:-nb, nb:-nb] # now resetnoise is a random image in electrons tij = rstl1.read_pattern_to_tij(read_pattern) # subtract the apporopriate amount of electrons so that we will be at the # zero level (on average) after some dark current if "biascorr" in caldir: with asdf.open(caldir["biascorr"]) as f: tbias = float(f["roman"]["t0"]) with asdf.open(caldir["gain"]) as g: with asdf.open(caldir["dark"]) as f: resetnoise -= ( tbias * f["roman"]["dark_slope"][nb:-nb, nb:-nb] / g["roman"]["data"][nb:-nb, nb:-nb] ) # this model includes linearity *and* IPC. # default application is electrons_in=False, electrons_out=False # (the .apply method is called in apportion_counts_to_resultants with # electrons_in = True -- this means that electrons go in and raw DN go out, # so actually includes the gain as well!) e2dn_model = IL(caldir["linearitylegendre"], caldir["gain"], caldir["ipc4d"], start_e=resetnoise) # set the size of the data quality array e2dn_model.set_dq(ngroup=len(read_pattern), nborder=pars.nborder) # print(read_pattern) # print(len(read_pattern)) # print('-->', counts.array[3890,237], e2dn_model.linearity_file, e2dn_model.gain_file, # e2dn_model.ipc_file) # sys.stdout.flush() # generates resultants in DN resultants, dq = rstl1.apportion_counts_to_resultants( counts.array, tij, inv_linearity=e2dn_model, crparam={}, persistence=persistence, tstart=tstart, rng=rng, seed=None, ) # print('resultants array', resultants[:,3890,237]) with asdf.open(caldir["read"]) as f: resultants = rstl1.add_read_noise_to_resultants( resultants, tij, rng=rng, seed=None, read_noise=f["roman"]["data"][nb:-nb, nb:-nb], ) # removed pedestal_extra_noise if "biascorr" in caldir: with asdf.open(caldir["biascorr"]) as f: resultants += f["roman"]["data"] resultants[:, :, :] = np.round(resultants) return resultants * u.DN, dq
[docs] def noise_1f_frame(rng): """ Generates a 4096x128 block of 1/f noise. The frame is normalized to variance of 1 per logarithmic range in frequency, i.e., S(f) = 1/f, where Var X = int_0^infty S(f) df. Parameters ---------- rng : galsim.BaseDeviate The random number generator. Returns ------- np.array of float Shape (4096,128). """ len = 2 * pars.nside * pars.channelwidth this_array = np.zeros(2 * len) galsim.GaussianDeviate(rng).generate(this_array) # get frequencies and amplitude ~ sqrt{power} freq = np.linspace(0, 1 - 1.0 / len, len) freq[len // 2 :] -= 1.0 amp = (1.0e-99 + np.abs(freq * len)) ** (-0.5) amp[0] = 0.0 ftsignal = np.zeros((len,), dtype=np.complex128) ftsignal[:] = this_array[:len] ftsignal[:] += 1j * this_array[len:] ftsignal *= amp block = np.fft.fft(ftsignal).real[: len // 2] / np.sqrt(2.0) block -= np.mean(block) # print('generated noise, std -->', np.std(block)) return block.reshape((pars.nside, pars.channelwidth)).astype(np.float32)
[docs] def fill_in_refdata_and_1f(im, caldir, rng, tij, fill_in_banding=True, amp33=None): """ Fills in the reference pixel data in an image, and adds 1/f noise. If `amp33` is provided, then also tries to fill in the reference output (if the calibration reference files have that information). Noise is added in-place to `im` and (if provided) `amp33`, keeping the same data type. Parameters ---------- im : np.array The image data cube. Shape (ngrp, ny, nx). caldir : dict The dictionary of calibration reference files. rng : galsim.BaseDeviate The random number generator. tij : list of list of float Time stamps of the reads in seconds. fill_in_banding : bool, optional Whether to gennerate 1/f noise. amp33 : np.array, optional If provided, array to put the reference output. Returns ------- None """ (ngrp, ny, nx) = np.shape(im) # get shape nborder = parameters.nborder # the extra layer in noise is for the reset noise, which gets added to everything noise = np.zeros((ngrp + 1, ny, nx), dtype=np.float32) galsim.GaussianDeviate(rng).generate(noise) with asdf.open(caldir["read"]) as f: noise[:-1, :, :] *= f["roman"]["data"][None, :, :] noise[-1, :, :] *= f["roman"]["resetnoise"] for j in range(len(tij)): noise[j, :, :] /= len(tij[j]) ** 0.5 noise[:-1, :, :] += noise[-1, :, :][None, :, :] # adds the reset noise to the reference pixels with asdf.open(caldir["dark"]) as f: noise[:-1, :, :] += np.copy(f["roman"]["data"]) # what we have above is a dark image, but we want to fill in the # active pixels with the data from im noise[:-1, nborder : ny - nborder, nborder : nx - nborder] = im[ :, nborder : ny - nborder, nborder : nx - nborder ].astype(noise.dtype) # reference output amp33info = { "valid": False, "med": np.zeros((pars.nside, pars.channelwidth), dtype=np.float32), "std": np.zeros((pars.nside, pars.channelwidth), dtype=np.float32), "M_PINK": 0.0, "RU_PINK": 0.0, } if amp33 is not None: with asdf.open(caldir["read"]) as f: if "amp33" in f["roman"]: amp33info = copy.deepcopy(f["roman"]["amp33"]) else: warnings.warn("Didn't find reference output information. Skipping ...") # generate correlated noise if fill_in_banding: with asdf.open(caldir["read"]) as f: u_pink = float(f["roman"]["anc"]["U_PINK"]) c_pink = float(f["roman"]["anc"]["C_PINK"]) print("adding correlated noise", u_pink, c_pink) for j in range(len(tij)): common_noise = noise_1f_frame(rng) * c_pink for ch in range(32): pinknoise = noise_1f_frame(rng) * u_pink + common_noise if ch % 2 == 1: pinknoise = pinknoise[:, ::-1] noise[j, :, pars.channelwidth * ch : pars.channelwidth * (ch + 1)] += ( pinknoise / len(tij[j]) ** 0.5 ).astype(noise.dtype) # reference output is completely built here (signal + noise) if amp33info["valid"]: whitenoise = np.zeros((pars.nside, pars.channelwidth), dtype=np.float32) galsim.GaussianDeviate(rng).generate(whitenoise) whitenoise *= amp33info["std"] pinknoise = amp33info["RU_PINK"] * noise_1f_frame(rng) + amp33info["M_PINK"] * common_noise amp33[j, :, :] = (amp33info["med"] + (whitenoise + pinknoise) / len(tij[j]) ** 0.5).astype( amp33.dtype ) # write back to the original im[:, :, :] = np.clip(np.round(noise[:-1, :, :]), 0, 2**16 - 1).astype(im.dtype)
[docs] class Image2D: """ 2D image, along with WCS and sky information. It can be constructed from simulations or (ultimately) from Roman data. Parameters ---------- intype : str Input type (e.g., "anlsim") Attributes ---------- image : np.array of flat A 2D image. Units are e/p/s if generated from a simulation in e. galsimwcs : galsim.wcs.CelestialWCS The world coordinate system for this image. header : astropy.io.fits.header.Header The world coordinate system for this image in FITS WCS format. date : str The observation date (ISO 8601 string). filter : str The observation filter (4 characters, e.g., R062). idsca : (int, int) The observation ID and SCA. ra_ : float Right ascension (in degrees) of the WFI. dec_ : float Declination (in degrees) of the WFI. pa_ : float Position angle (in degrees) of the WFI. refdata : dict Reference data locations. af : asdf.AsdfFile Level 1 data af2 : asdf.AsdfFile Level 2 data Methods ------- __init__ Constructor init_anlsim Constructor from Open Universe simulation file. simulate Simulates the ramps, including L1 and L2 data. L1_write_to Write simulated L1 data file (ASDF) L2_write_to Write simulated L2 data file (ASDF) Notes ----- The legal `intype` strings are: * ``anlsim`` : The Open Universe 2024 simulation "truth" (or equivalent) """
[docs] def __init__(self, intype, **kwargs): if intype == "anlsim": self.init_anlsim(kwargs["fname"])
[docs] def init_anlsim(self, fname, flip=True): """ Constructor from Open Universe 2024-type simulation. Parameters ---------- fname : str The input file name. flip : bool, optional If True, then flips from SCA native coordinates to science-aligned (SOC-like product). Returns ------- None """ # get (id,sca) m = re.search(r"_(\d+)_(\d+)\.fits", fname) self.idsca = (int(m.group(1)), int(m.group(2))) # read header and data with fits.open(fname) as f: data = f[0].data self.header = f[0].header # flip SCAs depending on which row they are in if flip: if self.idsca[1] % 3 == 0: hdu_sip_hflip(data, self.header) else: hdu_sip_vflip(data, self.header) self.image = data / self.header["EXPTIME"] # get this in electrons per second # offset from FITS -> GWCS convention self.header["CRPIX1"] -= 1 self.header["CRPIX2"] -= 1 self.galsimwcs, origin = galsim.wcs.readFromFitsHeader(self.header) try: self.date = self.header["DATE-OBS"] print(self.date) self.date = re.sub(" ", "T", self.date) + "Z" print(self.date) except (ValueError, KeyError): self.date = "2025-01-01T00:00:00.000000" self.filter = self.header["FILTER"][:4] self.ra_ = float(self.header["RA_TARG"]) self.dec_ = float(self.header["DEC_TARG"]) self.pa_ = float(self.header["PA_OBSY"])
[docs] def simulate(self, use_read_pattern, caldir=None, config={}, seed=43, includewcs=False): """ Performs Level 1 & 2 simulations. This is based on the ``romanisim.image.simulate`` function, but some functionality has been changed to be useful for this class. Parameters ---------- use_read_pattern : list of list of int The MultiAccum table. caldir : dict, optional Dictionary of where the calibration files are located. (Otherwise uses internal defaults, only good for testing.) config : dict, optional Configuration file (usually expanded from YAML). seed : int, optional Random number seed. includewcs : bool, optional If True, includes the GWCS in the output file. Returns ------- None """ target_pattern = 1000000 parameters.read_pattern[target_pattern] = use_read_pattern bandpass_name = "F" + self.filter[1:] # schema wants, e.g., "F129" instead of "J129" metadata = ris.set_metadata( date=self.date, bandpass=bandpass_name, sca=self.idsca[1], ma_table_number=target_pattern ) print("::", self.ra_, self.dec_, self.pa_) coord = SkyCoord(ra=self.ra_ * u.deg, dec=self.dec_ * u.deg, frame="icrs") wcs.fill_in_parameters(metadata, coord, boresight=False, pa_aper=self.pa_) rng = galsim.UniformDeviate(seed) ### steps below are from romanisim.image.simulate ### image_mod = roman_datamodels.datamodels.ImageModel.create_fake_data() meta = image_mod.meta meta["wcs"] = None for key in parameters.default_parameters_dictionary: meta[key].update(parameters.default_parameters_dictionary[key]) for key in metadata: meta[key].update(metadata[key]) util.add_more_metadata(meta) read_pattern = metadata["exposure"].get("read_pattern", use_read_pattern) # for this simulation, we want to build something self-contained refdata = rimage.gather_reference_data(image_mod, usecrds=False) # reffiles = refdata["reffiles"] # persistence -> None persistence = rip.Persistence() # boder reference pixels nborder = parameters.nborder # simulate a blank image # note that newer versions of romanisim need psftype, but older ones don't, so # this should be OK either way? psfpar = dict() args = inspect.signature(rimage.simulate_counts).parameters choices = {"psftype": "galsim", "stpsf": False, "webbpsf": False} for a in list(choices.keys()): if a in args: psfpar[a] = choices[a] if caldir is None: counts, simcatobj = rimage.simulate_counts( image_mod.meta, [], rng=rng, usecrds=False, darkrate=refdata["dark"], flat=refdata["flat"], psf_keywords=dict(), **psfpar, ) nside_sub = pars.nside - 2 * nborder this_flat = 1.0 * np.ones( (nside_sub, nside_sub), dtype=np.float32 ) # dummy, prevent an error later try: _g = float(parameters.reference_data["gain"]) except TypeError: _g = float(parameters.reference_data["gain"].value) g = _g * np.ones((nside_sub, nside_sub), dtype=np.float32) # dummy, prevent an error later else: # get dark current in DN/p/s with asdf.open(caldir["dark"]) as f: this_dark = f["roman"]["dark_slope"][nborder:-nborder, nborder:-nborder] # get flat field with asdf.open(caldir["flat"]) as f: this_flat = f["roman"]["data"][nborder:-nborder, nborder:-nborder] # convert to e/p/s with asdf.open(caldir["gain"]) as f: this_dark = this_dark * f["roman"]["data"][nborder:-nborder, nborder:-nborder] g = np.copy(f["roman"]["data"][nborder:-nborder, nborder:-nborder]) # save gain for later # de-convolve the IPC kernel with asdf.open(caldir["ipc4d"]) as f: this_dark = ipc_rev(this_dark, f["roman"]["data"]) # dark is in e/s this_flat = ipc_rev( this_flat, f["roman"]["data"], gain=g ) # but flat was measured in DN_lin so need gain=g this_flat = np.clip(this_flat, 0.0, 2 - 2**-21) this_dark = np.clip( this_dark, -0.1 * this_flat, None ) # prevent a spurious negative dark from giving an illegal negative total count rate # now run with this version of the dark rate counts, simcatobj = rimage.simulate_counts( image_mod.meta, [], rng=rng, usecrds=False, darkrate=this_dark, flat=this_flat, psf_keywords=dict(), **psfpar, ) util.update_pointing_and_wcsinfo_metadata(image_mod.meta, counts.wcs) # convert from e/s --> e using the parameters file and read pattern t = parameters.read_time * (use_read_pattern[-1][-1] - use_read_pattern[0][0]) # the input simulations include the pixel area in their estimated e/s # but the flat field will ultimately include the pixel area as well! # therefore we need to re-scale the flat to the ideal pixel area (0.11 arcsec)^2 flat_witharea = this_flat / (pixelarea(self.galsimwcs, N=pars.nside_active) / pars.Omega_ideal) C = float(config["CNORM"]) if "CNORM" in config else 1.0 print(pixelarea(self.galsimwcs, N=pars.nside_active)[::1024, ::1024]) print(flat_witharea[::1024, ::1024]) sys.stdout.flush() counts.array[:, :] += rng.np.poisson( lam=np.clip(C * t * g / pars.g_ideal * self.image * flat_witharea, 0, None) ).astype(counts.array.dtype) # this is where the (simulated) L1 data is created if caldir is None: l1, l1dq = rstl1.make_l1( counts, read_pattern, read_noise=refdata["readnoise"], pedestal_extra_noise=parameters.pedestal_extra_noise, rng=rng, gain=refdata["gain"], crparam={}, inv_linearity=refdata["inverselinearity"], tstart=image_mod.meta.exposure.start_time, persistence=persistence, saturation=refdata["saturation"], ) else: # need to run the individual steps ourselves l1, l1dq = make_l1_fullcal( counts, read_pattern, caldir, rng=rng, tstart=image_mod.meta.exposure.start_time, persistence=persistence, ) # convert to asdf im, extras = rstl1.make_asdf(l1, dq=l1dq, metadata=image_mod.meta, persistence=persistence) # fill in the reference pixels and reference output if caldir is not None: # NO_AMP33 would allow us to bypass the reference output, if it isn't in the config file amp33struct = im["amp33"] if "NO_AMP33" in caldir: if caldir["NO_AMP33"]: amp33struct = None fill_in_refdata_and_1f( im["data"], caldir, rng, rstl1.read_pattern_to_tij(read_pattern), fill_in_banding=True, amp33=amp33struct, ) # Create metadata for simulation parameter romanisimdict = {"version": rstversion} # for storage reasons, I took out all the large metadata arrays in romanisimdict # include the gwcs if includewcs: romanisimdict["wcs"] = wcs.wcs_from_fits_header(self.header) # Write file self.af = asdf.AsdfFile() self.af.tree = {"roman": im, "romanisim": romanisimdict} # Make idealized L2 data slopeinfo = rimage.make_l2( l1, read_pattern, read_noise=refdata["readnoise"], gain=refdata["gain"], flat=refdata["flat"], linearity=refdata["linearity"], darkrate=refdata["dark"], dq=l1dq, ) l2dq = np.bitwise_or.reduce(l1dq, axis=0) # package header so that there is a obj.header.header # this is a hack for compatibility with convert_wcs_to_gwcs class Blank: pass obj = Blank() obj.header = Blank() obj.header.header = self.header # im2, extras2 = rimage.make_asdf( *slopeinfo, metadata=image_mod.meta, persistence=persistence, dq=l2dq, imwcs=obj, gain=refdata["gain"], ) # functionality to pull over the WCS from L1 without dependence on wcs.convert_wcs_to_gwcs # im2['roman']['meta'].update(wcs=this_gwcs) # im2['roman']['meta']['wcsinfo']['s_region'] = wcs.create_s_region(this_gwcs) # may need this data if "cal_step" in im2["meta"]: im2["meta"]["cal_step"]["wfi18_transient"] = "INCOMPLETE" im2["meta"]["cal_step"]["dark_decay"] = "INCOMPLETE" # Create metadata for simulation parameter romanisimdict2 = {"version": rstversion} romanisimdict2.update(**extras2) # Write file self.af2 = asdf.AsdfFile() self.af2.tree = {"roman": im2, "romanisim": romanisimdict2} self.refdata = refdata
[docs] def L1_write_to(self, filename): """ Writes L1 data to a file if available. Parameters ---------- filename : str Where to write the file (should end with ``.asdf``). Returns ------- bool True if successful, False if not written. """ if hasattr(self, "af"): with open(filename, "wb") as f: self.af.write_to(f) else: return False
[docs] def L2_write_to(self, filename): """ Writes L2 data to a file if available. Parameters ---------- filename : str Where to write the file (should end with ``.asdf``). Returns ------- bool True if successful, False if not written. """ if hasattr(self, "af2"): typefix.fix(self.af2) with open(filename, "wb") as f: self.af2.write_to(f) else: return False
[docs] class Image2D_from_L1(Image2D): """Similar to Image2D, but constructed from L1 data file. with Image2D_from_L1(infile, refdata, thewcs) as L1: ... """ # Context manager functions
[docs] def __enter__(self): return self
[docs] def __exit__(self, exception_type, exception_value, exception_traceback): self.af.close()
# Constructor def __init__(self, infile, refdata, thewcs, verbose_err=True): """Constructor. The arguments are: infile : L1 data file (ASDF format) refdata : the calibration reference data thewcs WCS object (in some form -- currently FITS Header or GalSimWCS, though we've had issues with the latter) """
[docs] self.af = asdf.open(infile)
[docs] self.refdata = refdata
[docs] self.thewcs = thewcs
[docs] def pseudocalibrate(self): """Generates a simple calibrated (L2) image. This doesn't use romancal, but can be useful as a pass-through function. """ # collect information nborder = parameters.nborder # Make idealized L2 data refdata = self.refdata l1dq = np.zeros( np.shape(self.af["roman"]["data"][:, nborder:-nborder, nborder:-nborder]), dtype=np.uint32 ) slopeinfo = rimage.make_l2( self.af["roman"]["data"][:, nborder:-nborder, nborder:-nborder] * u.DN, self.af["roman"]["meta"]["exposure"]["read_pattern"], read_noise=refdata["readnoise"], gain=refdata["gain"], flat=refdata["flat"], linearity=refdata["linearity"], darkrate=refdata["dark"], dq=l1dq, ) l2dq = np.bitwise_or.reduce(l1dq, axis=0) # make WCS --- a few ways of doing this while True: wcsobj = None class Blank: pass # first try a FITS header if isinstance(self.thewcs, fits.Header): wcsobj = Blank() wcsobj.header = Blank() wcsobj.header.header = self.thewcs break # should work if this is a GalSim WCS # try: # header = fits.Header() # self.thewcs.writeToFitsHeader( # header, galsim.BoundsI(0, pars.nside_active, 0, pars.nside_active) # ) # # offset to FITS convention -- this is undone later # header["CRPIX1"] += 1 # header["CRPIX2"] += 1 # wcsobj = Blank() # wcsobj.header = Blank() # wcsobj.header.header = header # warnings.warn("Use of GalSim WCS in calibrate is not fully working yet!") # break # except Exception: # wcsobj = None raise Exception("Unrecognized WCS") persistence = rip.Persistence() im2, extras2 = rimage.make_asdf( *slopeinfo, metadata=self.af["roman"]["meta"], persistence=persistence, dq=l2dq, imwcs=wcsobj, gain=refdata["gain"], ) # we didn't do this step if "cal_step" in im2["meta"]: im2["meta"]["cal_step"]["wfi18_transient"] = "INCOMPLETE" im2["meta"]["cal_step"]["dark_decay"] = "INCOMPLETE" # Create metadata for simulation parameter romanisimdict2 = {"version": rstversion} romanisimdict2.update(**extras2) # Write file self.af2 = asdf.AsdfFile() self.af2.tree = {"roman": im2, "romanisim": romanisimdict2}
[docs] def run_config(config): """ This allows the L1 image construction to be called as a Python function instead of a stand-alone code. Parameters ---------- config : dict Configuration file (usually a dictionary unpacked from YAML). Returns ------- None """ # calibration files caldir = config.get("CALDIR", None) print("Reading from <--", config["IN"]) print("Writing to -->", config["OUT"]) print("Using calibration data:") print(caldir) use_read_pattern = [] ng = len(config["READS"]) // 2 for j in range(ng): use_read_pattern.append(list(range(int(config["READS"][2 * j]), int(config["READS"][2 * j + 1])))) print("Read pattern:", use_read_pattern) # Optional inputs seed = 43 if "SEED" in config: seed = int(config["SEED"]) x = Image2D("anlsim", fname=config["IN"]) x.simulate(use_read_pattern, caldir=caldir, config=config, seed=seed) x.L1_write_to(config["OUT"]) # header information for the WCS x.header["COMMENT"] = "truth wcs from sim_to_isim" x.header.tofile(config["OUT"][:-5] + "_asdf_wcshead.txt", overwrite=True) # also write the FITS file for viewing if "FITSOUT" in config: if config["FITSOUT"]: image_out = np.zeros((ng, pars.nside, pars.nside_augmented), dtype=np.uint16) with asdf.open(config["OUT"]) as f: image_out[:, :, : pars.nside] = f["roman"]["data"] image_out[:, :, pars.nside :] = f["roman"]["amp33"] fits.PrimaryHDU(image_out).writeto(config["OUT"][:-5] + "_asdf_to.fits", overwrite=True)
if __name__ == "__main__": """Stand-alone function to convert from OpenUniverse to L1. Call it with: python sim_to_isim <config file> The config file is in YAML format and has the fields: Required: 'IN': input file name (FITS) 'OUT': output file name (must end in '.asdf') 'READS': a list of length 2*Ngrp: 0th group is [READS[0]:READS[1]], then [READS[2]:READS[3]], etc. Optional: 'CALDIR': Python dictionary with the calibration reference files. This can contain: LINEARITY 'FITSOUT': also write a FITS output (default: False; mostly useful for visualization in ds9) 'SEED': RNG seed """ # read settings with open(sys.argv[1]) as f:
[docs] config = yaml.safe_load(f)
run_config(config)