"""Functions for IPC correction."""
import logging
from collections import namedtuple
import numpy as np
from jwst.ipc import x_irs2
from jwst.lib import pipe_utils
log = logging.getLogger(__name__)
NumRefPixels = namedtuple(
"NumRefPixels", ["bottom_rows", "top_rows", "left_columns", "right_columns"]
)
__all__ = ["do_correction", "ipc_correction", "get_num_ref_pixels", "get_ipc_slice", "ipc_convolve"]
[docs]
def do_correction(input_model, ipc_model):
"""
Execute all tasks for IPC correction.
Parameters
----------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Science data to be corrected.
ipc_model : `~stdatamodels.jwst.datamodels.IPCModel`
Deconvolution kernel, either a 2-D or 4-D image in the first
extension.
Returns
-------
output_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
IPC-corrected science data.
"""
# Save some data params for easy use later
sci_nints = input_model.data.shape[0]
sci_ngroups = input_model.data.shape[1]
sci_nframes = input_model.meta.exposure.nframes
sci_groupgap = input_model.meta.exposure.groupgap
log.debug(
f"IPC corr using nints={sci_nints}, ngroups={sci_ngroups}, "
f"nframes={sci_nframes}, groupgap={sci_groupgap}"
)
# Apply the correction.
output_model = ipc_correction(input_model, ipc_model)
return output_model
[docs]
def ipc_correction(output, ipc_model):
"""
Apply the IPC correction to the science arrays.
Parameters
----------
output : `~stdatamodels.jwst.datamodels.JwstDataModel`
The input science data.
ipc_model : `~stdatamodels.jwst.datamodels.IPCModel`
The IPC kernel. The input is corrected for IPC by convolving
with this 2-D or 4-D array.
Returns
-------
output : `~stdatamodels.jwst.datamodels.JwstDataModel`
IPC-corrected science data.
"""
log.debug(
f"ipc_correction: nints={output.meta.exposure.nints}, "
f"ngroups={output.meta.exposure.ngroups}, "
f"size={output.data.shape[-1]},{output.data.shape[-2]}"
)
# Was IRS2 readout used?
is_irs2_format = pipe_utils.is_irs2(output)
if is_irs2_format:
irs2_mask = x_irs2.make_mask(output)
detector = output.meta.instrument.detector
# The number of reference pixels along the bottom edge, top edge,
# left edge, and right edge.
nref = get_num_ref_pixels(output)
# Get the data for the IPC kernel. This can be a slice, if input_model
# is a subarray.
kernel = get_ipc_slice(output, ipc_model)
log.debug(
f"substrt1 = {output.meta.subarray.xstart}, subsize1 = {output.meta.subarray.xsize}, "
f"substrt2 = {output.meta.subarray.ystart}, subsize2 = {output.meta.subarray.ysize}"
)
log.debug(
"Number of reference pixels: bottom, top, left, right = "
f"{nref.bottom_rows}, {nref.top_rows}, {nref.left_columns}, {nref.right_columns}"
)
log.debug(f"Shape of ipc image = {repr(ipc_model.data.shape)}")
# Loop over all integrations and groups in input science data.
for i in range(output.data.shape[0]): # integrations
for j in range(output.data.shape[1]): # groups
# Convolve the current group in-place with the IPC kernel.
if is_irs2_format:
# Extract normal data from input IRS2-format data.
temp = x_irs2.from_irs2(output.data[i, j, :, :], irs2_mask, detector)
ipc_convolve(temp, kernel, nref)
# Insert normal data back into original, IRS2-format data.
x_irs2.to_irs2(output.data[i, j, :, :], temp, irs2_mask, detector)
else:
ipc_convolve(output.data[i, j], kernel, nref)
return output
[docs]
def get_num_ref_pixels(input_model):
"""
Get the number of reference pixel rows and columns.
Parameters
----------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
The input science data.
Returns
-------
nref : namedtuple
Tuple containing the number of reference pixels at each edge:
bottom_rows : int
The number of reference rows at the bottom of the image.
top_rows : int
The number of reference rows at the top of the image.
left_columns : int
The number of reference columns at the left edge.
right_columns : int
The number of reference columns at the right edge.
"""
xstart = input_model.meta.subarray.xstart - 1 # zero indexed
xsize = input_model.meta.subarray.xsize
if input_model.meta.instrument.name == "MIRI":
nref = NumRefPixels(
bottom_rows=0,
top_rows=0,
left_columns=max(0, 4 - xstart),
right_columns=max(0, xstart + xsize - 1028),
)
else:
ystart = input_model.meta.subarray.ystart - 1 # zero indexed
ysize = input_model.meta.subarray.ysize
nref = NumRefPixels(
bottom_rows=max(0, 4 - ystart),
top_rows=max(0, ystart + ysize - 2044),
left_columns=max(0, 4 - xstart),
right_columns=max(0, xstart + xsize - 2044),
)
return nref
[docs]
def get_ipc_slice(input_model, ipc_model):
"""
Extract a slice from IPC kernel corresponding to science data.
Parameters
----------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
The input science data.
ipc_model : `~stdatamodels.jwst.datamodels.IPCModel`
The IPC kernel model.
Returns
-------
kernel : ndarray, either 2-D or 4-D
The data array for the IPC kernel. If the IPC kernel is 4-D and
the science data array is a subarray, ``kernel`` will be a slice of
the reference image; otherwise, this will be the full image.
"""
if len(ipc_model.data.shape) == 2:
return ipc_model.data
# Convert xstart and ystart from one indexing to zero indexing.
xstart = input_model.meta.subarray.xstart - 1
ystart = input_model.meta.subarray.ystart - 1
xsize = input_model.meta.subarray.xsize
ysize = input_model.meta.subarray.ysize
if input_model.meta.instrument.name == "MIRI":
is_subarray = xsize < 1032 or ysize < 1024
else:
is_subarray = xsize < 2048 or ysize < 2048
if is_subarray:
return ipc_model.data[:, :, ystart : ystart + ysize, xstart : xstart + xsize]
else:
return ipc_model.data
[docs]
def ipc_convolve(output_data, kernel, nref):
"""
Convolve the science data with the IPC kernel.
Parameters
----------
output_data : ndarray
A copy of the 2-D input science data for one group; this will be
modified in-place.
kernel : ndarray
The IPC kernel (2-D or 4-D); the input is corrected for IPC by convolving with
this array. If it is 4-D, the last two dimensions will be a slice
that matches the last two dimensions of ``output_data``.
nref : namedtuple
Tuple containing number of reference pixels at each edge:
bottom_rows : int
The number of reference rows at the bottom of the image.
top_rows : int
The number of reference rows at the top of the image.
left_columns : int
The number of reference columns at the left edge.
right_columns : int
The number of reference columns at the right edge.
"""
bottom_rows = nref.bottom_rows
top_rows = nref.top_rows
left_columns = nref.left_columns
right_columns = nref.right_columns
kshape = kernel.shape
# This is the shape of the entire image, which may include reference
# pixels.
shape = output_data.shape
# These axis lengths exclude reference pixels, if there are any.
ny = shape[0] - (bottom_rows + top_rows)
nx = shape[1] - (left_columns + right_columns)
# The temporary array temp is larger than the science part of
# output_data by a border (set to zero) that's about half of the
# kernel size, so the convolution can be done without checking for
# out of bounds.
# b_b, t_b, l_b, and r_b are the widths of the borders on the
# bottom, top, left, and right, respectively.
b_b = kshape[0] // 2
t_b = kshape[0] - b_b - 1
l_b = kshape[1] // 2
r_b = kshape[1] - l_b - 1
tny = ny + b_b + t_b
yoff = bottom_rows # offset in output_data
tnx = nx + l_b + r_b
xoff = left_columns # offset in output_data
# Note that when we accumulate sums to output_data below, we will
# always use the same slice: output_data[yoff:yoff+ny, xoff:xoff+nx].
# Copy the science portion (not the reference pixels) of output_data
# to this temporary array, then make subsequent changes in-place to
# output_data.
temp = np.zeros((tny, tnx), dtype=output_data.dtype)
temp[b_b : b_b + ny, l_b : l_b + nx] = output_data[yoff : yoff + ny, xoff : xoff + nx].copy()
# After setting this slice to zero, we'll incrementally add to it.
output_data[yoff : yoff + ny, xoff : xoff + nx] = 0.0
if len(kshape) == 2:
# 2-D IPC kernel. Loop over pixels of the deconvolution kernel.
# In this section, `part` has the same shape as `temp`.
middle_j = kshape[0] // 2
middle_i = kshape[1] // 2
for j in range(kshape[0]):
jstart = kshape[0] - j - 1
for i in range(kshape[1]):
if i == middle_i and j == middle_j:
continue # the middle pixel is done last
part = kernel[j, i] * temp
istart = kshape[1] - i - 1
part_section = part[jstart : jstart + ny, istart : istart + nx]
output_data[yoff : yoff + ny, xoff : xoff + nx] += part_section
# The middle pixel of the IPC kernel is expected to be the largest,
# so add that last.
part = kernel[middle_j, middle_i] * temp
part_section = part[middle_j : middle_j + ny, middle_i : middle_i + nx]
output_data[yoff : yoff + ny, xoff : xoff + nx] += part_section
else:
# 4-D IPC kernel. Extract a subset of the kernel: all of the
# first two axes, but only the portion of the last two axes
# corresponding to the science data (i.e. possibly a subarray,
# and certainly excluding reference pixels).
k_temp = np.zeros((kshape[0], kshape[1], tny, tnx), dtype=kernel.dtype)
kernel_section = kernel[:, :, yoff : yoff + ny, xoff : xoff + nx]
k_temp[:, :, b_b : b_b + ny, l_b : l_b + nx] = kernel_section
# In this section, `part` has shape (ny, nx), which is smaller
# than `temp`.
middle_j = kshape[0] // 2
middle_i = kshape[1] // 2
for j in range(kshape[0]):
jstart = kshape[0] - j - 1
for i in range(kshape[1]):
if i == middle_i and j == middle_j:
continue # the middle pixel is done last
istart = kshape[1] - i - 1
# The slice of k_temp includes different pixels for the
# first or second axes within each loop, but the same slice
# for the last two axes.
# The slice of temp (a copy of the science data) includes
# a different offset for each loop.
part = (
k_temp[j, i, b_b : b_b + ny, l_b : l_b + nx]
* temp[jstart : jstart + ny, istart : istart + nx]
)
output_data[yoff : yoff + ny, xoff : xoff + nx] += part
# Add the product for the middle pixel last.
part = (
k_temp[middle_j, middle_i, b_b : b_b + ny, l_b : l_b + nx]
* temp[middle_j : middle_j + ny, middle_i : middle_i + nx]
)
output_data[yoff : yoff + ny, xoff : xoff + nx] += part