# -*- coding: utf-8 -*-
"""
C3D reader functions.
NB: do not use the data readers from this file directly. They are intended to be
called via the read_data module.
@author: Jussi (jnu@iki.fi)
"""
from collections import defaultdict
import logging
import numpy as np
from pathlib import Path
import sys
import shutil
from .numutils import center_of_pressure, _change_coords, _is_ascii
from .events import GaitEvents, GaitEvent
from .utils import _step_width
from .envutils import lru_cache_checkfile, _named_tempfile, GaitDataError
logger = logging.getLogger(__name__)
try:
import btk
BTK_IMPORTED = True
except ImportError:
BTK_IMPORTED = False
logger.warning('cannot import btk module; unable to read .c3d files')
def _is_c3d_file(source):
"""Check if source is a valid c3d file.
XXX: Currently we just check existence. Would be good to check file header.
"""
try:
return Path(source).is_file()
except TypeError:
return False
def _get_c3d_metadata_subfields(acq, field):
"""Return names of C3D metadata subfields for a given field"""
meta = acq.GetMetaData()
meta_s = meta.GetChild(field)
return [f.GetLabel() for f in btk.Iterate(meta_s)]
def _get_c3d_metadata_field(acq, field, subfield):
"""Get c3d metadata FIELD:SUBFIELD as Python type.
Always returns a list.
"""
meta = acq.GetMetaData()
def _get_child(field, child):
try:
return field.GetChild(child)
except RuntimeError:
raise RuntimeError(f'Invalid c3d metadata field: {child}')
info = _get_child(_get_child(meta, field), subfield).GetInfo()
if (strformat := info.GetFormatAsString()) == 'Char':
return [s.strip() for s in info.ToString()]
elif strformat == 'Real':
return [x for x in info.ToDouble()]
else:
raise RuntimeError('Unhandled btk meta info type')
@lru_cache_checkfile
def _get_c3dacq(c3dfile):
"""Get a btk c3dacq object.
Object is returned from cache if filename and digest match.
"""
reader = btk.btkAcquisitionFileReader()
c3dfile = str(c3dfile) # accept Path objects too (btk won't eat those)
reader.SetFilename(c3dfile)
try:
reader.Update()
except RuntimeError as e:
# this is a workaround for an unresolved BTK Python 3 bug, where
# filenames containing extended characters cannot be read
# this is somehow related to Windows character set handling, since
# it only manifests on some machines
if not _is_ascii(c3dfile) and sys.version_info.major == 3:
logger.warning('trying to work around possible btk extended chars bug')
# just copy the file into another directory
temp_path = _named_tempfile(suffix='.c3d')
shutil.copy2(c3dfile, temp_path)
logger.warning(f'using {temp_path}')
# we need to create a new reader here
# (the previous update call screws it up somehow)
reader = btk.btkAcquisitionFileReader()
reader.SetFilename(str(temp_path))
reader.Update()
# we should be able to remove the temp file now
temp_path.unlink()
else:
# something else went wrong during read
raise e
return reader.GetOutput()
[docs]def get_analysis(c3dfile, condition='unknown'):
"""Get ANALYSIS values from a c3d file.
Usually these are the time-distance parameters, but other values may be
stored as well.
Parameters
----------
c3dfile : str | Path
Path to the file.
condition : str, optional
The condition, by default 'unknown'. The condition is only used to
annotate the output dictionary.
Returns
-------
dict
A nested dict of the analysis values. Keys are condition, variable, and
context/unit. For example, di['unknown']['Step Width']['Right'] would contain
the step width for the right foot.
"""
# used to change units; we currently change steps/min for brevity
UNIT_CONVERSIONS = {'steps/min': '1/min'}
logger.debug(f'getting analysis values from {c3dfile}')
acq = _get_c3dacq(c3dfile)
try:
vars_ = _get_c3d_metadata_field(acq, 'ANALYSIS', 'NAMES')
units = _get_c3d_metadata_field(acq, 'ANALYSIS', 'UNITS')
contexts = _get_c3d_metadata_field(acq, 'ANALYSIS', 'CONTEXTS')
vals = _get_c3d_metadata_field(acq, 'ANALYSIS', 'VALUES')
except RuntimeError:
raise GaitDataError(f'Cannot read time-distance parameters from {c3dfile}')
# build a nice output dict
di = defaultdict(lambda: defaultdict(dict))
di_ = di[condition]
for (var, unit, context, val) in zip(vars_, units, contexts, vals):
if unit in UNIT_CONVERSIONS:
unit = UNIT_CONVERSIONS[unit]
di_[var]['unit'] = unit
di_[var][context] = val
# if c3d was missing vals for some var/context, insert nans
for var in di_:
for context in ['Left', 'Right']:
if context not in di_[var]:
logger.warning(f'{c3dfile} has missing value: {var} / {context}')
di_[var][context] = np.nan
# Nexus version <2.8 did not output step width into c3d, so compute it here
# if needed
if 'Step Width' not in di_:
logger.warning(f'computing step widths (not found in {c3dfile})')
sw = _step_width(c3dfile)
di[condition]['Step Width'] = dict()
# XXX: currently uses average of all cycles from trial
di[condition]['Step Width']['Right'] = np.array(sw['R']).mean()
di[condition]['Step Width']['Left'] = np.array(sw['L']).mean()
di[condition]['Step Width']['unit'] = 'm'
return di
def _get_emg_data(c3dfile):
"""Read EMG data from a c3d file.
See read_data.get_emg_data() for details.
"""
return _get_analog_data(c3dfile, 'EMG')
def _get_accelerometer_data(c3dfile):
"""Read accelerometer data from a c3d file"""
data = _get_analog_data(c3dfile, 'Accelerometer')
# Remove the 'Acceleration.' prefix if inserted by Nexus, so that channel
# names match Nexus. This is a bit ugly (not done for EMG which uses fuzzy
# matching)
for key in data['data']:
if key.find('Acceleration.') == 0:
# replace key names
data['data'][key[13:]] = data['data'].pop(key)
return data
def _get_analog_data(c3dfile, devname):
"""Read analog data from a c3d file.
devname is matched against channel names.
"""
acq = _get_c3dacq(c3dfile)
data = dict()
chnames = []
for i in btk.Iterate(acq.GetAnalogs()):
if i.GetDescription().find(devname) >= 0:
if (chname := i.GetLabel()) in chnames:
raise GaitDataError(
'Duplicate channel names in C3D file. Please rename your channels.'
)
chnames.append(chname)
data[chname] = np.squeeze(i.GetValues())
if chnames:
return {
't': np.arange(len(data[chname])) / acq.GetAnalogFrequency(),
'data': data,
}
else:
raise GaitDataError(
f'No analog channels matching device {devname} found in data'
)
def _get_marker_data(c3dfile, markers, ignore_missing=False):
"""Get position data for specified markers.
See read_data.get_marker_data for details.
"""
if not isinstance(markers, list): # listify if not already a list
markers = [markers]
acq = _get_c3dacq(c3dfile)
mkrdata = dict()
for marker in markers:
try:
mkrdata[marker] = np.squeeze(acq.GetPoint(marker).GetValues())
except RuntimeError:
if ignore_missing:
logger.warning(f'Cannot read trajectory {marker} from c3d file')
continue
else:
raise GaitDataError(f'Cannot read trajectory {marker} from c3d file')
return mkrdata
def _get_c3d_subject_param(acq, param):
try:
param = _get_c3d_metadata_field(acq, 'PROCESSING', param)[0]
except RuntimeError:
logger.warning(f'Cannot get subject parameter {param}')
param = None
return param
def _get_metadata(c3dfile):
"""Read trial and subject metadata from c3d file.
See read_data.get_metadata() for details.
"""
c3dfile = Path(c3dfile)
trialname = c3dfile.stem
sessionpath = c3dfile.parent
acq = _get_c3dacq(c3dfile)
# frame offset (start of trial data in frames)
offset = acq.GetFirstFrame()
lastfr = acq.GetLastFrame()
length = lastfr - offset + 1 # or acq.GetPointFrameNumber()
framerate = acq.GetPointFrequency()
analograte = acq.GetAnalogFrequency()
samplesperframe = acq.GetNumberAnalogSamplePerFrame()
# count forceplates
fpe = btk.btkForcePlatformsExtractor()
fpe.SetInput(acq)
fpe.Update()
n_forceplates = len(list(btk.Iterate(fpe.GetOutput())))
# get markers
try:
markers = _get_c3d_metadata_field(acq, 'POINT', 'LABELS')
except RuntimeError:
markers = list()
# XXX: not sure what the '*xx' markers are, but delete them for now
markers = [m for m in markers if m[0] != '*']
# get events
events = GaitEvents()
for i in btk.Iterate(acq.GetEvents()):
fr = i.GetFrame()
context = i.GetContext()[0]
ev_type = i.GetLabel()
if ev_type == 'Foot Strike':
ev_type_ours = 'strike'
elif ev_type == 'Foot Off':
ev_type_ours = 'toeoff'
else:
ev_type_ours = ev_type
fr_offset = fr - offset
ev = GaitEvent(fr_offset, ev_type_ours, context)
events.append(ev)
# get subject info
try:
subj_name = _get_c3d_metadata_field(acq, 'SUBJECTS', 'NAMES')[0]
except RuntimeError:
logger.warning('Cannot get subject name')
subj_name = 'Unknown'
subj_params = defaultdict(lambda: None)
try:
par_names = _get_c3d_metadata_subfields(acq, 'PROCESSING')
except RuntimeError:
logger.warning(
f'{c3dfile} is missing the PROCESSING section that contains '
'subject info (bodyweight etc.)'
)
else:
subj_params.update({par: _get_c3d_subject_param(acq, par) for par in par_names})
return {
'trialname': trialname,
'sessionpath': sessionpath,
'offset': offset,
'framerate': framerate,
'analograte': analograte,
'subject_name': subj_name,
'subj_params': subj_params,
'events': events,
'length': length,
'samplesperframe': samplesperframe,
'n_forceplates': n_forceplates,
'markers': markers,
}
def _get_model_data(c3dfile, model):
"""Read model output variables (e.g. Plug-in Gait).
See read_data.get_model_data for details.
"""
modeldata = dict()
acq = _get_c3dacq(c3dfile)
var_dims = (3, acq.GetPointFrameNumber())
for var in model.read_vars:
try:
vals = acq.GetPoint(var).GetValues()
modeldata[var] = np.transpose(np.squeeze(vals))
except RuntimeError:
logger.info(f'cannot read model variable {var}, returning nans')
data = np.empty(var_dims)
data[:] = np.nan
modeldata[var] = data
# c3d stores scalars as last dim of 3-d array
if model.read_strategy == 'last':
modeldata[var] = modeldata[var][2, :]
return modeldata
def _get_1_forceplate_data(plate):
"""Read data of a single forceplate from a c3d file.
plate is an instance of btk.btkForcePlatform.
"""
READ_CHS = ['Fx', 'Fy', 'Fz', 'Mx', 'My', 'Mz']
if plate.GetType() != 2:
# Nexus should always write forceplates as type 2
raise GaitDataError('Only type 2 forceplates are supported for now')
rawdata = dict()
data = dict()
for ch in btk.Iterate(plate.GetChannels()):
label = ch.GetLabel()[-3:-1] # strip descriptor and plate number
rawdata[label] = np.squeeze(ch.GetData().GetValues())
if not all([ch in rawdata for ch in READ_CHS]):
logger.warning(f'could not read force/moment data for plate {plate}')
return None
F = np.stack([rawdata['Fx'], rawdata['Fy'], rawdata['Fz']], axis=1)
M = np.stack([rawdata['Mx'], rawdata['My'], rawdata['Mz']], axis=1)
# we need to calculate the center of pressure, since it's not in the C3D
# dz is the plate thickness (from moment origin to physical origin) needed
# for center of pressure calculations
dz = np.abs(plate.GetOrigin()[2])
cop = center_of_pressure(F, M, dz) # in plate local coords
Ftot = np.linalg.norm(F, axis=1)
# locations of +x+y, -x+y, -x-y, +x-y plate corners in world coords
# (in that order)
cor = plate.GetCorners()
wT = np.mean(cor, axis=1) # translation vector, plate -> world
# upper and lower bounds of forceplate
ub = np.max(cor, axis=1)
lb = np.min(cor, axis=1)
# plate unit vectors in world system
px = cor[:, 0] - cor[:, 1]
py = cor[:, 0] - cor[:, 3]
pz = np.array([0, 0, -1])
P = np.stack([px, py, pz], axis=1)
wR = P / np.linalg.norm(P, axis=0) # rotation matrix, plate -> world
# check whether CoP stays inside forceplate area and clip if necessary
cop_w = _change_coords(cop, wR, wT)
cop_wx = np.clip(cop_w[:, 0], lb[0], ub[0])
cop_wy = np.clip(cop_w[:, 1], lb[1], ub[1])
if not (cop_wx == cop_w[:, 0]).all() and (cop_wy == cop_w[:, 1]).all():
logger.warning(
'center of pressure outside forceplate bounds, clipping to plate'
)
cop[:, 0] = cop_wx
cop[:, 1] = cop_wy
# XXX moment and force transformations may still be wrong
data['F'] = _change_coords(-F, wR, 0) # not sure why sign flip needed
data['Ftot'] = Ftot
data['M'] = _change_coords(-M, wR, 0) # not sure why sign flip needed
data['CoP'] = cop_w
data['wR'] = wR
data['wT'] = wT
data['plate_corners'] = cor.T
return data
def _get_forceplate_data(c3dfile):
"""Read data of all forceplates from c3d file.
See read_data.get_forceplate_data() for details.
"""
logger.debug(f'reading forceplate data from {c3dfile}')
acq = _get_c3dacq(c3dfile)
fpe = btk.btkForcePlatformsExtractor()
fpe.SetInput(acq)
fpe.Update()
fpdata = list()
for eclipse_ind, plate in enumerate(btk.Iterate(fpe.GetOutput()), 1):
logger.debug(f'reading from plate {eclipse_ind}')
data = _get_1_forceplate_data(plate)
if data is not None:
# generate the Eclipse key
data['eclipse_key'] = f'FP{eclipse_ind}'
fpdata.append(data)
return fpdata