Source code for stginga.utils

"""Utility functions for ``stginga``."""
# STDLIB
import os
import warnings

# THIRD-PARTY
import numpy as np
from astropy import wcs
from astropy.convolution import convolve_fft, Box2DKernel
from astropy.io import ascii, fits
from astropy.stats import biweight_location
from astropy.stats import sigma_clip
from astropy.utils import minversion
from astropy.utils.exceptions import AstropyUserWarning
from scipy.interpolate import griddata
from scipy.ndimage import zoom

ASTROPY_LT_3_1 = not minversion('astropy', '3.1')
GINGA_LT_3 = not minversion('ginga', '3.0')

__all__ = ['calc_stat', 'interpolate_badpix', 'find_ext', 'DQParser',
           'scale_wcs', 'scale_image', 'scale_image_with_dq']


def calc_stat(data, sigma=1.8, niter=10, algorithm='median'):
    """Calculate statistics for given data.

    Parameters
    ----------
    data : ndarray
        Data to be calculated from.

    sigma : float
        Sigma for sigma clipping.

    niter : int
        Number of iterations for sigma clipping.

    algorithm : {'mean', 'median', 'mode', 'stddev'}
        Algorithm for statistics calculation.

    Returns
    -------
    val : float
        Statistics value.

    Raises
    ------
    ValueError
        Invalid algorithm.

    """
    arr = np.ravel(data)

    if len(arr) < 1:
        return 0.0

    kwargs = {'sigma': sigma}

    if ASTROPY_LT_3_1:
        kwargs['iters'] = niter
    else:
        kwargs['maxiters'] = niter

    arr_masked = sigma_clip(arr, **kwargs)
    arr = arr_masked.data[~arr_masked.mask]

    if len(arr) < 1:
        return 0.0

    algorithm = algorithm.lower()
    if algorithm == 'mean':
        val = arr.mean()
    elif algorithm == 'median':
        val = np.median(arr)
    elif algorithm == 'mode':
        val = biweight_location(arr)
    elif algorithm == 'stddev':
        val = arr.std()
    else:
        raise ValueError(f'{algorithm} is not a valid algorithm for sky '
                         'background calculations')

    return val
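
# Usage sketch (illustrative only, not part of the original module): estimate
# a sky background level and spread from a synthetic image using the
# sigma-clipped statistics above. The array ``im`` is a made-up input.
#
#     >>> import numpy as np
#     >>> rng = np.random.default_rng(1234)
#     >>> im = rng.normal(loc=10.0, scale=2.0, size=(100, 100))
#     >>> sky = calc_stat(im, sigma=3.0, niter=5, algorithm='median')
#     >>> spread = calc_stat(im, sigma=3.0, niter=5, algorithm='stddev')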
def interpolate_badpix(image, badpix_mask, basis_mask, method='linear',
                       rescale=False):
    """Use spline interpolation to fix bad pixel(s).

    Parameters
    ----------
    image : ndarray
        Image to be fixed in-place.

    badpix_mask, basis_mask : ndarray
        Boolean masks of bad pixel(s) and the region used as basis
        for interpolation.

    method : {'nearest', 'linear', 'cubic'}
        See :func:`~scipy.interpolate.griddata`.

    rescale : bool
        See :func:`~scipy.interpolate.griddata`.

    """
    y, x = np.where(basis_mask)
    z = image[basis_mask]
    ynew, xnew = np.where(badpix_mask)
    image[badpix_mask] = griddata((x, y), z, (xnew, ynew), method=method,
                                  rescale=rescale)
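
# Usage sketch (illustrative only, not part of the original module): repair a
# single hot pixel in-place, interpolating from its 5x5 neighborhood. The
# image and masks are made up for illustration; note that the basis mask
# should exclude the bad pixel itself.
#
#     >>> import numpy as np
#     >>> im = np.ones((10, 10))
#     >>> im[5, 5] = 1e6  # hot pixel
#     >>> badpix_mask = np.zeros(im.shape, dtype=bool)
#     >>> badpix_mask[5, 5] = True
#     >>> basis_mask = np.zeros(im.shape, dtype=bool)
#     >>> basis_mask[3:8, 3:8] = True
#     >>> basis_mask &= ~badpix_mask
#     >>> interpolate_badpix(im, badpix_mask, basis_mask, method='linear')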
def find_ext(imfile, ext):
    """Determine whether given FITS file has the requested extension.

    Parameters
    ----------
    imfile : str
        Filename.

    ext : tuple
        Desired ``(EXTNAME, EXTVER)``.

    Returns
    -------
    has_ext : bool
        `True` if the extension exists.

    Examples
    --------
    >>> find_ext('myimage.fits', ('DQ', 1))

    """
    if imfile is None:  # This is needed to handle Ginga mosaic
        return False
    with fits.open(imfile) as pf:
        has_ext = ext in pf
    return has_ext
# STScI reftools.interpretdq.DQParser class modified for Ginga plugin.
class DQParser(object):
    """Class to handle parsing of DQ flags.

    **Definition Table**

    A "definition table" is an ASCII table that defines each DQ flag and its
    short and long descriptions. It can have optional comment line(s) for
    metadata, e.g.::

        # TELESCOPE = ANY
        # INSTRUMENT = ANY

    It must have three columns:

    1. ``DQFLAG`` contains the flag value (``uint16``).
    2. ``SHORT_DESCRIPTION`` (string).
    3. ``LONG_DESCRIPTION`` (string).

    Example file contents::

        # INSTRUMENT = HSTGENERIC
        DQFLAG SHORT_DESCRIPTION LONG_DESCRIPTION
        0      "OK"              "Good pixel"
        1      "LOST"            "Lost during compression"
        ...    ...               ...

    The table format must be readable by ``astropy.io.ascii``.

    Parameters
    ----------
    definition_file : str
        ASCII table that defines the DQ flags (see above).

    Attributes
    ----------
    tab : ``astropy.table.Table``
        Table object from given definition file.

    metadata : ``astropy.table.Table``
        Table object from file metadata.

    """
    def __init__(self, definition_file):
        self._dqcol = 'DQFLAG'
        self._sdcol = 'short'  # SHORT_DESCRIPTION
        self._ldcol = 'long'   # LONG_DESCRIPTION

        # Need to replace ~ with $HOME
        self.tab = ascii.read(
            os.path.expanduser(definition_file),
            names=(self._dqcol, self._sdcol, self._ldcol),
            converters={self._dqcol: [ascii.convert_numpy(np.uint)],
                        self._sdcol: [ascii.convert_numpy(str)],
                        self._ldcol: [ascii.convert_numpy(str)]})

        # Another table to store metadata
        self.metadata = ascii.read(self.tab.meta['comments'], delimiter='=',
                                   format='no_header', names=['key', 'val'])

        # Ensure table has OK flag to detect good pixel
        self._okflag = 0
        if self._okflag not in self.tab[self._dqcol]:
            self.tab.add_row([self._okflag, 'OK', 'Good pixel'])

        # Sort table in ascending order
        self.tab.sort(self._dqcol)

        # Compile a list of flags
        self._valid_flags = self.tab[self._dqcol]
    def interpret_array(self, data):
        """Interpret DQ values for an array.

        .. warning::

            If the array is large and has a lot of flagged elements,
            this can be resource intensive.

        Parameters
        ----------
        data : ndarray
            DQ values.

        Returns
        -------
        dqs_by_flag : dict
            Dictionary mapping each interpreted DQ value to indices of
            affected array elements.

        """
        data = np.asarray(data, dtype=np.uint)  # Ensure uint array
        dqs_by_flag = {}

        def _one_flag(vf):
            dqs_by_flag[vf] = np.where((data & vf) != 0)

        # Skip good flag
        list(map(_one_flag, self._valid_flags[1:]))

        return dqs_by_flag
    def interpret_dqval(self, dqval):
        """Interpret DQ values for a single pixel.

        Parameters
        ----------
        dqval : int
            DQ value.

        Returns
        -------
        dqs : ``astropy.table.Table``
            Table object containing a list of interpreted DQ values and
            their meanings.

        """
        dqval = int(dqval)

        # Good pixel, nothing to do
        if dqval == self._okflag:
            idx = np.where(self.tab[self._dqcol] == self._okflag)

        # Find all the possible DQ flags
        else:
            idx = (dqval & self._valid_flags) != 0

        return self.tab[idx]
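
# Usage sketch (illustrative only, not part of the original module): build a
# parser from a DQ definition table and interpret flag values. The filename
# ``my_dqflags.txt`` is hypothetical; any table in the documented format
# should work.
#
#     >>> import numpy as np
#     >>> dqparser = DQParser('my_dqflags.txt')
#     >>> dqparser.interpret_dqval(16658)  # single value -> astropy Table
#     >>> dq = np.array([[0, 1], [16, 4096]], dtype=np.uint16)
#     >>> dqparser.interpret_array(dq)  # array -> dict of {flag: indices}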
def scale_wcs(input_hdr, zoom_factor, debug=False):
    """Rescale FITS WCS by the given zoom factor.

    This is used in :func:`scale_image` and :func:`scale_image_with_dq`.

    Both PC and CD matrices are supported. Distortion is not
    taken into account; therefore, this does not work on an image
    with ``CTYPE`` that ends in ``-SIP``.

    .. note::

        WCS transformation provided by Mihai Cara.
        Some warnings are suppressed.

    Parameters
    ----------
    input_hdr : `astropy.io.fits.Header`
        FITS header containing the WCS.

    zoom_factor : float
        See :func:`scipy.ndimage.zoom`.

    debug : bool
        If `True`, print extra information to screen.

    Returns
    -------
    hdr : `astropy.io.fits.Header`
        Simple FITS header containing the rescaled WCS.

    Raises
    ------
    ValueError
        Invalid WCS.

    """
    slice_factor = int(1 / zoom_factor)
    old_wcs = wcs.WCS(input_hdr)  # To account for distortion, add "pf" as 2nd arg  # noqa

    # Suppress RuntimeWarning about ignoring cdelt because cd is present.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', RuntimeWarning)
        new_wcs = old_wcs.slice(
            (np.s_[::slice_factor], np.s_[::slice_factor]))

    if old_wcs.wcs.has_pc():  # PC matrix
        wshdr = new_wcs.to_header()

    elif old_wcs.wcs.has_cd():  # CD matrix
        # Suppress RuntimeWarning about ignoring cdelt because cd is present
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', RuntimeWarning)
            new_wcs.wcs.cd *= new_wcs.wcs.cdelt

        new_wcs.wcs.set()
        wshdr = new_wcs.to_header()

        for i in range(1, 3):
            for j in range(1, 3):
                key = f'PC{i}_{j}'
                if key in wshdr:
                    newkey = f'CD{i}_{j}'
                    wshdr.rename_keyword(key, newkey)
                    if debug:
                        print(f'{key} -> {newkey}')

    else:
        raise ValueError('Missing CD or PC matrix for WCS')

    hdr = input_hdr.copy()

    if 'XTENSION' in hdr:
        del hdr['XTENSION']

    if 'SIMPLE' in hdr:  # pragma: no cover
        hdr['SIMPLE'] = True
    else:
        hdr.insert(0, ('SIMPLE', True))

    hdr.extend(
        [c if c[0] not in hdr else c[0:] for c in wshdr.cards], update=True)

    if debug:
        old_wcs.printwcs()
        wcs.WCS(wshdr).printwcs()
        wcs.WCS(hdr).printwcs()

    return hdr
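
# Usage sketch (illustrative only, not part of the original module): rescale
# the WCS of a science extension header to quarter size. The filename is
# hypothetical, and the extension must carry a PC or CD matrix without SIP
# distortion.
#
#     >>> from astropy.io import fits
#     >>> hdr = fits.getheader('myimage.fits', ('SCI', 1))
#     >>> small_hdr = scale_wcs(hdr, 0.25)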
# This is needed by QUIP to pre-shrink input images for quick-look mosaic
# in Ginga, but useful enough to put here for stginga's use if needed.
# Warnings are suppressed because WEx that calls QUIP treats all screen
# outputs as error messages.
def scale_image(infile, outfile, zoom_factor, ext=('SCI', 1), clobber=False,
                debug=False):
    """Rescale the image size in the given extension by the given
    zoom factor and adjust WCS accordingly.

    WCS adjustment is done using :func:`scale_wcs`.

    Output image is a single-extension FITS file with only the given
    extension header and data.

    Parameters
    ----------
    infile, outfile : str
        Input and output filenames.

    zoom_factor : float
        See :func:`scipy.ndimage.zoom`.

    ext : int, str, or tuple
        Extension to extract.

    clobber : bool
        If `True`, overwrite existing output file.

    debug : bool
        If `True`, print extra information to screen.

    Raises
    ------
    ValueError
        Invalid data.

    """
    if not clobber and os.path.exists(outfile):  # pragma: no cover
        if debug:
            warnings.warn(f'{outfile} already exists', AstropyUserWarning)
        return  # Instead of raising error at the very end

    with fits.open(infile) as pf:
        prihdr = pf['PRIMARY'].header
        hdr = pf[ext].header
        data = pf[ext].data

    # Inherit some keywords from primary header
    for key in ('ROOTNAME', 'TARGNAME', 'INSTRUME', 'DETECTOR', 'FILTER',
                'PUPIL', 'DATE-OBS', 'TIME-OBS'):
        if (key in hdr) or (key not in prihdr):
            continue
        hdr[key] = prihdr[key]

    if data.ndim != 2:  # pragma: no cover
        raise ValueError(f'Unsupported ndim={data.ndim}')

    # Scale the data.
    data = zoom(data, zoom_factor)

    # Adjust WCS
    outhdr = scale_wcs(hdr, zoom_factor, debug=debug)

    # Write to output file
    hdu = fits.PrimaryHDU(data)
    hdu.header = outhdr
    hdu.writeto(outfile, overwrite=clobber)
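
# Usage sketch (illustrative only, not part of the original module): write a
# quarter-size quick-look version of the SCI extension. Filenames are
# hypothetical.
#
#     >>> scale_image('big_image.fits', 'small_image.fits', 0.25,
#     ...             ext=('SCI', 1), clobber=True)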
def scale_image_with_dq(infile, outfile, zoom_factor, dq_parser,
                        kernel_width=99, sci_ext=('SCI', 1),
                        dq_ext=('DQ', 1), bad_flag=1, ignore_edge_pixels=4,
                        overwrite=False, debug=False):
    """Rescale the image size in the given extension by the given
    zoom factor, taking data quality (DQ) flags into account,
    and adjust WCS accordingly.

    WCS adjustment is done using :func:`scale_wcs`.

    Output image is a single-extension FITS file with only the given
    extension header and data.

    Parameters
    ----------
    infile, outfile : str
        Input and output filenames.

    zoom_factor : float
        See :func:`scipy.ndimage.zoom`.

    dq_parser : `DQParser`
        DQ parser for interpreting DQ flags.

    kernel_width : int
        See :class:`astropy.convolution.Box2DKernel`.

    sci_ext, dq_ext : int, str, or tuple
        Science and DQ extensions to extract, respectively.

    bad_flag : int
        DQ flag value to indicate bad pixels to exclude from calculations.
        Compound flag is currently not supported.

    ignore_edge_pixels : int
        Ignore this number of pixels along the edges. The default value
        of 4 is for the reference pixels on JWST NIRCam detectors.

    overwrite : bool
        If `True`, overwrite existing output file.

    debug : bool
        If `True`, print extra information to screen.

    Raises
    ------
    ValueError
        Invalid data.

    """
    if not overwrite and os.path.exists(outfile):  # pragma: no cover
        if debug:
            warnings.warn(f'{outfile} already exists', AstropyUserWarning)
        return  # Instead of raising error at the very end

    with fits.open(infile) as pf:
        prihdr = pf['PRIMARY'].header
        hdr = pf[sci_ext].header
        data = pf[sci_ext].data
        dq = pf[dq_ext].data

    if data.ndim != 2:  # pragma: no cover
        raise ValueError(f'Unsupported ndim={data.ndim}')

    # Inherit some keywords from primary header
    for key in ('ROOTNAME', 'TARGNAME', 'INSTRUME', 'DETECTOR', 'FILTER',
                'PUPIL', 'DATE-OBS', 'TIME-OBS'):
        if (key in hdr) or (key not in prihdr):
            continue
        hdr[key] = prihdr[key]

    dqs_by_flags = dq_parser.interpret_array(dq)

    # Edge pixels
    iy_max = data.shape[0] - ignore_edge_pixels
    ix_max = data.shape[1] - ignore_edge_pixels
    edge_mask = np.ones_like(dq, dtype=bool)
    edge_mask[ignore_edge_pixels:iy_max, ignore_edge_pixels:ix_max] = False

    badpix_mask = np.zeros_like(dq, dtype=bool)
    badpix_mask[dqs_by_flags[bad_flag]] = True
    badpix_mask[edge_mask] = False  # Ignore edge

    # Fix bad pixels with convolution
    box_kernel = Box2DKernel(kernel_width)
    smoothed_data = convolve_fft(data, box_kernel, mask=badpix_mask)  # 5 secs
    data[badpix_mask] = smoothed_data[badpix_mask]

    # Should not have NaN in fixed image?
    if not np.all(np.isfinite(data)):
        raise ValueError('Fixed image has NaN(s)')

    # Scale the data.
    data = zoom(data, zoom_factor)

    # Adjust WCS
    outhdr = scale_wcs(hdr, zoom_factor, debug=debug)

    # Write to output file
    hdu = fits.PrimaryHDU(data)
    hdu.header = outhdr
    hdu.writeto(outfile, overwrite=overwrite)
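
# Usage sketch (illustrative only, not part of the original module): like
# scale_image() above, but pixels flagged with DQ=1 are smoothed over before
# shrinking. Filenames and the DQ definition table are hypothetical.
#
#     >>> dqparser = DQParser('my_dqflags.txt')
#     >>> scale_image_with_dq('big_image.fits', 'small_image.fits', 0.25,
#     ...                     dqparser, bad_flag=1, overwrite=True)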