Source code for gaitutils.numutils

# -*- coding: utf-8 -*-
"""
Created on Fri Sep 23 11:17:10 2016

Misc numerical utils

@author: Jussi (jnu@iki.fi)
"""


import logging
import numpy as np
import hashlib
from scipy import signal
from scipy.signal import medfilt
from scipy.special import erfcinv

from .config import cfg


logger = logging.getLogger(__name__)


[docs]def kabsch_rotation(P, Q): """Calculate a rotation matrix P->Q using the Kabsch algorithm.""" H = np.dot(P.T, Q) U, _, V = np.linalg.svd(H) E = np.eye(3) E[2, 2] = np.sign(np.linalg.det(np.dot(V.T, U.T))) return np.dot(np.dot(V.T, E), U.T)
def _rotation_matrix(yaw, pitch, roll): """Rotation matrix from yaw, pitch, roll in degrees""" yaw, pitch, roll = np.array([yaw, pitch, roll]) / 180 * np.pi cos, sin = np.cos, np.sin r1 = [ cos(yaw) * cos(pitch), cos(yaw) * sin(pitch) * sin(roll) - sin(yaw) * cos(roll), cos(yaw) * sin(pitch) * cos(roll) + sin(yaw) * sin(roll), ] r2 = [ sin(yaw) * cos(pitch), sin(yaw) * sin(pitch) * sin(roll) + cos(yaw) * cos(roll), sin(yaw) * sin(pitch) * cos(roll) - cos(yaw) * sin(roll), ] r3 = [-sin(pitch), cos(pitch) * sin(roll), cos(pitch) * cos(roll)] return np.array([r1, r2, r3]) def _rigid_body_extrapolate_markers(P0, Pr): """Extrapolate some markers in a rigid set. P0 (N x 3), the marker positions in the static frame. Pr (M x 3), the marker positions in where extrapolation is needed. N-M markers will be extrapolated. Algorithm: -find R and t that take us from Pr0 to Pr, by Kabsch algorithm (Pr0 is the reference markers in P0) -apply R and t to P0 to get the extrapolated positions """ nref = Pr.shape[0] if P0.shape[0] <= nref: raise ValueError('1st dim of P0 needs to be larger than 1st dim of Pr') Pr0 = P0[:nref, :] # reference markers # find rotation and translation that take the static reference position to the # position where extrapolation is needed trans0 = Pr0.mean(axis=0) Pr0_ = Pr0 - trans0 P0_ = P0 - trans0 trans1 = Pr.mean(axis=0) Pr_ = Pr - trans1 R = kabsch_rotation(Pr0_, Pr_) # apply the transformation to the markers to be extrapolated return np.dot(R, P0_[nref:, :].T).T + trans1
[docs]def mad(data, axis=None, scale=1.4826, keepdims=False): """Median absolute deviation (MAD). Defined as the median absolute deviation from the median of the data. A robust alternative to stddev. Results should be identical to scipy.stats.median_absolute_deviation(), which does not take a keepdims argument. Parameters ---------- data : array_like The data. scale : float, optional Scaling of the result. By default, it is scaled to give a consistent estimate of the standard deviation of values from a normal distribution. axis : numpy axis spec, optional Axis or axes along which to compute MAD. keepdims : bool, optional If this is set to True, the axes which are reduced are left in the result as dimensions with size one. Returns ------- ndarray The MAD. """ # keep dims here so that broadcasting works med = np.median(data, axis=axis, keepdims=True) abs_devs = np.abs(data - med) return scale * np.median(abs_devs, axis=axis, keepdims=keepdims)
[docs]def modified_zscore(data, axis=None, single_mad=None): """Modified Z-score. Z-score analogue computed using robust (median-based) statistics. Parameters ---------- data : array_like The data axis : numpy axis spec, optional Axis or axes along which to compute the statistic. single_mad : bool, optional Use a single MAD estimate computed all over the data. If False, MAD will be computed along given axis (e.g. separately for each variable). Returns ------- ndarray The modified Z-score. """ med_ = np.median(data, axis=axis, keepdims=True) mad_ = mad(data, axis=axis, keepdims=True) if single_mad: mad_ = np.median(mad_) return (data - med_) / mad_
[docs]def outliers(x, axis=0, single_mad=None, p_threshold=1e-3): """Robustly detect outliers assuming a normal distribution. A modified Z-score is first computed based on the data. Then a threshold value Zlim is computed from p_threshold, and values that exceed Zlim are rejected. p_threshold is the probability of rejection assuming strictly normally distributed data, i.e. probability for "false outlier" Parameters ---------- data : array_like The data. axis : numpy axis spec, optional Axis or axes along which to compute the Z scores. E.g. axis=0 computes row-wise Z scores and rejects based on those. single_mad : bool Use a single MAD estimate computed all over the data. If False, the MAD will be computed along given axis (e.g. separately for each variable). p_threshold : float The P threshold for rejection. Returns ------- tuple Indexes of rejected values (np.where output) """ zs = modified_zscore(x, axis=axis, single_mad=single_mad) z_threshold = np.sqrt(2) * erfcinv(p_threshold) logger.debug(f'Z threshold: {z_threshold:.2f}') return np.where(abs(zs) > z_threshold)
def _files_digest(files): """Create overall md5 digest for a sequence of files (filenames)""" hashes = sorted(_file_digest(fn) for fn in files) # concat as unicode and encode to get a well defined byte representation in # both py2 and py3 hash_str = ''.join(hashes).encode('utf-8') return hashlib.md5(hash_str).hexdigest() def _file_digest(fn): """Return md5 digest for file""" with open(fn, 'rb') as f: data = f.read() return hashlib.md5(data).hexdigest()
[docs]def rising_zerocross(x): """Find indices of rising zero crossings in array. The indices are defined as n for which x[n] >= 0 and x[n-1] < 0. Parameters ---------- x : array_like The data. Returns ------- tuple The indices (np.where output) """ x = np.array(x) # we can also handle lists etc. return np.where(np.logical_and(x[1:] >= 0, x[:-1] < 0))[0] + 1
[docs]def falling_zerocross(x): """Find indices of falling zero crossings in array. The indices are defined as n for which x[n] <= 0 and x[n-1] > 0. Parameters ---------- x : array_like The data. Returns ------- tuple The indices (np.where output) """ return rising_zerocross(-x)
[docs]def digitize_array(v, b): """Replace all elements of v with their closest matches in b. Uses abs(x-y) as the distance metric. This function is similar to np.digitize(), but simpler in usage and implementation. Parameters ---------- v : array_like Array to make replacements in. b : array_like Array to pick replacements from. Returns ------- ndarray The result. """ v = np.array(v) b = np.array(b) if b.size == 0: return v inds = np.abs(v[np.newaxis, :] - b[:, np.newaxis]).argmin(axis=0) return b[inds]
def _isfloat(x): """Test for float-conversible value""" try: float(x) return True except ValueError: return False def _is_ascii(s): """Check for ASCII string""" return all(ord(c) < 128 for c in s) def _isint(x): """Test for int-conversible value""" try: int(x) return True except ValueError: return False def _baseline(v): """Baseline v using histogram. Subtracts the most prominent signal level. """ v = np.array(v) v = v.squeeze() if len(v.shape) != 1: raise ValueError('Need 1-dim input') nbins = max(int(len(v) / 10), 1000) # exact n of bins should not matter ns, edges = np.histogram(v, bins=nbins) peak_ind = np.where(ns == np.max(ns))[0][0] return v - np.mean(edges[peak_ind : peak_ind + 2])
[docs]def center_of_pressure(F, M, dz): """Compute center of pressure from forceplate data. Computes CoP according to AMTI formula. The results differ slightly (typically few mm) from those given by Nexus, for unknown reasons (different filter?) See http://health.uottawa.ca/biomech/courses/apa6903/amticalc.pdf Parameters ---------- F : ndarray The force vector (Nx3). N is the number of observations. M : ndarray The moment vector (Nx3) dz : float The thickness of the plate, or the distance from moment origin to physical origin. Returns ------- ndarray The center of pressure (Nx3) """ FP_FILTFUN = medfilt # the filter function FP_FILTW = 5 # median filter width fx, fy, fz = tuple(F.T) # split columns into separate vars mx, my, _ = tuple(M.T) fz = FP_FILTFUN(fz, FP_FILTW) nz_inds = np.where(np.abs(fz) > 0)[0] # only divide on nonzero inds cop = np.zeros((fx.shape[0], 3)) cop[nz_inds, 0] = -(my[nz_inds] + fx[nz_inds] * dz) / fz[nz_inds] cop[nz_inds, 1] = (mx[nz_inds] - fy[nz_inds] * dz) / fz[nz_inds] return cop
def _change_coords(pts, wR, wT): """Translate pts (Nx3) into a new coordinate system described by rotation matrix wR and translation vector wT""" pts = np.array(pts) return np.dot(wR, pts.T).T + wT def _segment_angles(P): """Compute angles between line segments. The segments are defined by ordered points in P (Nx3 array). For N points, there will be N-1 segments and N-2 angles between those. It can also be 3-d matrix of (TxNx3) to get time-dependent data. Output will be (N-2) vector or Tx(N-2) matrix of angles in radians. If successive points are identical, nan:s will be output for the corresponding angles. """ if P.shape[-1] != 3 or len(P.shape) not in [2, 3]: raise ValueError('Invalid shape of input matrix') if len(P.shape) == 2: P = P[np.newaxis, ...] # insert singleton time axis Pd = np.diff(P, axis=1) # point-to-point vectors vnorms = np.linalg.norm(Pd, axis=2)[..., np.newaxis] # ignore 0/0 and x/0 errors -> nan with np.errstate(divide='ignore', invalid='ignore'): Pdn = Pd / vnorms # take dot products between successive vectors and angles by arccos dots = np.sum(Pdn[:, 0:-1, :] * Pdn[:, 1:, :], axis=2) dots = dots[0, :] if dots.shape[0] == 1 else dots # rm singleton dim return np.pi - np.arccos(dots) def _running_sum(M, win, axis=None): """Calculate running sum for given window length. M is the data (ndarray) and sum will be along given axis. win is the window length. Uses cumulative sum, inspired by: http://arogozhnikov.github.io/2015/09/30/NumpyTipsAndTricks2.html""" if axis is None: M = M.flatten() s = np.cumsum(M, axis=axis) s = np.insert(s, 0, [0], axis=axis) len_ = s.shape[0] if axis is None else s.shape[axis] return s.take(np.arange(win, len_), axis=axis) - s.take( np.arange(0, len_ - win), axis=axis )
[docs]def rms(data, win, axis=None, pad_mode=None): """Calculate rolling window RMS. Parameters ---------- data : ndarray The data. win : int Window length. axis : numpy axis spec, optional Axis along which to compute rms. pad_mode : str or function, optional Padding mode. See np.pad for details. Returns ------- ndarray The rms data. """ if pad_mode is None: pad_mode = 'edge' if win % 2 != 1: raise ValueError('Need RMS window of odd length') datalen = len(data) if axis is None else data.shape[axis] if win > datalen: raise ValueError('Need win length < data length') rms_ = np.sqrt(_running_sum(data ** 2, win, axis=axis) / win) # pad RMS data so that lengths are matched padw = int((win - 1) / 2) padarg_axis = (padw, padw) if axis == None: eff_dim = 1 axis = 0 else: eff_dim = data.ndim padarg = eff_dim * [(0, 0)] padarg[axis] = padarg_axis return np.pad(rms_, padarg, mode=pad_mode)
[docs]def envelope(data, sfrate=None, axis=None): """Calculate an envelope for data using the configured method""" if cfg.emg.envelope_method == 'linear_envelope': if sfrate is None: raise RuntimeError('Linear envelope requires the sampling rate') data = _linear_envelope(data, sfrate, axis=axis) elif cfg.emg.envelope_method == 'rms': data = rms(data, cfg.emg.rms_win, axis=axis) else: raise RuntimeError(f'Invalid envelope method: {cfg.emg.envelope_method}') return data
def _linear_envelope(data, sfrate, axis=None): """Calculate a linear envelope""" data_rect = np.abs(data) return _filtfilt(data_rect, [0, cfg.emg.linear_envelope_lowpass], sfrate, axis=axis) def _filtfilt(data, passband, sfrate, buttord=5, axis=None): """Forward-backward filter. Filter data into given passband, e.g. [1, 40]. Frequencies are given in Hz along with sfrate (sampling rate). Implemented as pure lowpass, if highpass freq = 0. """ if axis is None: axis = -1 # filtfilt() default if passband is None: return data passbandn = 2 * np.array(passband) / sfrate if passbandn[0] > 0: # bandpass b, a = signal.butter(buttord, passbandn, 'bandpass') else: # lowpass b, a = signal.butter(buttord, passbandn[1]) return signal.filtfilt(b, a, data, axis=axis) def _get_local_max(data): """Get local maximum (peak) of 1-D data""" # the simpler argrelextrema() would not return flat peaks, which might occur # at least in theory peak_inds = signal.find_peaks(data)[0] if not any(peak_inds): return (np.nan, np.nan) peak_ind = peak_inds[data[peak_inds].argmax()] peak_val = data[peak_ind] return peak_ind, peak_val def _get_local_min(data): """Get local minimum (peak) of 1-D data""" ind, val = _get_local_max(-data) if ind is np.nan: return np.nan, np.nan else: return ind, -val