Source code for romanimpreprocess.from_sim.sim_to_isim

"""
Functions to convert external simulated images to Roman L1/L2-like format.

This works entirely at the single exposure level. Some parts wrap romanisim.

Functions
---------
hdu_sip_hflip
    Flips an SCA in the 3n row in Detector coordinates to align with Science coordinates.
hdu_sip_vflip
    Flips an SCA in the 3n+1 or 3n+2 rows in Detector coordinates to align with Science coordinates.
make_l1_fullcal
    Makes an L1 image using an OpenUniverse input and the calibration data.
    (Merges with romanisim routines.)
noise_1f_frame
    Generates 1/f noise.
fill_in_refdata_and_1f
    Fills in reference pixels and reference output, as well as 1/f noise.
simpletest
    Quick look tool for Level 1 to Level 2 conversion (not for production).
runconfig
    Configuration-driven function to convert a simulation to Level 1 format.

Classes
-------
Image2D
    2D image (may be from simulation).
Image2D_from_L1
    2D image from Level 1 data (useful for 'shortcut' workflow, for most applications use L1_to_L2).

"""

import copy
import re
import sys
import warnings

import asdf
import galsim
import numpy as np
import roman_datamodels
import yaml
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.io import fits
from romanisim import __version__ as rstversion
from romanisim import image as rimage
from romanisim import l1 as rstl1
from romanisim import parameters, util, wcs
from romanisim import persistence as rip
from romanisim import ris_make_utils as ris

from .. import pars
from ..utils.coordutils import pixelarea

# local imports
from ..utils.ipc_linearity import IL, ipc_rev


