Source code for pyampp.sfq.clean

from __future__ import annotations

import time
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def _conv1d_edge(arr: np.ndarray, kernel: np.ndarray, axis: int) -> np.ndarray:
    if arr.ndim != 2:
        raise ValueError("_conv1d_edge expects a 2D array.")
    k = np.asarray(kernel, dtype=float)
    ksize = int(k.size)
    pad_l = ksize // 2
    pad_r = ksize - 1 - pad_l
    if axis == 0:
        padded = np.pad(arr, ((pad_l, pad_r), (0, 0)), mode="edge")
        out = np.empty_like(arr, dtype=float)
        for j in range(arr.shape[1]):
            out[:, j] = np.convolve(padded[:, j], k, mode="valid")
        return out
    if axis == 1:
        padded = np.pad(arr, ((0, 0), (pad_l, pad_r)), mode="edge")
        out = np.empty_like(arr, dtype=float)
        for i in range(arr.shape[0]):
            out[i, :] = np.convolve(padded[i, :], k, mode="valid")
        return out
    raise ValueError("axis must be 0 or 1.")


def _box_smooth_edge(arr: np.ndarray, size: int) -> np.ndarray:
    kernel = np.ones(int(size), dtype=float) / float(size)
    return _conv1d_edge(_conv1d_edge(arr, kernel, axis=0), kernel, axis=1)


def _gaussf(n: int, sigma: float) -> np.ndarray:
    x = np.arange(int(n), dtype=float) - (int(n) - 1) / 2.0
    sigma = float(sigma) if sigma > 0 else 1.0
    ker = np.exp(-0.5 * (x / sigma) ** 2)
    s = ker.sum()
    return ker / s if s > 0 else ker


def _pwf_smooth(arr: np.ndarray, sigma: float) -> np.ndarray:
    n = int(np.ceil(float(sigma) * 3.0) * 2 + 1)
    ker = _gaussf(n, sigma)
    return _conv1d_edge(_conv1d_edge(arr, ker, axis=0), ker, axis=1)


def _median_filter_edge(arr: np.ndarray, size: int) -> np.ndarray:
    size = int(size)
    pad = size // 2
    padded = np.pad(arr, ((pad, pad), (pad, pad)), mode="edge")
    win = sliding_window_view(padded, (size, size))
    return np.median(win, axis=(-2, -1))


[docs] def sfq_clean( bx: np.ndarray, by: np.ndarray, s: int | float | None = None, gauss: bool = False, median: bool = False, show: bool = False, mode: int = 0, silent: bool = False, ) -> tuple[np.ndarray, np.ndarray]: bxx = np.asarray(bx, dtype=float).copy() byy = np.asarray(by, dtype=float).copy() if bxx.shape != byy.shape: raise ValueError("bx and by must have the same shape.") if s is None: t0 = time.time() if not silent: print("Sarting SFQ cleaning") bxx, byy = sfq_clean(bxx, byy, 3, median=True, show=show, silent=True) mode_i = 1 if mode != 0 else 0 if mode_i == 0: if min(bxx.shape) > 150: bxx, byy = sfq_clean(bxx, byy, 19, gauss=gauss, median=median, show=show, silent=True) if min(bxx.shape) > 100: bxx, byy = sfq_clean(bxx, byy, 9, gauss=gauss, median=median, show=show, silent=True) bxx, byy = sfq_clean(bxx, byy, 5, gauss=gauss, median=median, show=show, silent=True) bxx, byy = sfq_clean(bxx, byy, 3, median=True, show=show, silent=True) if not silent: dt = int(round(time.time() - t0)) print(f"SFQ cleaning complete in {dt} seconds") return bxx, byy s_val = float(s) n = int(np.ceil(s_val * 3.0) * 2 + 1) ker = _gaussf(n, s_val) gaussk = float(ker[n // 2] ** 2) threshold = max(bxx.size * 0.0001, 5.0) for _ in range(301): if gauss: mbx = _pwf_smooth(bxx, s_val) - bxx * gaussk mby = _pwf_smooth(byy, s_val) - byy * gaussk else: if not median: sval_i = int(max(round(s_val), 1)) mbx = _box_smooth_edge(bxx, sval_i) - bxx / float(sval_i ** 2) mby = _box_smooth_edge(byy, sval_i) - byy / float(sval_i ** 2) else: sval_i = int(max(round(s_val), 1)) mbx = _median_filter_edge(bxx, sval_i) mby = _median_filter_edge(byy, sval_i) flip = (mbx * bxx + byy * mby) < 0 if int(np.count_nonzero(flip)) < threshold: break bxx[flip] = -bxx[flip] byy[flip] = -byy[flip] if show: pass return bxx, byy