Source code for pyobs.images.processors.offsets.projected

import logging
import re
from typing import Any, cast
import numpy as np
import numpy.typing as npt
from scipy.interpolate import UnivariateSpline
from scipy.optimize import fmin

from pyobs.images import Image
from pyobs.images.meta import PixelOffsets
from .offsets import Offsets

log = logging.getLogger(__name__)


class ProjectedOffsets(Offsets):
    """
    Compute pixel offsets for guiding by correlating 1D projections of the current image with a reference frame.

    This processor implements an auto-guiding method based on collapsing
    images along the x and y axes and cross-correlating these 1D profiles against a
    stored reference. On the first invocation, it initializes by storing the reference
    projections from the input image and returns. On subsequent calls, it computes the
    current projections, performs background (sky) subtraction, estimates sub-pixel
    shifts via cross-correlation and Gaussian peak fitting, and stores the resulting
    PixelOffsets(dx, dy) in the image metadata. Pixel data and FITS headers are not
    modified.

    :param kwargs: Additional keyword arguments forwarded to
                   :class:`pyobs.images.processors.offsets.Offsets`.

    Behavior
    --------
    - Reference initialization:

      - If no reference is set, processes the input image to obtain sky-subtracted
        1D projections along x and y, stores them as the reference, and returns.

    - Per-image guiding update:

      - Processes the current image to obtain sky-subtracted 1D projections.
      - Computes dx and dy by cross-correlating current vs. reference projections
        and fitting a Gaussian to the correlation peak within a small window to obtain
        sub-pixel offsets.
      - If either axis fails to produce a valid offset, logs a warning and returns
        the image unchanged.
      - Otherwise, attaches PixelOffsets(dx, dy) to image metadata.

    - Cropping via TRIMSEC:

      - If the FITS header contains TRIMSEC in the form "[x0:x1,y0:y1]" (1-based, inclusive),
        the image is cropped to that rectangle before projection.

    - Projection and sky subtraction:

      - Collapses the image by summing rows and columns with NaN-safe summation.
      - Subtracts a smooth sky continuum from each 1D projection:

        - Divides the projection into sbin=10 bins.
        - For each bin, estimates the continuum level as the median of the upper
          fraction (frac=0.15) of values.
        - Fits a spline (UnivariateSpline) through these bin medians and subtracts
          the fitted continuum from the projection.

    Input/Output
    ------------
    - Input: :class:`pyobs.images.Image` with 2D pixel data; optional TRIMSEC header
      for cropping prior to projection.
    - Output: :class:`pyobs.images.Image` with PixelOffsets(dx, dy) set in metadata
      after the reference is initialized; otherwise unchanged.

    Configuration (YAML)
    --------------------
    Initialize on first frame, then report offsets on subsequent frames:

    .. code-block:: yaml

       class: pyobs.images.processors.offsets.ProjectedOffsets

    Notes
    -----
    - Offset sign convention: positive dx indicates that the current image’s x-profile
      is shifted to larger pixel indices relative to the reference; similarly for dy.
      Ensure this matches the downstream module that applies the offsets.
    - The Gaussian fit is performed around the cross-correlation peak within a window
      of width fit_width=10 by default; failed fits or peaks outside the window result
      in no offset update.
    - Sky subtraction parameters (frac=0.15, sbin=10) are fixed in this implementation;
      adjust the code if your background structure requires different settings.
    - Call reset() to clear the stored reference projections and reinitialize on the
      next image.
    """

    __module__ = "pyobs.images.processors.offsets"

    def __init__(self, **kwargs: Any):
        """Initializes a new auto guiding system."""
        Offsets.__init__(self, **kwargs)

        # init
        self._ref_image: tuple[npt.NDArray[np.floating[Any]], npt.NDArray[np.floating[Any]]] | None = None

