Source code for mocca.dad_data.utils

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Dec 10 13:31:37 2021

@author: haascp
"""

import numpy as np
import pandas as pd


def sum_absorbance_by_time(data):
    """
    Collapses the wavelength axis by summing the absorbances per time point.

    Parameters
    ----------
    data : numpy.ndarray
        2D absorbance data laid out as [# of wavelengths] x [timepoints],
        e.g. as produced by the absorbance_to_array function.

    Returns
    -------
    numpy.ndarray
        1D array with one entry per time point, each being the sum of the
        absorbances over all wavelengths at that time point.
    """
    # Axis 0 is the wavelength axis, so reducing over it leaves the time axis.
    summed_over_wavelengths = data.sum(axis=0)
    return summed_over_wavelengths
def trim_data(data, time, length):
    """
    Cuts 2D DAD data and its time vector down to at most ``length`` time
    points.

    Parameters
    ----------
    data : numpy.ndarray
        2D array whose second axis is the time axis.
    time : numpy.ndarray
        Time vector belonging to ``data``.
    length : int
        Number of time points to keep.

    Returns
    -------
    tuple
        ``(data, time)`` limited to the first ``length`` time points. If the
        data does not have more than ``length`` time points, the inputs are
        handed back unchanged.
    """
    # Nothing to cut off: return the very same objects, not sliced views.
    if length >= data.shape[1]:
        return data, time
    trimmed_data = data[:, :length]
    trimmed_time = time[:length]
    return trimmed_data, trimmed_time
def absorbance_to_array(df):
    """
    Builds the 2D absorbance array from a tidy dataframe.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe with ``absorbance``, ``wavelength`` and ``time`` columns.

    Returns
    -------
    numpy.ndarray
        Absorbance values with shape
        [# of unique wavelengths] x [# of unique timepoints].
    """
    flat_absorbance = df.absorbance.to_numpy()
    n_wavelengths = df.wavelength.nunique()
    n_timepoints = df.time.nunique()
    # Plain row-major reshape: the rows of df are taken in their given order,
    # so each consecutive run of n_timepoints values becomes one wavelength row.
    return flat_absorbance.reshape(n_wavelengths, n_timepoints)
def df_to_array(df):
    """
    Splits a tidy dataframe of HPLC-DAD data into its array representation.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe with ``absorbance``, ``wavelength`` and ``time`` columns.

    Returns
    -------
    tuple
        ``(data, time, wavelength)``: the 2D absorbance array generated by
        absorbance_to_array, the unique values of the time column and the
        unique values of the wavelength column.
    """
    return absorbance_to_array(df), df.time.unique(), df.wavelength.unique()
def get_reference_signal(dataframe, bandwidth=5):
    """
    Averages the absorbance traces of the last ``bandwidth`` wavelengths.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Dataframe with ``wavelength`` and ``absorbance`` columns.
    bandwidth : int, optional
        Number of wavelengths, counted from the end of the unique wavelength
        values, that are averaged. The default is 5.

    Returns
    -------
    pandas.DataFrame
        Dataframe with a single ``absorbance`` column holding the
        position-wise mean over the selected wavelength traces.
    """
    frame = dataframe.copy()
    reference_wls = frame.wavelength.unique()[-bandwidth:]
    # One absorbance trace (in row order) per reference wavelength.
    traces = [
        list(frame[frame['wavelength'] == wl].absorbance)
        for wl in reference_wls
    ]
    # zip pairs up the i-th value of every trace; the mean is taken per pair.
    averaged = [sum(points) / len(points) for points in zip(*traces)]
    return pd.DataFrame({'absorbance': averaged})
def apply_filter(dataframe, wl_high_pass, wl_low_pass, bandwidth=2,
                 reference_wl=True):
    """
    Filters absorbance data of tidy 3D DAD dataframes to remove noise and
    background systematic error.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Tidy dataframe with ``time``, ``wavelength`` and ``absorbance``
        columns. The input is not modified; a copy is processed.
    wl_high_pass : number or None
        If truthy, only rows with a wavelength greater than or equal to this
        value are kept.
    wl_low_pass : number or None
        If truthy, only rows with a wavelength less than or equal to this
        value are kept.
    bandwidth : int, optional
        The centered rolling mean applied within each time point uses a
        window of ``bandwidth + 1`` rows. The default is 2.
    reference_wl : bool, optional
        If True, the signal returned by get_reference_signal is subtracted
        from the absorbance. The default is True.

    Returns
    -------
    pandas.DataFrame
        The filtered dataframe.
    """
    filtered = dataframe.copy()

    # Rolling mean over the rows of each time point.
    per_time_absorbance = filtered.groupby('time')['absorbance']
    smoothed = per_time_absorbance.rolling(window=bandwidth + 1,
                                           center=True).mean()
    # Drop the group ('time') index level so the assignment aligns on the
    # original row index.
    filtered['absorbance'] = smoothed.reset_index(0, drop=True)
    # Rows containing NaN are discarded and the rows are renumbered.
    filtered = filtered.dropna().reset_index(0, drop=True)

    if reference_wl:
        time_count = len(filtered.time.unique())
        wavelength_count = len(filtered.wavelength.unique())
        reference = get_reference_signal(filtered).absorbance
        # Repeat the per-time reference once per wavelength so that it can be
        # subtracted row by row.
        positions = np.tile(np.arange(time_count), wavelength_count)
        tiled_reference = reference.iloc[positions].reset_index(0, drop=True)
        filtered['absorbance'] = filtered.absorbance - tiled_reference

    if wl_high_pass:
        filtered = filtered[filtered.wavelength >= wl_high_pass]
    if wl_low_pass:
        filtered = filtered[filtered.wavelength <= wl_low_pass]
    return filtered