Source code for pybdf

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#   Copyright (C) 2012-2013 Samuele Carcagno <sam.carcagno@gmail.com>
#   This file is part of pybdf

#    pybdf is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.

#    pybdf is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.

#    You should have received a copy of the GNU General Public License
#    along with pybdf.  If not, see <http://www.gnu.org/licenses/>.

"""
This module can be used to read the header and data from
24-bit BIOSEMI BDF files recorded with the ActiveTwo system.

 Examples
 --------
 >>> bdfRec = bdfRecording('res1.bdf') #create bdfRecording object
 >>> bdfRec.recordDuration #how many seconds the recording lasts
 >>> bdfRec.sampRate #sampling rate for each channel
 >>> #read 10 seconds of data from the first two channels
 >>> rec = bdfRec.getData(channels=[0, 1], beginning=0, end=10)
"""
from __future__ import nested_scopes, generators, division, absolute_import, with_statement, print_function, unicode_literals
import copy, numpy
import libforbdf
from numpy import concatenate, diff, where

__version__ = "0.1.15"

class bdfRecording:
    """
    Class for dealing with BIOSEMI 24-bit BDF files.
    A bdfRecording object is created with the following syntax::

       >>> bdfRec = bdfRecording('bdf_file.bdf')

    This reads the BDF header, but not the data. You need to use
    the getData() method to read the data. The full documentation
    of the BDF file format can be found here:
    http://www.biosemi.com/faq/file_format.htm

    Attributes
    ----------
    fileName : str
        Path of the BDF file the header was read from
    idCodeNonASCII : bytes
        First (non-ASCII) byte of the identification code
    idCode : str
        Identification code
    subjId : str
        Local subject identification
    recId : str
        Local recording identification
    startDate : str
        Recording start date
    startTime : str
        Recording start time
    nBytes : int
        Number of bytes occupied by the bdf header
    versionDataFormat : str
        Version of data format
    nDataRecords : int
        Number of data records "-1" if unknown
    recordDuration : float
        Duration of a data record, in seconds
    duration : float
        recordDuration multiplied by nDataRecords
    nChannels : int
        Number of channels in data record
    chanLabels : list of str
        Channel labels
    transducer : list of str
        Transducer type
    physDim : list of str
        Physical dimension of channels
    physMin : list of int
        Physical minimum in units of physical dimension
    physMax : list of int
        Physical maximum in units of physical dimension
    digMin : list of int
        Digital minimum
    digMax : list of int
        Digital maximum
    prefilt : list of str
        Prefiltering
    nSampRec : list of int
        Number of samples in each data record
    reserved : list of str
        Reserved
    scaleFactor : list of floats
        Scaling factor for digital to physical dimension
    sampRate : list of int
        Recording sampling rate
    statusChanIdx : int
        Index of the status channel
    nDataChannels : int
        Number of data channels containing data (rather than trigger codes)
    dataChanLabels : list of str
        Labels of the channels containing data (rather than trigger codes)
    """

    def __init__(self, fileName):
        """
        Read and parse the header of a BDF file.

        Parameters
        ----------
        fileName : str
            Path of the BDF file.

        Raises
        ------
        IOError
            If the file cannot be opened.
        ValueError
            If a numeric header field cannot be parsed, or if no channel
            is labelled "Status".
        """
        self.fileName = fileName
        # Python already raises an IOError if the file does not exist. The
        # context manager guarantees the handle is released even when the
        # header is malformed and one of the conversions below raises.
        with open(self.fileName, "rb") as f:

            def readText(width):
                # One fixed-width ASCII header field.
                return bytes.decode(f.read(width), 'ascii')

            self.idCodeNonASCII = f.read(1)
            self.idCode = readText(7)
            self.subjId = readText(80)
            self.recId = readText(80)
            self.startDate = readText(8)
            self.startTime = readText(8)
            self.nBytes = int(readText(8))
            self.versionDataFormat = readText(44)
            self.nDataRecords = int(readText(8))
            self.recordDuration = float(readText(8).strip())
            self.nChannels = int(readText(4))
            self.duration = self.recordDuration * self.nDataRecords

            def readPerChannel(width):
                # The header is laid out field-major: the values of one
                # field for every channel come before the next field.
                return [readText(width) for _ in range(self.nChannels)]

            # The order of these reads follows the on-disk layout and must
            # not be changed.
            self.chanLabels = [s.strip() for s in readPerChannel(16)]
            self.transducer = [s.strip() for s in readPerChannel(80)]
            self.physDim = [s.strip() for s in readPerChannel(8)]
            self.physMin = [int(s) for s in readPerChannel(8)]
            self.physMax = [int(s) for s in readPerChannel(8)]
            self.digMin = [int(s) for s in readPerChannel(8)]
            self.digMax = [int(s) for s in readPerChannel(8)]
            self.prefilt = [s.strip() for s in readPerChannel(80)]
            self.nSampRec = [int(s) for s in readPerChannel(8)]
            self.reserved = readPerChannel(32)

        self.scaleFactor = [
            (pMax - pMin) / (dMax - dMin)
            for pMin, pMax, dMin, dMax in zip(self.physMin, self.physMax,
                                              self.digMin, self.digMax)
        ]
        self.statusChanIdx = self.chanLabels.index("Status")
        self.nDataChannels = self.nChannels - 1
        # The last label is dropped, i.e. the status channel is taken to be
        # the last channel in the file.
        self.dataChanLabels = list(self.chanLabels[:-1])
        # NOTE(review): int16 would wrap for rates above 32767 samples/s;
        # kept as-is so the element type callers see does not change.
        self.sampRate = list(numpy.array(numpy.round(numpy.array(self.nSampRec) / self.recordDuration), dtype=numpy.int16))

    def getData(self, beginning=0, end=None, channels=None, eventTable=True, trigChan=False, sysCodeChan=False):
        """
        Read the data from a bdfRecording object

        Parameters
        ----------
        beginning : int
            Start time of data chunk to read (seconds).
        end : int
            End time of data chunk to read (seconds). If None, it
            defaults to nDataRecords, i.e. all the data are read.
        channels : list of integers or strings
            Channels to read. Both channel numbers, or channel names are
            accepted. Note that channel numbers are indexed starting from
            *zero*. If None, all data channels are read. The list passed
            in is not modified.
        eventTable : boolean
            If True, return the triggers event table
        trigChan : boolean
            If True, return the channel containing the triggers
        sysCodeChan : boolean
            If True, return the channel containing the system codes

        Returns
        -------
        rec : a dictionary with five keys, or None if more channels than
              available were requested
            - data : an array of floats with dimenions nChannels X nDataPoints
            - chanLabels : a list containing the labels of the channels that
              were read, in the same order they are inserted in the data
              matrix
            - trigChan : an array of integers with the triggers in decimal
              format (None unless requested)
            - sysCodeChan : an array of integers with the system codes in
              decimal format (None unless requested)
            - eventTable : a dictionary with the following keys (each None
              unless the event table was requested)
                - code : array of ints
                    The trigger codes
                - idx : array of ints
                    The indexes of the trigger codes
                - dur : array of floats
                    The duration of the triggers in seconds

        Examples
        --------
        >>> x = bdfRecording('res1.bdf')
        >>> rec = x.getData(channels=[0, 2], beginning=0, end=10)
        """
        if end is None:  # read all data
            end = self.nDataRecords
        if channels is None:  # read all data channels
            channels = self.dataChanLabels
        if len(channels) > self.nDataChannels:
            print("Requested channels more than available channels. Exiting")
            return None

        # Channels given as labels are converted to indexes. A new list is
        # built so that the caller's list is left untouched.
        channels = sorted(
            self.dataChanLabels.index(ch) if isinstance(ch, str) else ch
            for ch in channels
        )
        chanLabels = [self.dataChanLabels[ch] for ch in channels]

        # libforbdf opens the file by name itself, so no handle is opened here.
        data, statchan = libforbdf.read_channels(self.fileName, beginning, end, self.nChannels, self.nSampRec, self.statusChanIdx)
        # NOTE(review): every channel is scaled with the first channel's
        # factor; presumably all channels share the same gain -- confirm.
        data = numpy.array(data*self.scaleFactor[0], dtype=numpy.float32)
        trigChannel = statchan[0, :]
        sysCodeChannel = statchan[1, :]

        # Drop the rows of the data channels that were not requested.
        requested = set(channels)
        chanToDel = [c for c in range(self.nDataChannels) if c not in requested]
        if chanToDel:
            data = numpy.delete(data, numpy.array(chanToDel, dtype=numpy.int16), axis=0)

        # event table
        evtTab = {}
        if eventTable:
            # Positions where the trigger value differs from the next sample.
            changes = where(diff(trigChannel) != 0)[0]
            startPoints = concatenate(([0], changes + 1))
            stopPoints = concatenate((changes, [len(trigChannel)-1]))
            evtTab['code'] = trigChannel[startPoints]
            evtTab['idx'] = startPoints
            evtTab['dur'] = (stopPoints - startPoints)/self.sampRate[0]
        else:
            evtTab['code'] = None
            evtTab['idx'] = None
            evtTab['dur'] = None

        rec = {}
        rec['data'] = data
        rec['trigChan'] = trigChannel if trigChan else None
        # The original referenced an undefined name here and raised
        # NameError whenever sysCodeChan was requested.
        rec['sysCodeChan'] = sysCodeChannel if sysCodeChan else None
        rec['chanLabels'] = chanLabels
        rec['eventTable'] = evtTab
        return rec