[docs] def hdu_sip_hflip(data, header): """ Horizontal flip of SCA and WCS. Assumes SIP convention. This function operates on the data and WCS header in place. Parameters ---------- data : np.array 2D image of an SCA. header : astropy.io.fits.header.Header Header containing the WCS. Returns ------- None See Also -------- hdu_sip_vflip : Similar, but for vertical flip instead. """ (ny, nx) = np.shape(data) data[:, :] = data[:, ::-1] # flipping the data is the easy part # now flip the WCS header["CRPIX1"] = nx + 1 - header["CRPIX1"] header["CD1_1"] = -header["CD1_1"] header["CD2_1"] = -header["CD2_1"] try: # if there is a SIP table, we flip it. # for A: the even p's need a sign flip to reverse the direction of the SIP u-axis # for B: the odd p's need a sign flip to reverse the direction of the SIP u-axis a_order = int(header["A_ORDER"]) b_order = int(header["B_ORDER"]) for p in range(0, a_order + 1, 2): for q in range(a_order + 1 - p): keyword = f"A_{p:1d}_{q:1d}" if keyword in header: header[keyword] = -float(header[keyword]) for p in range(1, b_order + 1, 2): for q in range(b_order + 1 - p): keyword = f"B_{p:1d}_{q:1d}" if keyword in header: header[keyword] = -float(header[keyword]) except (ValueError, KeyError): print("Exception in SIP table, skipping ...")
[docs] def hdu_sip_vflip(data, header): """ Vertical flip of SCA and WCS. Assumes SIP convention. This function operates on the data and WCS header in place. Parameters ---------- data : np.array 2D image of an SCA. header : astropy.io.fits.header.Header Header containing the WCS. Returns ------- None See Also -------- hdu_sip_hflip : Similar, but for horizontal flip instead. """ (ny, nx) = np.shape(data) data[:, :] = data[::-1, :] # flipping the data is the easy part # now flip the WCS header["CRPIX2"] = ny + 1 - header["CRPIX2"] header["CD1_2"] = -header["CD1_2"] header["CD2_2"] = -header["CD2_2"] try: # if there is a SIP table, we flip it. # for A: the odd q's need a sign flip to reverse the direction of the SIP v-axis # for B: the even q's need a sign flip to reverse the direction of the SIP v-axis a_order = int(header["A_ORDER"]) b_order = int(header["B_ORDER"]) for q in range(1, a_order + 1, 2): for p in range(a_order + 1 - q): keyword = f"A_{p:1d}_{q:1d}" if keyword in header: header[keyword] = -float(header[keyword]) for q in range(0, b_order + 1, 2): for p in range(b_order + 1 - q): keyword = f"B_{p:1d}_{q:1d}" if keyword in header: header[keyword] = -float(header[keyword]) except (ValueError, KeyError): print("Exception in SIP table, skipping ...")
[docs] def make_l1_fullcal(counts, read_pattern, caldir, rng=None, persistence=None, tstart=None): """ Make an L1 image with the full calibration information. This carries out similar steps to romanisim.l1.make_l1, but provides us a bit more control over the settings. Parameters ---------- counts : galsim.Image Contains mean number electrons per pixel per exposure. read_pattern : list of list of int MultiAccum table. caldir : dict Dictionary of the reference files. rng : galsim.BaseDeviate, optional The random number generator. persistence : romanisim.persistence.Persistence, optional Persistence object, not used yet. tstart : float, optional Start time to feed to romanisim. Returns ------- l1 : np.array 3D image array dq : np.array 3D quality array """ # generate the reset noise (will complain if rng is None!) resetnoise = np.zeros_like(counts.array) nb = (8192 - np.shape(resetnoise)[-1] // 2) % 256 # get border size galsim.GaussianDeviate(rng).generate(resetnoise) with asdf.open(caldir["read"]) as f: resetnoise *= f["roman"]["resetnoise"][nb:-nb, nb:-nb] with asdf.open(caldir["gain"]) as f: resetnoise *= f["roman"]["data"][nb:-nb, nb:-nb] # now resetnoise is a random image in electrons tij = rstl1.read_pattern_to_tij(read_pattern) # subtract the apporopriate amount of electrons so that we will be at the # zero level (on average) after some dark current if "biascorr" in caldir: with asdf.open(caldir["biascorr"]) as f: tbias = float(f["roman"]["t0"]) with asdf.open(caldir["gain"]) as g: with asdf.open(caldir["dark"]) as f: resetnoise -= ( tbias * f["roman"]["dark_slope"][nb:-nb, nb:-nb] / g["roman"]["data"][nb:-nb, nb:-nb] ) # this model includes linearity *and* IPC. # default application is electrons_in=False, electrons_out=False # (the .apply method is called in apportion_counts_to_resultants with # electrons_in = True -- this means that electrons go in and raw DN go out, # so actually includes the gain as well!) e2dn_model = IL(caldir["linearitylegendre"], caldir["gain"], caldir["ipc4d"], start_e=resetnoise) # set the size of the data quality array e2dn_model.set_dq(ngroup=len(read_pattern), nborder=pars.nborder) # print(read_pattern) # print(len(read_pattern)) # print('-->', counts.array[3890,237], e2dn_model.linearity_file, e2dn_model.gain_file, # e2dn_model.ipc_file) # sys.stdout.flush() # generates resultants in DN resultants, dq = rstl1.apportion_counts_to_resultants( counts.array, tij, inv_linearity=e2dn_model, crparam={}, persistence=persistence, tstart=tstart, rng=rng, seed=None, ) # print('resultants array', resultants[:,3890,237]) with asdf.open(caldir["read"]) as f: resultants = rstl1.add_read_noise_to_resultants( resultants, tij, rng=rng, seed=None, read_noise=f["roman"]["data"][nb:-nb, nb:-nb], pedestal_extra_noise=None, ) if "biascorr" in caldir: with asdf.open(caldir["biascorr"]) as f: resultants += f["roman"]["data"] resultants[:, :, :] = np.round(resultants) return resultants * u.DN, dq
[docs] def noise_1f_frame(rng): """ Generates a 4096x128 block of 1/f noise. The frame is normalized to variance of 1 per logarithmic range in frequency, i.e., S(f) = 1/f, where Var X = int_0^infty S(f) df. Parameters ---------- rng : galsim.BaseDeviate The random number generator. Returns ------- np.array of float Shape (4096,128). """ len = 2 * pars.nside * pars.channelwidth this_array = np.zeros(2 * len) galsim.GaussianDeviate(rng).generate(this_array) # get frequencies and amplitude ~ sqrt{power} freq = np.linspace(0, 1 - 1.0 / len, len) freq[len // 2 :] -= 1.0 amp = (1.0e-99 + np.abs(freq * len)) ** (-0.5) amp[0] = 0.0 ftsignal = np.zeros((len,), dtype=np.complex128) ftsignal[:] = this_array[:len] ftsignal[:] += 1j * this_array[len:] ftsignal *= amp block = np.fft.fft(ftsignal).real[: len // 2] / np.sqrt(2.0) block -= np.mean(block) # print('generated noise, std -->', np.std(block)) return block.reshape((pars.nside, pars.channelwidth)).astype(np.float32)
[docs] def fill_in_refdata_and_1f(im, caldir, rng, tij, fill_in_banding=True, amp33=None): """ Fills in the reference pixel data in an image, and adds 1/f noise. If `amp33` is provided, then also tries to fill in the reference output (if the calibration reference files have that information). Noise is added in-place to `im` and (if provided) `amp33`, keeping the same data type. Parameters ---------- im : np.array The image data cube. Shape (ngrp, ny, nx). caldir : dict The dictionary of calibration reference files. rng : galsim.BaseDeviate The random number generator. tij : list of list of float Time stamps of the reads in seconds. fill_in_banding : bool, optional Whether to gennerate 1/f noise. amp33 : np.array, optional If provided, array to put the reference output. Returns ------- None """ (ngrp, ny, nx) = np.shape(im) # get shape nborder = parameters.nborder # the extra layer in noise is for the reset noise, which gets added to everything noise = np.zeros((ngrp + 1, ny, nx), dtype=np.float32) galsim.GaussianDeviate(rng).generate(noise) with asdf.open(caldir["read"]) as f: noise[:-1, :, :] *= f["roman"]["data"][None, :, :] noise[-1, :, :] *= f["roman"]["resetnoise"] for j in range(len(tij)): noise[j, :, :] /= len(tij[j]) ** 0.5 noise[:-1, :, :] += noise[-1, :, :][None, :, :] # adds the reset noise to the reference pixels with asdf.open(caldir["dark"]) as f: noise[:-1, :, :] += np.copy(f["roman"]["data"]) # what we have above is a dark image, but we want to fill in the # active pixels with the data from im noise[:-1, nborder : ny - nborder, nborder : nx - nborder] = im[ :, nborder : ny - nborder, nborder : nx - nborder ].astype(noise.dtype) # reference output amp33info = { "valid": False, "med": np.zeros((pars.nside, pars.channelwidth), dtype=np.float32), "std": np.zeros((pars.nside, pars.channelwidth), dtype=np.float32), "M_PINK": 0.0, "RU_PINK": 0.0, } if amp33 is not None: with asdf.open(caldir["read"]) as f: if "amp33" in f["roman"]: amp33info = copy.deepcopy(f["roman"]["amp33"]) else: warnings.warn("Didn't find reference output information. Skipping ...") # generate correlated noise if fill_in_banding: with asdf.open(caldir["read"]) as f: u_pink = float(f["roman"]["anc"]["U_PINK"]) c_pink = float(f["roman"]["anc"]["C_PINK"]) print("adding correlated noise", u_pink, c_pink) for j in range(len(tij)): common_noise = noise_1f_frame(rng) * c_pink for ch in range(32): pinknoise = noise_1f_frame(rng) * u_pink + common_noise if ch % 2 == 1: pinknoise = pinknoise[:, ::-1] noise[j, :, pars.channelwidth * ch : pars.channelwidth * (ch + 1)] += ( pinknoise / len(tij[j]) ** 0.5 ).astype(noise.dtype) # reference output is completely built here (signal + noise) if amp33info["valid"]: whitenoise = np.zeros((pars.nside, pars.channelwidth), dtype=np.float32) galsim.GaussianDeviate(rng).generate(whitenoise) whitenoise *= amp33info["std"] pinknoise = amp33info["RU_PINK"] * noise_1f_frame(rng) + amp33info["M_PINK"] * common_noise amp33[j, :, :] = (amp33info["med"] + (whitenoise + pinknoise) / len(tij[j]) ** 0.5).astype( amp33.dtype ) # write back to the original im[:, :, :] = np.clip(np.round(noise[:-1, :, :]), 0, 2**16 - 1).astype(im.dtype)
[docs] class Image2D: """ 2D image, along with WCS and sky information. It can be constructed from simulations or (ultimately) from Roman data. Parameters ---------- intype : str Input type (e.g., "anlsim") Attributes ---------- image : np.array of flat A 2D image. Units are e/p/s if generated from a simulation in e. galsimwcs : galsim.wcs.CelestialWCS The world coordinate system for this image. header : astropy.io.fits.header.Header The world coordinate system for this image in FITS WCS format. date : str The observation date (ISO 8601 string). filter : str The observation filter (4 characters, e.g., R062). idsca : (int, int) The observation ID and SCA. ra_ : float Right ascension (in degrees) of the WFI. dec_ : float Declination (in degrees) of the WFI. pa_ : float Position angle (in degrees) of the WFI. refdata : dict Reference data locations. af : asdf.AsdfFile Level 1 data af2 : asdf.AsdfFile Level 2 data Methods ------- __init__ Constructor init_anlsim Constructor from Open Universe simulation file. simulate Simulates the ramps, including L1 and L2 data. L1_write_to Write simulated L1 data file (ASDF) L2_write_to Write simulated L2 data file (ASDF) Notes ----- The legal `intype` strings are: * ``anlsim`` : The Open Universe 2024 simulation "truth" (or equivalent) """
[docs] def __init__(self, intype, **kwargs): if intype == "anlsim": self.init_anlsim(kwargs["fname"])
[docs] def init_anlsim(self, fname, flip=True): """ Constructor from Open Universe 2024-type simulation. Parameters ---------- fname : str The input file name. flip : bool, optional If True, then flips from SCA native coordinates to science-aligned (SOC-like product). Returns ------- None """ # get (id,sca) m = re.search(r"_(\d+)_(\d+)\.fits", fname) self.idsca = (int(m.group(1)), int(m.group(2))) # read header and data with fits.open(fname) as f: data = f[0].data self.header = f[0].header # flip SCAs depending on which row they are in if flip: if self.idsca[1] % 3 == 0: hdu_sip_hflip(data, self.header) else: hdu_sip_vflip(data, self.header) self.image = data / self.header["EXPTIME"] # get this in electrons per second # offset from FITS -> GWCS convention self.header["CRPIX1"] -= 1 self.header["CRPIX2"] -= 1 self.galsimwcs, origin = galsim.wcs.readFromFitsHeader(self.header) try: self.date = self.header["DATE-OBS"] print(self.date) self.date = re.sub(" ", "T", self.date) + "Z" print(self.date) except (ValueError, KeyError): self.date = "2025-01-01T00:00:00.000000" self.filter = self.header["FILTER"][:4] self.ra_ = float(self.header["RA_TARG"]) self.dec_ = float(self.header["DEC_TARG"]) self.pa_ = float(self.header["PA_OBSY"])
[docs] def simulate(self, use_read_pattern, caldir=None, config={}, seed=43): """ Performs Level 1 & 2 simulations. This is based on the ``romanisim.image.simulate`` function, but some functionality has been changed to be useful for this class. Parameters ---------- use_read_pattern : list of list of int The MultiAccum table. caldir : dict, optional Dictionary of where the calibration files are located. (Otherwise uses internal defaults, only good for testing.) config : dict, optional Configuration file (usually expanded from YAML). seed : int, optional Random number seed. Returns ------- None """ target_pattern = 1000000 parameters.read_pattern[target_pattern] = use_read_pattern bandpass_name = "F" + self.filter[1:] # schema wants, e.g., "F129" instead of "J129" metadata = ris.set_metadata( date=self.date, bandpass=bandpass_name, sca=self.idsca[1], ma_table_number=target_pattern ) print("::", self.ra_, self.dec_, self.pa_) coord = SkyCoord(ra=self.ra_ * u.deg, dec=self.dec_ * u.deg, frame="icrs") wcs.fill_in_parameters(metadata, coord, boresight=False, pa_aper=self.pa_) rng = galsim.UniformDeviate(seed) ### steps below are from romanisim.image.simulate ### image_mod = roman_datamodels.datamodels.ImageModel.create_fake_data() meta = image_mod.meta meta["wcs"] = None for key in parameters.default_parameters_dictionary: meta[key].update(parameters.default_parameters_dictionary[key]) for key in metadata: meta[key].update(metadata[key]) util.add_more_metadata(meta) read_pattern = metadata["exposure"].get("read_pattern", use_read_pattern) # for this simulation, we want to build something self-contained refdata = rimage.gather_reference_data(image_mod, usecrds=False) # reffiles = refdata["reffiles"] # persistence -> None persistence = rip.Persistence() # boder reference pixels nborder = parameters.nborder # simulate a blank image if caldir is None: counts, simcatobj = rimage.simulate_counts( image_mod.meta, [], rng=rng, usecrds=False, darkrate=refdata["dark"], flat=refdata["flat"], psf_keywords=dict(), ) else: # get dark current in DN/p/s with asdf.open(caldir["dark"]) as f: this_dark = f["roman"]["dark_slope"][nborder:-nborder, nborder:-nborder] # get flat field with asdf.open(caldir["flat"]) as f: this_flat = f["roman"]["data"][nborder:-nborder, nborder:-nborder] # convert to e/p/s with asdf.open(caldir["gain"]) as f: this_dark = this_dark * f["roman"]["data"][nborder:-nborder, nborder:-nborder] g = np.copy(f["roman"]["data"][nborder:-nborder, nborder:-nborder]) # save gain for later # de-convolve the IPC kernel with asdf.open(caldir["ipc4d"]) as f: this_dark = ipc_rev(this_dark, f["roman"]["data"]) # dark is in e/s this_flat = ipc_rev( this_flat, f["roman"]["data"], gain=g ) # but flat was measured in DN_lin so need gain=g this_flat = np.clip(this_flat, 0.0, 2 - 2**-21) this_dark = np.clip( this_dark, -0.1 * this_flat, None ) # prevent a spurious negative dark from giving an illegal negative total count rate # now run with this version of the dark rate counts, simcatobj = rimage.simulate_counts( image_mod.meta, [], rng=rng, usecrds=False, darkrate=this_dark, flat=this_flat, psf_keywords=dict(), ) util.update_pointing_and_wcsinfo_metadata(image_mod.meta, counts.wcs) # convert from e/s --> e using the parameters file and read pattern t = parameters.read_time * (use_read_pattern[-1][-1] - use_read_pattern[0][0]) # the input simulations include the pixel area in their estimated e/s # but the flat field will ultimately include the pixel area as well! # therefore we need to re-scale the flat to the ideal pixel area (0.11 arcsec)^2 flat_witharea = this_flat / (pixelarea(self.galsimwcs, N=pars.nside_active) / pars.Omega_ideal) C = float(config["CNORM"]) if "CNORM" in config else 1.0 print(pixelarea(self.galsimwcs, N=pars.nside_active)[::1024, ::1024]) print(flat_witharea[::1024, ::1024]) sys.stdout.flush() counts.array[:, :] += rng.np.poisson( lam=np.clip(C * t * g / pars.g_ideal * self.image * flat_witharea, 0, None) ).astype(counts.array.dtype) # this is where the (simulated) L1 data is created if caldir is None: l1, l1dq = rstl1.make_l1( counts, read_pattern, read_noise=refdata["readnoise"], pedestal_extra_noise=parameters.pedestal_extra_noise, rng=rng, gain=refdata["gain"], crparam={}, inv_linearity=refdata["inverselinearity"], tstart=image_mod.meta.exposure.start_time, persistence=persistence, saturation=refdata["saturation"], ) else: # need to run the individual steps ourselves l1, l1dq = make_l1_fullcal( counts, read_pattern, caldir, rng=rng, tstart=image_mod.meta.exposure.start_time, persistence=persistence, ) # convert to asdf im, extras = rstl1.make_asdf(l1, dq=l1dq, metadata=image_mod.meta, persistence=persistence) # fill in the reference pixels and reference output if caldir is not None: # NO_AMP33 would allow us to bypass the reference output, if it isn't in the config file amp33struct = im["amp33"] if "NO_AMP33" in caldir: if caldir["NO_AMP33"]: amp33struct = None fill_in_refdata_and_1f( im["data"], caldir, rng, rstl1.read_pattern_to_tij(read_pattern), fill_in_banding=True, amp33=amp33struct, ) # Create metadata for simulation parameter romanisimdict = {"version": rstversion} # for storage reasons, I took out all the large metadata arrays in romanisimdict # Write file self.af = asdf.AsdfFile() self.af.tree = {"roman": im, "romanisim": romanisimdict} # Make idealized L2 data slopeinfo = rimage.make_l2( l1, read_pattern, read_noise=refdata["readnoise"], gain=refdata["gain"], flat=refdata["flat"], linearity=refdata["linearity"], darkrate=refdata["dark"], dq=l1dq, ) l2dq = np.bitwise_or.reduce(l1dq, axis=0) # package header so that there is a obj.header.header # this is a hack for compatibility with convert_wcs_to_gwcs class Blank: pass obj = Blank() obj.header = Blank() obj.header.header = self.header # im2, extras2 = rimage.make_asdf( *slopeinfo, metadata=image_mod.meta, persistence=persistence, dq=l2dq, imwcs=obj, gain=refdata["gain"], ) # functionality to pull over the WCS from L1 without dependence on wcs.convert_wcs_to_gwcs # im2['roman']['meta'].update(wcs=this_gwcs) # im2['roman']['meta']['wcsinfo']['s_region'] = wcs.create_s_region(this_gwcs) # Create metadata for simulation parameter romanisimdict2 = {"version": rstversion} romanisimdict2.update(**extras2) # Write file self.af2 = asdf.AsdfFile() self.af2.tree = {"roman": im2, "romanisim": romanisimdict2} self.refdata = refdata
[docs] def L1_write_to(self, filename): """ Writes L1 data to a file if available. Parameters ---------- filename : str Where to write the file (should end with ``.asdf``). Returns ------- bool True if successful, False if not written. """ if hasattr(self, "af"): with open(filename, "wb") as f: self.af.write_to(f) else: return False
[docs] def L2_write_to(self, filename): """ Writes L2 data to a file if available. Parameters ---------- filename : str Where to write the file (should end with ``.asdf``). Returns ------- bool True if successful, False if not written. """ if hasattr(self, "af2"): with open(filename, "wb") as f: self.af2.write_to(f) else: return False
[docs] class Image2D_from_L1(Image2D): """Similar to Image2D, but constructed from L1 data file. with Image2D_from_L1(infile, refdata, thewcs) as L1: ... """ # Context manager functions
[docs] def __enter__(self): return self
[docs] def __exit__(self, exception_type, exception_value, exception_traceback): self.af.close()
# Constructor def __init__(self, infile, refdata, thewcs, verbose_err=True): """Constructor. The arguments are: infile : L1 data file (ASDF format) refdata : the calibration reference data thewcs WCS object (in some form -- currently FITS Header or GalSimWCS, though we've had issues with the latter) """
[docs] self.af = asdf.open(infile)
[docs] self.refdata = refdata
[docs] self.thewcs = thewcs
[docs] def psuedocalibrate(self): """Generates a simple calibrated (L2) image. This doesn't use romancal, but can be useful as a pass-through function. """ # collect information nborder = parameters.nborder # Make idealized L2 data refdata = self.refdata l1dq = np.zeros( np.shape(self.af["roman"]["data"][:, nborder:-nborder, nborder:-nborder]), dtype=np.uint32 ) slopeinfo = rimage.make_l2( self.af["roman"]["data"][:, nborder:-nborder, nborder:-nborder] * u.DN, self.af["roman"]["meta"]["exposure"]["read_pattern"], read_noise=refdata["readnoise"], gain=refdata["gain"], flat=refdata["flat"], linearity=refdata["linearity"], darkrate=refdata["dark"], dq=l1dq, ) l2dq = np.bitwise_or.reduce(l1dq, axis=0) # make WCS --- a few ways of doing this while True: wcsobj = None class Blank: pass # first try a FITS header if isinstance(self.thewcs, fits.Header): wcsobj = Blank() wcsobj.header = Blank() wcsobj.header.header = self.thewcs break # should work if this is a GalSim WCS try: header = fits.Header() self.thewcs.writeToFitsHeader( header, galsim.BoundsI(0, pars.nside_active, 0, pars.nside_active) ) # offset to FITS convention -- this is undone later header["CRPIX1"] += 1 header["CRPIX2"] += 1 wcsobj = Blank() wcsobj.header = Blank() wcsobj.header.header = header warnings.warn("Use of GalSim WCS in calibrate is not fully working yet!") break except Exception: wcsobj = None raise Exception("Unrecognized WCS") persistence = rip.Persistence() im2, extras2 = rimage.make_asdf( *slopeinfo, metadata=self.af["roman"]["meta"], persistence=persistence, dq=l2dq, imwcs=wcsobj, gain=refdata["gain"], ) # Create metadata for simulation parameter romanisimdict2 = {"version": rstversion} romanisimdict2.update(**extras2) # Write file self.af2 = asdf.AsdfFile() self.af2.tree = {"roman": im2, "romanisim": romanisimdict2}
[docs] def simpletest(): """ This is a simple script to convert Roman to L1/L2. For internal testing only, not production. """ use_read_pattern = [ [0], [1], [2, 3], [4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15], [16, 17, 18, 19, 20, 21, 22, 23], [24, 25, 26, 27, 28, 29, 30, 31, 32, 33], [34], ] x = Image2D( "anlsim", fname="/fs/scratch/PCON0003/cond0007/anl-run-in-prod/truth/Roman_WAS_truth_F184_14747_10.fits", ) print(x.galsimwcs) print(x.date, x.idsca) print(">>", x.image) x.simulate(use_read_pattern) x.L1_write_to("sim1.asdf") x.L2_write_to("sim2-direct.asdf") f = asdf.open("sim1.asdf") print(f.info()) print("corners:") print(f["romanisim"]["wcs"]) print(f["romanisim"]["wcs"]((0, 0, 4087, 4087), (0, 4087, 0, 4087))) print(f["roman"]["meta"]) fits.PrimaryHDU(f["roman"]["data"]).writeto("L1.fits", overwrite=True) with Image2D_from_L1("sim1.asdf", x.refdata, x.header) as ff: ff.pseudocalibrate() ff.L2_write_to("sim2.asdf") f = asdf.open("sim2.asdf") print(f.info()) print("corners:") print(f["roman"]["meta"]["wcs"]((0, 0, 4087, 4087), (0, 4087, 0, 4087))) fits.PrimaryHDU(f["roman"]["data"]).writeto("L2.fits", overwrite=True)
[docs] def run_config(config): """ This allows the L1 image construction to be called as a Python function instead of a stand-alone code. Parameters ---------- config : dict Configuration file (usually a dictionary unpacked from YAML). Returns ------- None """ # calibration files caldir = config.get("CALDIR", None) print("Reading from <--", config["IN"]) print("Writing to -->", config["OUT"]) print("Using calibration data:") print(caldir) use_read_pattern = [] ng = len(config["READS"]) // 2 for j in range(ng): use_read_pattern.append(list(range(int(config["READS"][2 * j]), int(config["READS"][2 * j + 1])))) print("Read pattern:", use_read_pattern) # Optional inputs seed = 43 if "SEED" in config: seed = int(config["SEED"]) x = Image2D("anlsim", fname=config["IN"]) x.simulate(use_read_pattern, caldir=caldir, config=config, seed=seed) x.L1_write_to(config["OUT"]) # header information for the WCS x.header["COMMENT"] = "truth wcs from sim_to_isim" x.header.tofile(config["OUT"][:-5] + "_asdf_wcshead.txt", overwrite=True) # also write the FITS file for viewing if "FITSOUT" in config: if config["FITSOUT"]: image_out = np.zeros((ng, pars.nside, pars.nside_augmented), dtype=np.uint16) with asdf.open(config["OUT"]) as f: image_out[:, :, : pars.nside] = f["roman"]["data"] image_out[:, :, pars.nside :] = f["roman"]["amp33"] fits.PrimaryHDU(image_out).writeto(config["OUT"][:-5] + "_asdf_to.fits", overwrite=True)
# simpletest() if __name__ == "__main__": """Stand-alone function to convert from OpenUniverse to L1. Call it with: python sim_to_isim <config file> The config file is in YAML format and has the fields: Required: 'IN': input file name (FITS) 'OUT': output file name (must end in '.asdf') 'READS': a list of length 2*Ngrp: 0th group is [READS[0]:READS[1]], then [READS[2]:READS[3]], etc. Optional: 'CALDIR': Python dictionary with the calibration reference files. This can contain: LINEARITY 'FITSOUT': also write a FITS output (default: False; mostly useful for visualization in ds9) 'SEED': RNG seed """ # read settings with open(sys.argv[1]) as f:
[docs] config = yaml.safe_load(f)
run_config(config)