# Source code for heavyedge.api.preprocess

"""Profile preprocessing."""

import numpy as np

from heavyedge.profile import fill_after, preprocess

# Public names of this module (the ones exported by a star import).
__all__ = [
    "prep",
    "fill",
]


def prep(
    raw_file,
    sigma,
    std_thres,
    fill_value=0.0,
    z_thres=None,
    batch_size=None,
    logger=lambda x: None,
):
    """Preprocess raw profiles in the given file.

    Parameters
    ----------
    raw_file : heavyedge.RawProfileBase
        Opened raw profile file.
    sigma : scalar
        Standard deviation of Gaussian filter for smoothing.
    std_thres : scalar
        Standard deviation threshold to detect contact point.
    fill_value : scalar, default=0.0
        Value to fill after the contact point.
        If None, does not fill the array.
    z_thres : scalar, optional
        Z-score threshold to detect outliers.
        If not passed, outlier detection is not performed.
    batch_size : int, optional
        Number of profiles per yielded batch.
        If not passed, all profiles are yielded in a single batch.
    logger : callable, optional
        Logger function which accepts a progress message string.

    Yields
    ------
    Y_processed : list of arrays
        Preprocessed profiles of the batch.
    Ls : list
        Lengths of the preprocessed profiles.
    names : list
        Names of the preprocessed profiles.

    Notes
    -----
    Profiles rejected by the preprocessing helpers (invalid profiles and,
    when *z_thres* is given, outliers) do not appear in any batch.

    The last batch may hold fewer than *batch_size* profiles. An empty batch
    is yielded only when no profile at all was produced, so that the generator
    always yields at least once; no empty batch trails after full batches.

    Examples
    --------
    >>> from heavyedge import get_sample_path, RawProfileCsvs
    >>> from heavyedge.api import prep
    >>> raw = RawProfileCsvs(get_sample_path("Type3"))
    >>> Ys, Ls, _ = next(prep(raw, 32, 0.01, batch_size=3))
    >>> import matplotlib.pyplot as plt  # doctest: +SKIP
    ... for Y, L in zip(Ys, Ls):
    ...     plt.plot(Y[:L])
    """
    if z_thres is not None:
        gen = _prep_outlier(raw_file, sigma, std_thres, z_thres)
    else:
        gen = _prep(raw_file, sigma, std_thres)

    N = len(raw_file)
    yielded = False
    Ys, Ls, names = [], [], []
    for i, (Y, L, name) in enumerate(gen):
        if fill_value is not None:
            Y[L:] = fill_value

        Ys.append(Y)
        Ls.append(L)
        names.append(name)

        # With batch_size=None this never matches, so everything accumulates
        # into the single final batch.
        if len(Ys) == batch_size:
            logger(f"{i}/{N}")
            yield Ys, Ls, names
            yielded = True
            # Rebind (do not clear) so the lists handed to the caller stay intact.
            Ys, Ls, names = [], [], []

    logger(f"{N}/{N}")
    # Yield the remaining partial batch. Skip it when it is empty and full
    # batches were already delivered, otherwise the caller would receive a
    # spurious empty batch whenever the profile count divides evenly.
    if Ys or not yielded:
        yield Ys, Ls, names
def _prep_outlier(raw, sigma, std_thres, z_thres):
    """Yield preprocessed profiles of *raw*, dropping invalid and outlier ones.

    Two passes are made over *raw*. The first pass runs ``preprocess`` on each
    valid profile but keeps only its index, its length ``L`` and the sum of its
    first ``L`` values. Profiles whose sum is flagged by :func:`_outlier` are
    discarded. The second pass reads the surviving raw profiles again and
    yields ``(Y, L, name)`` for each of them.

    Parameters
    ----------
    raw : indexable, sized
        ``raw[i]`` must return a ``(profile, name)`` pair.
    sigma, std_thres : scalar
        Passed through to ``preprocess``.
    z_thres : scalar
        Modified Z-score threshold passed to :func:`_outlier`.
    """
    idxs, Ls, sums = [], [], []
    for i in range(len(raw)):
        Y, _ = raw[i]
        if _is_invalid(Y):
            continue
        (Y,), (L,) = preprocess(Y.reshape(1, -1), sigma, std_thres)
        idxs.append(i)
        Ls.append(L)
        sums.append(np.sum(Y[:L]))

    keep = ~_outlier(np.array(sums), z_thres)
    # Explicit integer dtype: an empty list would otherwise become a float array.
    idxs = np.array(idxs, dtype=int)[keep]
    Ls = np.array(Ls)[keep]

    for i, L in zip(idxs, Ls):
        Y, name = raw[i]
        # Re-derive the profile from the raw data instead of storing every
        # preprocessed profile during the first pass.
        # NOTE(review): this flip-and-shift presumably mirrors what
        # ``preprocess`` returns -- confirm against heavyedge.profile.
        if Y[0] < Y[-1]:
            Y = np.flip(Y)
        Y = Y - Y[L - 1]
        yield Y, L, name