[docs] async def reset(self) -> None: """Resets guiding.""" log.info("Reset auto-guiding.") self._ref_image = None
async def __call__(self, image: Image) -> Image: """Processes an image and sets x/y pixel offset to reference in offset attribute. Args: image: Image to process. Returns: Original image. Raises: ValueError: If offset could not be found. """ if not self._reference_initialized(): log.info("Initialising auto-guiding with new image...") self._ref_image = self._process(image) return image log.info("Perform auto-guiding on new image...") sum_x, sum_y = self._process(image) if self._ref_image is None: raise ValueError("No reference image.") dx = self._calc_1d_offset(sum_x, self._ref_image[0]) dy = self._calc_1d_offset(sum_y, self._ref_image[1]) if dx is None or dy is None: log.warning("Could not correlate peaks.") return image image.set_meta(PixelOffsets(dx, dy)) return image def _reference_initialized(self) -> bool: return self._ref_image is not None @staticmethod def _process( image: Image, ) -> tuple[npt.NDArray[np.floating[Any]], npt.NDArray[np.floating[Any]]]: """Project image along x and y axes and return results. Args: image: Image to process. Returns: Projected images. """ # get image data and header data, hdr = image.data, image.header # trimsec if "TRIMSEC" in hdr: m = re.match(r"\[([0-9]+):([0-9]+),([0-9]+):([0-9]+)\]", hdr["TRIMSEC"]) if m is None: raise ValueError("Invalid trimsec.") x0, x1, y0, y1 = [int(f) for f in m.groups()] data = data[y0 - 1 : y1, x0 - 1 : x1] # collapse sum_x = np.nansum(data, 0) sum_y = np.nansum(data, 1) # sky subtraction return ProjectedOffsets._subtract_sky(sum_x), ProjectedOffsets._subtract_sky(sum_y) @staticmethod def _subtract_sky( data: npt.NDArray[np.floating[Any]], frac: float = 0.15, sbin: int = 10 ) -> npt.NDArray[np.floating[Any]]: # find continuum for every of the sbin bins bins = np.zeros((sbin,)) binxs = np.zeros((sbin,)) x = list(range(len(data))) w1 = 0.0 w2 = float(len(x)) / sbin for i in range(sbin): # sort data in range bindata = list(reversed(sorted(data[int(w1) : int(w2)].tolist()))) # calculate median and set wavelength bins[i] = np.median(bindata[int(-frac * len(bindata)) : -1]) binxs[i] = np.mean(x[int(w1) : int(w2)]) # reset ranges w1 = w2 w2 += float(len(x)) / sbin # check for last bin if i == sbin - 1: w2 = len(x) # fit it w = np.where(~np.isnan(bins)) ip = UnivariateSpline(binxs[w], bins[w]) cont = ip(x) # return continuum return cast(npt.NDArray[np.floating[Any]], data - cont) @staticmethod def _calc_1d_offset( data1: npt.NDArray[np.floating[Any]], data2: npt.NDArray[np.floating[Any]], fit_width: int = 10 ) -> float | None: # do cross-correlation corr = np.correlate(data1, data2, "full") # find index of maximum i_max = np.argmax(corr) centre = i_max - data1.size + 1 # cut window x = np.linspace(centre - fit_width, centre + fit_width, 2 * fit_width + 1) y = corr[i_max - fit_width : i_max + fit_width + 1] # moment calculation for initial guesses total = float(y.sum()) mean = (x * y).sum() / total variance = (x * x * y).sum() / total - mean**2 # initial guess guesses = [np.max(y), mean, variance] # perform fit result = fmin(ProjectedOffsets._gaussian_fit, guesses, args=(y, x), disp=False) # sanity check and finish up shift = float(result[1]) if shift < centre - fit_width or shift > centre + fit_width: return None return shift @staticmethod def _gaussian_fit(pars: list[float], y: npt.NDArray[np.floating[Any]], x: npt.NDArray[np.floating[Any]]) -> float: err = y - ProjectedOffsets._gaussian(pars, x) return float((err * err).sum()) @staticmethod def _gaussian(pars: list[float], x: npt.NDArray[np.floating[Any]]) -> npt.NDArray[np.floating[Any]]: a = pars[0] x0 = pars[1] sigma = pars[2] return cast(npt.NDArray[np.floating[Any]], a * np.exp(-((x - x0) ** 2) / (2.0 * sigma**2))) __all__ = ["ProjectedOffsets"]