Source code for utils.intan.load_intan_rhd_format

#! /bin/env python
#
# Michael Gibson 17 July 2015

import math
import os
import struct
import sys
import time

import numpy as np

from .intanutil.data_to_result import data_to_result
from .intanutil.get_bytes_per_data_block import get_bytes_per_data_block
from .intanutil.notch_filter import notch_filter
from .intanutil.read_header import read_header
from .intanutil.read_one_data_block import read_one_data_block


[docs]def read_rhd(filename): """Reads Intan Technologies RHD2000 data file generated by evaluation board GUI. Data are returned in a dictionary, for future extensibility. """ tic = time.time() fid = open(filename, "rb") filesize = os.path.getsize(filename) header = read_header(fid) # print('Found {} amplifier channel{}.'.format(header['num_amplifier_channels'], # plural(header['num_amplifier_channels']))) # print('Found {} auxiliary input channel{}.'.format(header['num_aux_input_channels'], # plural(header['num_aux_input_channels']))) # print('Found {} supply voltage channel{}.'.format(header['num_supply_voltage_channels'], # plural(header['num_supply_voltage_channels']))) # print('Found {} board ADC channel{}.'.format(header['num_board_adc_channels'], # plural(header['num_board_adc_channels']))) # print('Found {} board digital input channel{}.'.format(header['num_board_dig_in_channels'], # plural(header['num_board_dig_in_channels']))) # print('Found {} board digital output channel{}.'.format(header['num_board_dig_out_channels'], # plural(header['num_board_dig_out_channels']))) # print('Found {} temperature sensors channel{}.'.format(header['num_temp_sensor_channels'], # plural(header['num_temp_sensor_channels']))) # print('') # Determine how many samples the data file contains. bytes_per_block = get_bytes_per_data_block(header) # How many data blocks remain in this file? data_present = False bytes_remaining = filesize - fid.tell() if bytes_remaining > 0: data_present = True if bytes_remaining % bytes_per_block != 0: raise Exception( "Something is wrong with file size : should have a whole number of data blocks" ) num_data_blocks = int(bytes_remaining / bytes_per_block) num_amplifier_samples = 60 * num_data_blocks num_aux_input_samples = 15 * num_data_blocks num_supply_voltage_samples = 1 * num_data_blocks num_board_adc_samples = 60 * num_data_blocks num_board_dig_in_samples = 60 * num_data_blocks num_board_dig_out_samples = 60 * num_data_blocks record_time = num_amplifier_samples / header["sample_rate"] # if data_present: # print('File contains {:0.3f} seconds of data. Amplifiers were sampled at {:0.2f} kS/s.'.format(record_time, # header[ # 'sample_rate'] / 1000)) # else: # print('Header file contains no data. Amplifiers were sampled at {:0.2f} kS/s.'.format( # header['sample_rate'] / 1000)) if data_present: # Pre-allocate memory for data. # print('') # print('Allocating memory for data...') data = {} if (header["version"]["major"] == 1 and header["version"]["minor"] >= 2) or ( header["version"]["major"] > 1 ): data["t_amplifier"] = np.zeros(num_amplifier_samples, dtype=np.int) else: data["t_amplifier"] = np.zeros(num_amplifier_samples, dtype=np.uint) data["amplifier_data"] = np.zeros( [header["num_amplifier_channels"], num_amplifier_samples], dtype=np.uint ) data["aux_input_data"] = np.zeros( [header["num_aux_input_channels"], num_aux_input_samples], dtype=np.uint ) data["supply_voltage_data"] = np.zeros( [header["num_supply_voltage_channels"], num_supply_voltage_samples], dtype=np.uint, ) data["temp_sensor_data"] = np.zeros( [header["num_temp_sensor_channels"], num_supply_voltage_samples], dtype=np.uint, ) data["board_adc_data"] = np.zeros( [header["num_board_adc_channels"], num_board_adc_samples], dtype=np.uint ) data["board_dig_in_data"] = np.zeros( [header["num_board_dig_in_channels"], num_board_dig_in_samples], dtype=np.uint, ) data["board_dig_in_raw"] = np.zeros(num_board_dig_in_samples, dtype=np.uint) data["board_dig_out_data"] = np.zeros( [header["num_board_dig_out_channels"], num_board_dig_out_samples], dtype=np.uint, ) data["board_dig_out_raw"] = np.zeros(num_board_dig_out_samples, dtype=np.uint) # Read sampled data from file. # print('Reading data from file...') # Initialize indices used in looping indices = {} indices["amplifier"] = 0 indices["aux_input"] = 0 indices["supply_voltage"] = 0 indices["board_adc"] = 0 indices["board_dig_in"] = 0 indices["board_dig_out"] = 0 print_increment = 10 percent_done = print_increment for i in range(num_data_blocks): read_one_data_block(data, header, indices, fid) # Increment indices indices["amplifier"] += 60 indices["aux_input"] += 15 indices["supply_voltage"] += 1 indices["board_adc"] += 60 indices["board_dig_in"] += 60 indices["board_dig_out"] += 60 fraction_done = 100 * (1.0 * i / num_data_blocks) if fraction_done >= percent_done: # print('{}% done...'.format(percent_done)) percent_done = percent_done + print_increment # Make sure we have read exactly the right amount of data. bytes_remaining = filesize - fid.tell() if bytes_remaining != 0: raise Exception("Error: End of file not reached.") # Close data file. fid.close() if data_present: # print('Parsing data...') # Extract digital input channels to separate variables. for i in range(header["num_board_dig_in_channels"]): data["board_dig_in_data"][i, :] = np.not_equal( np.bitwise_and( data["board_dig_in_raw"], (1 << header["board_dig_in_channels"][i]["native_order"]), ), 0, ) # Extract digital output channels to separate variables. for i in range(header["num_board_dig_out_channels"]): data["board_dig_out_data"][i, :] = np.not_equal( np.bitwise_and( data["board_dig_out_raw"], (1 << header["board_dig_out_channels"][i]["native_order"]), ), 0, ) # Scale voltage levels appropriately. data["amplifier_data"] = np.multiply( 0.195, (data["amplifier_data"].astype(np.int32) - 32768) ) # units = microvolts data["aux_input_data"] = np.multiply( 37.4e-6, data["aux_input_data"] ) # units = volts data["supply_voltage_data"] = np.multiply( 74.8e-6, data["supply_voltage_data"] ) # units = volts if header["eval_board_mode"] == 1: data["board_adc_data"] = np.multiply( 152.59e-6, (data["board_adc_data"].astype(np.int32) - 32768) ) # units = volts else: data["board_adc_data"] = np.multiply( 50.354e-6, data["board_adc_data"] ) # units = volts data["temp_sensor_data"] = np.multiply( 0.01, data["temp_sensor_data"] ) # units = deg C # Check for gaps in timestamps. num_gaps = np.sum( np.not_equal(data["t_amplifier"][1:] - data["t_amplifier"][:-1], 1) ) if num_gaps == 0: pass # print('No missing timestamps in data.') else: print( "Warning: {0} gaps in timestamp data found. Time scale will not be uniform!".format( num_gaps ) ) # Scale time steps (units = seconds). data["t_amplifier"] = data["t_amplifier"] / header["sample_rate"] data["t_aux_input"] = data["t_amplifier"][range(0, len(data["t_amplifier"]), 4)] data["t_supply_voltage"] = data["t_amplifier"][ range(0, len(data["t_amplifier"]), 60) ] data["t_board_adc"] = data["t_amplifier"] data["t_dig"] = data["t_amplifier"] data["t_temp_sensor"] = data["t_supply_voltage"] # If the software notch filter was selected during the recording, apply the # same notch filter to amplifier data here. if header["notch_filter_frequency"] > 0: # print('Applying notch filter...') print_increment = 10 percent_done = print_increment for i in range(header["num_amplifier_channels"]): data["amplifier_data"][i, :] = notch_filter( data["amplifier_data"][i, :], header["sample_rate"], header["notch_filter_frequency"], 10, ) fraction_done = 100 * (i / header["num_amplifier_channels"]) if fraction_done >= percent_done: # print('{}% done...'.format(percent_done)) percent_done += print_increment else: data = [] # Move variables to result struct. result = data_to_result(header, data, data_present) # print('Done! Elapsed time: {0:0.1f} seconds'.format(time.time() - tic)) return result
[docs]def plural(n): """Utility function to optionally pluralize words based on the value of n.""" if n == 1: return "" else: return "s"
if __name__ == "__main__": a = read_rhd(sys.argv[1]) # print a