def _outlier(values, thres=3.5):
    """Return a boolean mask flagging outliers by modified Z-score.

    The modified Z-score is ``0.6745 * (x - median) / MAD`` where MAD is the
    median absolute deviation from the median. A value is flagged when the
    absolute score exceeds *thres*.

    When MAD is zero the score is infinite for every value that differs from
    the median (flagged) and undefined for values equal to it (not flagged).
    An empty input gives an empty mask. Neither case emits a NumPy warning.

    References
    ----------
    Boris Iglewicz and David C Hoaglin,
    Volume 16: how to detect and handle outliers. Quality Press, 1993.
    """
    values = np.asarray(values)
    if values.size == 0:
        # np.median of an empty array warns and returns nan; nothing to flag.
        return np.zeros(values.shape, dtype=bool)

    med = np.median(values)
    mad = np.median(np.abs(values - med))
    # MAD is zero for a single value or when most values coincide; let the
    # division produce inf/nan silently, the comparison below handles both.
    with np.errstate(divide="ignore", invalid="ignore"):
        mod_z = 0.6745 * (values - med) / mad
        return np.abs(mod_z) > thres


def _prep(raw, sigma, std_thres):
    """Yield ``(Y, L, name)`` for each valid profile of *raw*.

    Every profile is preprocessed on its own with ``preprocess``; profiles
    rejected by :func:`_is_invalid` are skipped.
    """
    for i in range(len(raw)):
        Y, name = raw[i]
        if _is_invalid(Y):
            continue
        (Y,), (L,) = preprocess(Y.reshape(1, -1), sigma, std_thres)
        yield Y, L, name


def _is_invalid(profile):
    """Return True if *profile* is empty or contains NaN or infinite values."""
    return len(profile) == 0 or not np.all(np.isfinite(profile))
def fill(file, fill_value, batch_size=None, logger=lambda x: None):
    """Fill profiles after the contact point.

    Parameters
    ----------
    file : heavyedge.ProfileData
        Open h5 file.
    fill_value : scalar
        Value to fill after the contact point.
    batch_size : int, optional
        Batch size to load data.
        If not passed, all data are loaded at once.
    logger : callable, optional
        Logger function which accepts a progress message string.

    Yields
    ------
    Ys : (batch_size, M) array
        Filled profiles.
    Ls : (batch_size,) array
        Lengths of the filled profiles.
    names : (batch_size,) array
        Names of the filled profiles.

    Examples
    --------
    >>> from heavyedge import get_sample_path, ProfileData
    >>> from heavyedge.api import fill
    >>> with ProfileData(get_sample_path("Prep-Type1.h5")) as file:
    ...     Ys, _, _ = file[:]
    ...     Ys_filled, _, _ = next(fill(file, float("nan")))
    >>> import matplotlib.pyplot as plt  # doctest: +SKIP
    ... plt.plot(Ys.T, color="gray")
    ... plt.plot(Ys_filled.T)
    """
    total = len(file)

    # Each entry pairs the slice to load with the number shown in the
    # progress message for that slice.
    if batch_size is None:
        # A single slice covering the whole file, reported as complete.
        spans = [(slice(None), total)]
    else:
        # One slice per batch, reported by the index at which the batch starts.
        spans = (
            (slice(start, start + batch_size), start)
            for start in range(0, total, batch_size)
        )

    for span, progress in spans:
        profiles, lengths, labels = file[span]
        # The return value is not used; presumably fill_after edits
        # ``profiles`` in place.
        fill_after(profiles, lengths, fill_value)
        logger(f"{progress}/{total}")
        yield profiles, lengths, labels