Source code for mocca.dad_data.process_funcs

import numpy as np
from scipy.signal import find_peaks, peak_widths

from mocca.peak.models import BasePeak, PickedPeak
from mocca.chromatogram.model import Chromatogram
from mocca.chromatogram.utils import check_overlap

from mocca.dad_data.utils import sum_absorbance_by_time


[docs]def get_peak_locs(summed_data): """ Finds all peaks of data. Parameters ---------- summed_data : numpy.ndarray A 1D array representing the absorbances over time. Best used on data that already had data below threshold zeroed (see function filter_absorbance_by_threshold). Returns ------- peaks : list List of all peaks, as a list of BasePeak classes """ peak_maxima, _ = find_peaks(summed_data) peak_borders = peak_widths(summed_data, peak_maxima, rel_height=1) # use tolist to avoid problems during JSON export peaks = list(zip(peak_maxima.tolist(), np.floor(peak_borders[2]).astype(int).tolist(), np.ceil(peak_borders[3]).astype(int).tolist())) # peaks is now a list of tuples (maximum, left border, right border) return [BasePeak(maximum=maximum, left=left, right=right, offset=0) for maximum, left, right in peaks]
[docs]def merge_peaks(summed_data, peaks): """ Merges overlapping peaks in the data. Parameters ---------- summed_data : numpy.ndarray A 1D array representing the absorbances over time. Best used on data that already had data below threshold zeroed (see function filter_absorbance_by_threshold). peaks : list List of all peaks as BasePeak objects Returns ------- new_peaks : list List of all peaks in dictionary format with keys maximum, left, and right. Peaks that overlap are merged together into one BasePeak. """ new_peaks = set() for cur_peak in peaks: overlapping_peaks = [] for other in peaks: if check_overlap(cur_peak, other): overlapping_peaks.append(other) merged_peak = [-1, np.inf, 0] for peak in overlapping_peaks: # otherwise, merge peaks merged_peak[0] = (peak.maximum if summed_data[peak.maximum] > summed_data[merged_peak[0]] or merged_peak[0] == -1 else merged_peak[0]) merged_peak[1] = min(merged_peak[1], peak.left) merged_peak[2] = max(merged_peak[2], peak.right) new_peaks.add(tuple(merged_peak)) new_peaks = sorted(new_peaks, key=lambda x: x[0]) peak_list = [BasePeak(maximum=maximum, left=left, right=right, offset=0) for maximum, left, right in new_peaks] # if merge does nothing, then return if peak_list == peaks: return peak_list # else, if the merging results in fewer peaks, recurse # recursion takes care of some special cases of overlapping peaks # and merges the previous list if necessary elif len(merge_peaks(summed_data, peak_list)) < len(peak_list): return merge_peaks(summed_data, peak_list) # otherwise, just return fully merged list return peak_list
[docs]def pick_peaks(compound_data, experiment, absorbance_threshold, peaks_high_pass, peaks_low_pass): """ Finds all peaks of data and returns them as a chromatogram Parameters ---------- data : numpy.ndarray Actual experimental data with shape [# of wavelengths] x [timepoints]. Generated from dataframe with absorbance_to_array function absorbance_threshold : float The threshold below which peaks will. In other words, at at least one (wavelength, timepoint) will have absorbance greater than absorbance_threshold in order to be counted as a peak. peaks_high_pass: float Time high pass filter only using peaks with a retention time greater than the here given value for data analysis peaks_low_pass : float Time low pass filter only using peaks with a retention time lower than the here given value for data analysis expand_peaks : boolean If True, then peaks will be expanded to their peak boundaries. If this is set to False, then only timepoints with cumulative absorbance greater than absorbance_threshold will be counted as part of the peak. Returns ------- peaks : list List of all peaks, as a list of tuples (maximum, left, right) """ summed_data = sum_absorbance_by_time(compound_data.data) new_data_thresh = summed_data.copy() new_data_thresh[new_data_thresh < absorbance_threshold] = 0 peak_locs = get_peak_locs(new_data_thresh) merged_peaks = merge_peaks(summed_data, peak_locs) if peaks_high_pass: merged_peaks = [peak for peak in merged_peaks if compound_data.time[peak.maximum] >= peaks_high_pass] if peaks_low_pass: merged_peaks = [peak for peak in merged_peaks if compound_data.time[peak.maximum] <= peaks_low_pass] merged_peaks = sorted(merged_peaks, key=lambda peak: peak.maximum) chromatogram = Chromatogram(experiment, compound_data) for idx, peak in enumerate(merged_peaks): chromatogram.insert_peak(PickedPeak(left=peak.left, right=peak.right, maximum=peak.maximum, offset=peak.offset, dataset=compound_data, idx=idx+1)) return chromatogram