Source code for gaitutils.utils

# -*- coding: utf-8 -*-
"""

Utility functions for processing gait data.

@author: Jussi (jnu@iki.fi)
"""

from scipy import signal
from matplotlib import path
import matplotlib.pyplot as plt
import numpy as np
import logging

from .envutils import GaitDataError
from .config import cfg
from .numutils import rising_zerocross, falling_zerocross, _baseline
from .events import GaitEvent, GaitEvents


logger = logging.getLogger(__name__)


[docs]def get_contexts(right_first=False): """Return the usual contexts and their names as pairs. Our default is to return left size first. """ _contexts = [('L', 'Left'), ('R', 'Right')] if right_first: _contexts.reverse() return _contexts
[docs]def marker_gaps(mdata, ignore_edge_gaps=True): """Find gaps for a marker. Parameters ---------- mdata : ndarray Marker position data. See read_data.get_marker_data() ignore_edge_gaps : bool, optional If True, leading and trailing gaps are ignored. Returns ------- ndarray The indices of frames where gaps occur. """ allzero = np.any(mdata, axis=1).astype(int) if ignore_edge_gaps: nleading = allzero.argmax() allzero_trim = np.trim_zeros(allzero) gap_inds = np.where(allzero_trim == 0)[0] + nleading else: gap_inds = np.where(allzero == 0)[0] return gap_inds
def _step_width(source): """Compute step width over trial cycles. For details of computation, see: https://www.vicon.com/faqs/software/how-does-nexus-plug-in-gait-and-polygon-calculate-gait-cycle-parameters-spatial-and-temporal Returns context keyed dict of lists. FIXME: marker name into params? FIXME: this (and similar) may also need to take Trial instance as argument to avoid creating new Trials """ from .trial import Trial tr = Trial(source) sw = dict() mkr = 'TOE' # marker name without context mkrdata = tr._full_marker_data # XXX: could use cycles here, instead of iterating over foot strikes strike_frames = dict() for context in 'LR': strike_frames[context] = [ ev.frame for ev in tr.events.get_events(event_type='strike', context=context) ] for context in 'LR': strikes = strike_frames[context] sw[context] = list() nstrikes = len(strikes) if nstrikes < 2: continue # contralateral vars context_co = 'L' if context == 'R' else 'R' strikes_co = strike_frames[context_co] mname = context + mkr mname_co = context_co + mkr for j, strike in enumerate(strikes): if strike == strikes[-1]: # last strike on this context break pos_this = mkrdata[mname][strike] pos_next = mkrdata[mname][strikes[j + 1]] strikes_next_co = [k for k in strikes_co if k > strike] if len(strikes_next_co) == 0: # no subsequent contralateral strike break pos_next_co = mkrdata[mname_co][strikes_next_co[0]] # vector distance between 'step lines' (see url above) V1 = pos_next - pos_this V1 /= np.linalg.norm(V1) VC = pos_next_co - pos_this VCP = V1 * np.dot(VC, V1) # proj to ipsilateral line VSW = VCP - VC # marker data is in mm, but return step width in m sw[context].append(np.linalg.norm(VSW) / 1000.0) return sw
[docs]def avg_markerdata(mkrdata, markers, roi=None, fail_on_gaps=True, avg_velocity=False): """Average marker data. Parameters ---------- mkrdata : dict The markerdata dict. markers : list Markers to average. roi : array-like If given, specified a ROI. Gaps outside the ROI will be ignored. fail_on_gaps : bool, optional If True, raise an exception on ANY gaps. Otherwise, markers with gaps will not be included in the average. avg_velocity : bool, optional If True, return averaged marker velocity data instead of position. Returns ------- ndarray The averaged data (Nx3). """ data_shape = mkrdata[markers[0]].shape mdata_avg = np.zeros(data_shape) if roi is None: roi = [0, data_shape[0]] roi_frames = np.arange(roi[0], roi[1]) n_ok = 0 for marker in markers: mdata = mkrdata[marker] if avg_velocity: mdata = np.gradient(mdata)[0] gap_frames = marker_gaps(mdata) if np.intersect1d(roi_frames, gap_frames).size > 0: if fail_on_gaps: raise GaitDataError(f'Averaging data for {marker} has gaps') else: logger.warning( f'marker {marker} cannot be included in average due to gaps' ) continue else: mdata_avg += mdata n_ok += 1 if n_ok == 0: raise GaitDataError('all markers have gaps, cannot average') else: return mdata_avg / n_ok
# FIXME: marker sets could be moved into models.py? def _pig_markerset(fullbody=True, sacr=True): """PiG marker set as dict (empty values)""" _pig = [ 'LASI', 'RASI', 'LTHI', 'LKNE', 'LTIB', 'LANK', 'LHEE', 'LTOE', 'RTHI', 'RKNE', 'RTIB', 'RANK', 'RHEE', 'RTOE', ] if fullbody: _pig += [ 'LFHD', 'RFHD', 'LBHD', 'RBHD', 'C7', 'T10', 'CLAV', 'STRN', 'RBAK', 'LSHO', 'LELB', 'LWRA', 'LWRB', 'LFIN', 'RSHO', 'RELB', 'RWRA', 'RWRB', 'RFIN', ] # add pelvis posterior markers; SACR or RPSI/LPSI _pig.extend(['SACR'] if sacr else ['RPSI', 'LPSI']) return {mkr: None for mkr in _pig} def _pig_pelvis_markers(): return ['RASI', 'RPSI', 'LASI', 'LPSI', 'SACR']
[docs]def is_plugingait_set(mkrdata): """Check whether marker data set corresponds to Plug-in Gait (full body or lower body only). Extra markers are accepted.""" mkrs = set(mkrdata.keys()) # required markers lb_mkrs_sacr = set(_pig_markerset(fullbody=False).keys()) lb_mkrs_psi = set(_pig_markerset(fullbody=False, sacr=False).keys()) return lb_mkrs_psi.issubset(mkrs) or lb_mkrs_sacr.issubset(mkrs)
def _check_markers_flipped(mkrdata): """Check for markers that may typically get flipped in labeling. Yields pairs of flipped markers""" MAX_ANGLE = 90 # max angle to consider vectors 'similarly oriented' # HEE-TOE checks for context in ['L', 'R']: # compare HEE-TOE line to pelvis orientation mkr_toe, mkr_hee = f'{context}TOE', f'{context}HEE' ht = _normalize(mkrdata[mkr_toe] - mkrdata[mkr_hee]) if context + 'PSI' in mkrdata: pa = _normalize(mkrdata[context + 'ASI'] - mkrdata[context + 'PSI']) elif 'SACR' in mkrdata: pa = _normalize(mkrdata[context + 'ASI'] - mkrdata['SACR']) angs = np.arccos(np.sum(ht * pa, axis=1)) / np.pi * 180 if np.nanmedian(angs) > MAX_ANGLE: yield mkr_toe, mkr_hee def _principal_movement_direction(mdata): """Return principal movement direction (dimension of maximum variance). Used to find out whether gait occurs in the x- or y-dimension of lab frame. Returns 0 for x, 1 for y. """ inds_ok = np.where(np.any(mdata, axis=1)) # make sure that gaps are ignored return np.argmax(np.var(mdata[inds_ok], axis=0)) def _get_foot_contact_vel(mkrdata, fp_events, medians=True, roi=None): """Return foot velocities during forceplate strike/toeoff frames. fp_events is output from detect_forceplate_events(). If medians=True, return median values.""" vels = dict() for context, markers in zip( ('R', 'L'), [cfg.autoproc.right_foot_markers, cfg.autoproc.left_foot_markers] ): # we can be less strict about gaps here, since missing a marker or two # probably won't really affect the velocity footctrv_ = avg_markerdata( mkrdata, markers, roi=roi, fail_on_gaps=False, avg_velocity=True, ) footctrv = np.linalg.norm(footctrv_, axis=1) strikes = [ ev.frame for ev in fp_events.get_events(event_type='strike', context=context) ] toeoffs = [ ev.frame for ev in fp_events.get_events(event_type='toeoff', context=context) ] vels[context + '_strike'] = footctrv[strikes] vels[context + '_toeoff'] = footctrv[toeoffs] if medians: vels = { key: (np.array([np.median(x)]) if x.size > 0 else x) for key, x in vels.items() } return vels def _get_foot_swing_velocity(footctrv, max_peak_velocity, min_swing_velocity): """Compute foot swing velocity from scalar velocity data (footctrv)""" # find maxima of velocity: derivative crosses zero and values ok vd = np.gradient(footctrv) vdz_ind = falling_zerocross(vd) inds = np.where(footctrv[vdz_ind] < max_peak_velocity)[0] if len(inds) == 0: raise GaitDataError('Cannot find acceptable velocity peaks') # delete spurious peaks (where min swing velocity is not attained) vs = footctrv[vdz_ind[inds]] not_ok = np.where(vs < vs.max() * min_swing_velocity) vs = np.delete(vs, not_ok) return np.median(vs) def _normalize(V): """Normalize rows of matrix V""" Vn = np.linalg.norm(V, axis=1) # quietly return all nans for length zero vectors with np.errstate(divide='ignore', invalid='ignore'): return V / Vn[:, np.newaxis] def _get_foot_points(mkrdata, context, footlen=None): """Estimate points in the xy plane enclosing the foot. Foot is modeled as a triangle with three points: heel, lateral leading edge (small toe) and medial leading edge (hallux)""" # marker data as N x 3 matrices heeP = mkrdata[context + 'HEE'] toeP = mkrdata[context + 'TOE'] ankP = mkrdata[context + 'ANK'] # heel - toe vectors htV = toeP - heeP htVn = _normalize(htV) # heel - ankle vectors haV = ankP - heeP ha_len = np.linalg.norm(haV, axis=1) # end point of foot (just beyond 2nd toe) if footlen is None: # rough estimate based on marker distances logger.debug('using estimated foot length') # a lot of zero entries (gaps) can throw off the median computation ha_len_nonzero = ha_len[np.where(ha_len > 0)] footlen = np.median(ha_len_nonzero) * cfg.autoproc.foot_relative_len foot_end = heeP + htVn * footlen # projection of HEE-ANK to HEE-TOE line ha_htV = htVn * np.sum(haV * htVn, axis=1)[:, np.newaxis] # lateral ANK vector (from HEE-TOE line to ANK) lankV = haV - ha_htV # edge points are coplanar with markers but not with the foot # lateral foot edge lat_edge = foot_end + lankV # medial foot edge med_edge = foot_end - lankV # heel edge (compensate for marker diameter) heel_edge = heeP + htVn * cfg.autoproc.marker_diam / 2 logger.debug( 'foot length: %.1f mm width: %.1f mm' % ( np.nanmedian(np.linalg.norm(heel_edge - foot_end, axis=1)), np.nanmedian(np.linalg.norm(lat_edge - med_edge, axis=1)), ) ) return {'heel': heel_edge, 'lateral': lat_edge, 'medial': med_edge, 'toe': foot_end} def _leading_foot(mkrdata, roi=None): """Determine which foot is leading (ahead in the direction of gait). Returns n-length list of 'R' or 'L' correspondingly (n = number of frames). Gaps are indicated as None. mkrdata must include foot and pelvis markers""" subj_pos = avg_markerdata( mkrdata, cfg.autoproc.track_markers, roi=roi, fail_on_gaps=False ) # FIXME: should not use a single dim here gait_dim = _principal_movement_direction(subj_pos) pos_diff = np.diff(subj_pos, axis=0)[:, gait_dim] gait_dir = np.median(pos_diff[np.where(pos_diff != 0.0)]) if gait_dir > 0: cmpfun = np.greater elif gait_dir < 0: cmpfun = np.less else: raise GaitDataError('cannot determine gait direction') lfoot = avg_markerdata( mkrdata, cfg.autoproc.left_foot_markers, roi=roi, fail_on_gaps=False )[:, gait_dim] rfoot = avg_markerdata( mkrdata, cfg.autoproc.right_foot_markers, roi=roi, fail_on_gaps=False )[:, gait_dim] return [ None if R == 0.0 or L == 0.0 else ('R' if cmpfun(R, L) else 'L') for R, L in zip(rfoot, lfoot) ] def _trial_median_velocity(source, return_curve=False): """Compute median velocity (walking speed) over whole trial by differentiation of marker data from track markers. Up/down movement of markers may slightly increase speed compared to time-distance values. If return_curve, return velocity curve normalized to 0..100% of trial""" from . import read_data try: frate = read_data.get_metadata(source)['framerate'] mkrdata = read_data.get_marker_data(source, cfg.autoproc.track_markers) vel_3 = avg_markerdata(mkrdata, cfg.autoproc.track_markers, avg_velocity=True) vel_ = np.sqrt(np.sum(vel_3 ** 2, 1)) # scalar velocity except (GaitDataError, ValueError): if return_curve: nanvec = np.empty((100, 1)) nanvec[:] = np.nan return np.nan, nanvec else: return np.nan vel = np.median(vel_[np.where(vel_)]) # ignore zeros vel_ms = vel * frate / 1000.0 # convert to m/s if return_curve: tn = np.linspace(0, 100, 101) vel_curve = np.interp(tn, np.linspace(0, 100, len(vel_)), vel_) return vel_ms, vel_curve * frate / 1000.0 else: return vel_ms def _point_in_poly(poly, pt): """Point-in-polygon. poly is ordered nx3 array of vertices and P is mx3 array of points. Returns mx3 array of booleans. 3rd dim is currently ignored""" p = path.Path(poly[:, :2]) return p.contains_point(pt)
[docs]def detect_forceplate_events( source, marker_data=None, eclipse_fp_info=None, roi=None, return_nplates=False ): """Detect frames where valid forceplate strikes and toeoffs occur. Uses forceplate data and estimated foot shape. If supplied, marker_data must include foot and pelvis markers. Otherwise the marker data will be read from source. If the fp_info dict is supplied, no marker-based checks will be done; instead the Eclipse forceplate info will be used to determine the foot. Eclipse info is written e.g. as {FP1: 'Left'}, where plate indices start from 1 and the value can be 'Auto', 'Left', 'Right' or 'Invalid'. NB: even if Eclipse info is used, foot strike and toeoff frames must be determined from forceplate data. If roi is given e.g. [100, 300], all marker data checks will be restricted to roi. """ def _foot_plate_check(fpdata, marker_data, fr0, context, footlen): """Helper for foot-plate check. Returns 0, 1, 2 for: completely outside plate, partially outside plate, inside plate, respectively. """ foot_points = _get_foot_points(marker_data, context, footlen) plate_corners = fpdata['plate_corners'] # logger.debug(f'{plate_corners=}') pts_ok = list() for label, pts in foot_points.items(): foot_point = pts[fr0, :] # logger.debug(f'{foot_point=}') pt_ok = _point_in_poly(plate_corners, foot_point) logger.debug(f"{label} point {'' if pt_ok else 'not '}on plate") pts_ok.append(pt_ok) if all(pts_ok): return 2 elif any(pts_ok): return 1 else: return 0 def _threshold_forceplate(fp, bodymass=None): """Get candidate foot strike and toeoff frames by considering force only""" # apply median filter to remove spikes # XXX: kernel size should maybe depend on sampling freq? forcetot = signal.medfilt(fp['Ftot'], kernel_size=3) forcetot = _baseline(forcetot) fmaxind = np.argmax(forcetot) fmax = forcetot[fmaxind] logger.debug('peak force: %.2f N at sample %d' % (fmax, fmaxind)) # find force threshold if bodymass is None: f_threshold = cfg.autoproc.forceplate_contact_threshold * fmax logger.warning( 'body mass unknown, thresholding force at %.2f N', f_threshold ) else: logger.debug(f'body mass {bodymass:.2f} kg') f_threshold = cfg.autoproc.forceplate_contact_threshold * bodymass * 9.81 if fmax < cfg.autoproc.forceplate_min_weight * bodymass * 9.81: logger.debug('insufficient max. force on plate') force_checks_ok = False # find the indices around peak where force crosses threshold f_rising_inds = rising_zerocross(forcetot - f_threshold) f_falling_inds = falling_zerocross(forcetot - f_threshold) try: f_rising_ind = f_rising_inds[np.where(f_rising_inds < fmaxind)][-1] f_falling_ind = f_falling_inds[np.where(f_falling_inds > fmaxind)][0] logger.debug('force rise: %d fall: %d' % (f_rising_ind, f_falling_ind)) # these are 0-based frame indices (=1 less than Nexus frame index) strike_fr = int(np.round(f_rising_ind / info['samplesperframe'])) toeoff_fr = int(np.round(f_falling_ind / info['samplesperframe'])) logger.debug( 'foot strike at frame %d, toeoff at %d' % (strike_fr, toeoff_fr) ) force_checks_ok = True except IndexError: logger.debug('cannot detect force rise/fall') force_checks_ok = False strike_fr, toeoff_fr = None, None return strike_fr, toeoff_fr, force_checks_ok def _context_from_eclipse(eclipse_fp_info, plate): """Interpret context from Eclipse data. Returns tuple of (context, detect_context), where context is 'R', 'L' (or None for invalid/unknown) and detect_context indicates whether we should do our own autodetection. """ context = None if eclipse_fp_info is not None and plate in eclipse_fp_info: eclipse_context = eclipse_fp_info[plate] logger.debug(f'using Eclipse forceplate info: {eclipse_context}') if eclipse_context == 'Right': detect_context = False context = 'R' elif eclipse_context == 'Left': detect_context = False context = 'L' elif eclipse_context == 'Invalid': detect_context = False elif eclipse_context == 'Auto': detect_context = True else: raise GaitDataError('unexpected Eclipse forceplate field') else: logger.debug('not using Eclipse forceplate info') detect_context = True return context, detect_context from . import read_data logger.debug(f'detecting forceplate events from {source}') results = GaitEvents() # get subject info info = read_data.get_metadata(source) fpdata = read_data.get_forceplate_data(source) if not fpdata: logger.warning('no forceplates') return results footlen = info['subj_params']['FootLen'] rfootlen = info['subj_params']['RFootLen'] lfootlen = info['subj_params']['LFootLen'] if footlen is not None: logger.debug(f'(obsolete) single foot length parameter set to {footlen:.2f}') rfootlen = lfootlen = footlen elif rfootlen is not None and lfootlen is not None: logger.debug( f'foot length parameters set to r={rfootlen:.2f}, l={lfootlen:.2f}' ) else: logger.debug('foot length parameter not set') bodymass = info['subj_params']['Bodymass'] req_markers = ( cfg.autoproc.right_foot_markers + cfg.autoproc.left_foot_markers + cfg.autoproc.track_markers ) if marker_data is None: # not supplied as parameter marker_data = read_data.get_marker_data(source, req_markers) if not all(mkr in marker_data for mkr in req_markers): logger.warning('required markers missing, cannot detect forceplate contacts') return results datalen = info['length'] logger.debug('acquiring marker-based gait events') events_marker = automark_events(source, mkrdata=marker_data, roi=roi) # loop over the plates; our internal forceplate index is 0-based for plate_ind, fpdata_this in enumerate(fpdata): eclipse_key = fpdata_this['eclipse_key'] logger.debug(f'analyzing plate {eclipse_key}') context, detect_context = _context_from_eclipse(eclipse_fp_info, eclipse_key) strike_fr, toeoff_fr, force_checks_ok = _threshold_forceplate( fpdata_this, bodymass ) if not force_checks_ok: context = None # check foot markers (or points) to determine context and validity if force_checks_ok and detect_context: logger.debug('autodetecting context') # allows foot to settle for 50 ms after strike settle_fr = int(50 / 1000 * info['framerate']) fr0 = strike_fr + settle_fr # context is determined by leading foot at strike time this_context = _leading_foot(marker_data, roi=roi)[fr0] if this_context is None: raise GaitDataError('cannot determine leading foot from marker data') footlen = rfootlen if this_context == 'R' else lfootlen logger.debug(f'checking contact for leading foot: {this_context}') foot_contacts_ok = ( _foot_plate_check(fpdata_this, marker_data, fr0, this_context, footlen) == 2 ) # to eliminate double contacts, check that contralateral foot is not on plate # this needs marker-based events if foot_contacts_ok and events_marker is not None: contra_context = 'R' if this_context == 'L' else 'L' contra_strikes = [ ev.frame for ev in events_marker.get_events('strike', contra_context) ] contra_strikes = np.array(contra_strikes) contra_strikes_next = contra_strikes[ np.where(contra_strikes > strike_fr) ] if contra_strikes_next.size == 0: logger.debug('no subsequent contralateral strike') else: fr0 = contra_strikes_next[0] + settle_fr if fr0 > datalen: # data overrun logger.debug('no subsequent contralateral strike (overrun)') else: logger.debug( 'checking the subsequent contralateral strike ' '(at frame %d)' % fr0 ) contra_next_ok = ( _foot_plate_check( fpdata_this, marker_data, fr0, contra_context, footlen ) == 0 ) foot_contacts_ok &= contra_next_ok contra_strikes_prev = contra_strikes[ np.where(contra_strikes < strike_fr) ] if contra_strikes_prev.size == 0: logger.debug('no previous contralateral strike') else: fr0 = contra_strikes_prev[-1] + settle_fr logger.debug( 'checking previous contact for contralateral ' 'foot (at frame %d)' % fr0 ) contra_prev_ok = ( _foot_plate_check( fpdata_this, marker_data, fr0, contra_context, footlen ) == 0 ) foot_contacts_ok &= contra_prev_ok context = this_context if foot_contacts_ok else None if context: logger.debug(f'{eclipse_key}: {context} strike at frame {strike_fr}') strike_ev = GaitEvent( strike_fr, 'strike', context, forceplate_index=plate_ind ) toeoff_ev = GaitEvent( toeoff_fr, 'toeoff', context, forceplate_index=plate_ind ) results.append(strike_ev) results.append(toeoff_ev) else: logger.debug(f'{eclipse_key}: no valid foot strike') if return_nplates: return results, len(fpdata) else: return results
[docs]def automark_events( source, mkrdata=None, events_range=None, vel_thresholds=None, roi=None, plot=False, ): """Automatically mark foot strike and toeoff events. Events are marked based on velocity thresholding. Absolute thresholds can be specified as arguments. Otherwise, relative thresholds will be calculated based on the data. Optimal results will be obtained when thresholds are predetermined based on forceplate data, but it is not necessary. Before running automark, run reconstruct, label, gap fill and filter pipelines. Filtering is important to get reasonably smooth derivatives. Parameters ---------- source : str | ViconNexus The data source, either c3d filename or ViconNexus connection. For Nexus connections, the events can automatically be inserted into Nexus. For c3d files, the events are returned but not actually written to the c3d file. mkrdata : dict, optional The marker data dict. If not given, it will be read from the source. If given, it must include foot markers and subject tracking markers (see cfg). events_range : array_like, optional If specified, the events will be restricted to given coordinate range in the principal gait direction. E.g. [-1000, 1000] vel_thresholds : dict, optional Absolute velocity thresholds for identifying events. These can be obtained from forceplate data (utils.check_forceplate_contact). If None, relative thresholds will be computed based on marker data. roi : array_like, optional If not None, specifies a ROI (in frames) inside which to mark events. plot : bool, optional Plot velocity curves and events using matplotlib. Mostly for debug purposes. """ from .read_data import get_metadata, get_marker_data info = get_metadata(source) frate = info['framerate'] # TODO: move into config # marker data is assumed to be in mm # mm/frame = 1000 m/frame = 1000/frate m/s VEL_CONV = 1000 / frate # reasonable limit for peak velocity (m/s before multiplier) MAX_PEAK_VELOCITY = 12 * VEL_CONV # reasonable limits for velocity on slope (increasing/decreasing) MAX_SLOPE_VELOCITY = 6 * VEL_CONV MIN_SLOPE_VELOCITY = 0 # not currently in use # minimum swing velocity (rel to max velocity) MIN_SWING_VELOCITY = 0.5 # median prefilter width PREFILTER_MEDIAN_WIDTH = 3 if vel_thresholds is None: vel_thresholds = { 'L_strike': None, 'L_toeoff': None, 'R_strike': None, 'R_toeoff': None, } if mkrdata is None: # FIXME: missing markers are not detected here? reqd_markers = ( cfg.autoproc.right_foot_markers + cfg.autoproc.left_foot_markers + cfg.autoproc.track_markers ) mkrdata = get_marker_data(source, reqd_markers) rfootctrv_ = avg_markerdata( mkrdata, cfg.autoproc.right_foot_markers, roi=roi, fail_on_gaps=False, avg_velocity=True, ) rfootctrv = np.linalg.norm(rfootctrv_, axis=1) lfootctrv_ = avg_markerdata( mkrdata, cfg.autoproc.left_foot_markers, roi=roi, fail_on_gaps=False, avg_velocity=True, ) lfootctrv = np.linalg.norm(lfootctrv_, axis=1) # position data: use ANK marker rfootctrP = mkrdata['RANK'] lfootctrP = mkrdata['LANK'] events = GaitEvents() # loop: same operations for left / right foot for context, footctrP, footctrv in zip( ('R', 'L'), (rfootctrP, lfootctrP), (rfootctrv, lfootctrv) ): logger.debug(f'marking context {context}') # foot center position # filter scalar velocity data to suppress noise and spikes footctrv = signal.medfilt(footctrv, PREFILTER_MEDIAN_WIDTH) # get peak (swing) velocity maxv = _get_foot_swing_velocity(footctrv, MAX_PEAK_VELOCITY, MIN_SWING_VELOCITY) # compute thresholds if cfg.autoproc.use_fp_vel_thresholds and vel_thresholds[context + '_strike']: threshold_fall_ = vel_thresholds[context + '_strike'] else: threshold_fall_ = maxv * cfg.autoproc.strike_vel_threshold if cfg.autoproc.use_fp_vel_thresholds and vel_thresholds[context + '_toeoff']: threshold_rise_ = vel_thresholds[context + '_toeoff'] else: threshold_rise_ = maxv * cfg.autoproc.toeoff_vel_threshold logger.debug( 'context: %s, default thresholds fall/rise: %.2f/%.2f' % ( context, maxv * cfg.autoproc.strike_vel_threshold, maxv * cfg.autoproc.toeoff_vel_threshold, ) ) logger.debug(f'using thresholds: {threshold_fall_}/{threshold_rise_}') # find point where velocity crosses threshold # foot strikes (velocity decreases) cross = falling_zerocross(footctrv - threshold_fall_) # exclude edges of data vector fmax = len(footctrv) - 1 cross = cross[np.where(np.logical_and(cross > 0, cross < fmax))] # check velocity on slope cind_min = np.logical_and( footctrv[cross - 1] < MAX_SLOPE_VELOCITY, footctrv[cross - 1] > MIN_SLOPE_VELOCITY, ) cind_max = np.logical_and( footctrv[cross + 1] < MAX_SLOPE_VELOCITY, footctrv[cross + 1] > MIN_SLOPE_VELOCITY, ) strikes = cross[np.logical_and(cind_min, cind_max)] # check for foot swing (velocity maximum) between consecutive strikes # if no swing, keep deleting the latter event until swing is found bad = [] for sind in range(len(strikes)): if sind in bad: continue for sind2 in range(sind + 1, len(strikes)): swing_max_vel = footctrv[strikes[sind] : strikes[sind2]].max() # logger.debug('check %d-%d' % (strikes[sind], strikes[sind2])) if swing_max_vel < maxv * MIN_SWING_VELOCITY: logger.debug( 'no swing between strikes %d-%d, deleting %d' % (strikes[sind], strikes[sind2], strikes[sind2]) ) bad.append(sind2) else: break strikes = np.delete(strikes, bad) if len(strikes) == 0: raise GaitDataError('No valid foot strikes detected') # toe offs (velocity increases) cross = rising_zerocross(footctrv - threshold_rise_) cross = cross[np.where(np.logical_and(cross > 0, cross < len(footctrv)))] cind_min = np.logical_and( footctrv[cross - 1] < MAX_SLOPE_VELOCITY, footctrv[cross - 1] > MIN_SLOPE_VELOCITY, ) cind_max = np.logical_and( footctrv[cross + 1] < MAX_SLOPE_VELOCITY, footctrv[cross + 1] > MIN_SLOPE_VELOCITY, ) toeoffs = cross[np.logical_and(cind_min, cind_max)] if len(toeoffs) == 0: raise GaitDataError('Could not detect any toe-off events') # check for multiple toeoffs for s1, s2 in list(zip(strikes, np.roll(strikes, -1)))[:-1]: to_this = np.where(np.logical_and(toeoffs > s1, toeoffs < s2))[0] if len(to_this) > 1: logger.debug( '%d toeoffs during cycle, keeping the last one' % len(to_this) ) toeoffs = np.delete(toeoffs, to_this[:-1]) logger.debug(f'autodetected strike events: {strikes}') logger.debug(f'autodetected toeoff events: {toeoffs}') # select events for which the foot is close enough to center frame if events_range: mdata = avg_markerdata(mkrdata, cfg.autoproc.track_markers, roi=roi) fwd_dim = _principal_movement_direction(mdata) strike_pos = footctrP[strikes, fwd_dim] dist_ok = np.logical_and( strike_pos > events_range[0], strike_pos < events_range[1] ) # exactly zero position at strike should indicate a gap -> exclude # TODO: smarter gap handling dist_ok = np.logical_and(dist_ok, strike_pos != 0) strikes = strikes[dist_ok] if roi is not None: strikes = np.extract( np.logical_and(roi[0] <= strikes + 1, strikes + 1 <= roi[1]), strikes ) toeoffs = np.extract( np.logical_and(roi[0] <= toeoffs + 1, toeoffs + 1 <= roi[1]), toeoffs ) if len(strikes) == 0: raise GaitDataError('No valid foot strikes detected') # delete toeoffs that are not between strike events not_ok = np.where( np.logical_or(toeoffs <= min(strikes), toeoffs >= max(strikes)) ) toeoffs = np.delete(toeoffs, not_ok) logger.debug(f'final strike events: {strikes}') logger.debug(f'final toeoff events: {toeoffs}') for fr in strikes: e = GaitEvent(int(fr), 'strike', context) events.append(e) for fr in toeoffs: e = GaitEvent(int(fr), 'toeoff', context) events.append(e) # plot velocities w/ thresholds and marked events if plot: first_call = context == 'R' if first_call: _, (ax1, ax2) = plt.subplots(2, 1) ax = ax1 if first_call else ax2 ax.plot(footctrv, 'g', label='foot center velocity ' + context) # algorithm, fixed thresholds ax.plot(strikes, footctrv[strikes], 'kD', markersize=10, label='strike') ax.plot(toeoffs, footctrv[toeoffs], 'k^', markersize=10, label='toeoff') ax.legend(numpoints=1, fontsize=10) ax.set_ylim(0, maxv + 10) if not first_call: plt.xlabel('Frame') ax.set_ylabel('Velocity (mm/frame)') ax.set_title('Left' if context == 'L' else 'Right') if plot: plt.show() return events