Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 78 additions & 9 deletions galvani/BioLogic.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
import time
from datetime import date, datetime, timedelta
from collections import defaultdict, OrderedDict
import warnings

import numpy as np

UNKNOWN_COLUMN_TYPE_HIERARCHY = ("<f8", "<f4", "<u4", "<u2", "<u1")


def fieldname_to_dtype(fieldname):
"""Converts a column header from the MPT file into a tuple of
Expand Down Expand Up @@ -316,6 +319,7 @@ def MPTfileCSV(file_or_path):
174: ("<Ewe>/V", "<f4"),
178: ("(Q-Qo)/C", "<f4"),
179: ("dQ/C", "<f4"),
182: ("step time/s", "<f8"),
211: ("Q charge/discharge/mA.h", "<f8"),
212: ("half cycle", "<u4"),
213: ("z cycle", "<u4"),
Expand Down Expand Up @@ -427,18 +431,23 @@ def parse_BioLogic_date(date_text):
return date(tm.tm_year, tm.tm_mon, tm.tm_mday)


def VMPdata_dtype_from_colIDs(colIDs):
def VMPdata_dtype_from_colIDs(colIDs, error_on_unknown_column: bool = True):
"""Get a numpy record type from a list of column ID numbers.

The binary layout of the data in the MPR file is described by the sequence
of column ID numbers in the file header. This function converts that
sequence into a numpy dtype which can then be used to load data from the
sequence into a list that can be used with numpy dtype load data from the
file with np.frombuffer().

Some column IDs refer to small values which are packed into a single byte.
The second return value is a dict describing the bit masks with which to
extract these columns from the flags byte.

If error_on_unknown_column is True, an error will be raised if an unknown
column ID is encountered. If it is False, a warning will be emited and attempts
will be made to read the column with a few different dtypes.


"""
type_list = []
field_name_counts = defaultdict(int)
Expand Down Expand Up @@ -468,11 +477,19 @@ def VMPdata_dtype_from_colIDs(colIDs):
unique_field_name = field_name
type_list.append((unique_field_name, field_type))
else:
raise NotImplementedError(
"Column ID {cid} after column {prev} "
"is unknown".format(cid=colID, prev=type_list[-1][0])
if error_on_unknown_column:
raise NotImplementedError(
"Column ID {cid} after column {prev} is unknown".format(
cid=colID, prev=type_list[-1][0]
)
)
warnings.warn(
"Unknown column ID %d -- will attempt to read as common dtypes"
% colID
)
return np.dtype(type_list), flags_dict
type_list.append(("unknown_colID_%d" % colID, UNKNOWN_COLUMN_TYPE_HIERARCHY[0]))

return type_list, flags_dict


def read_VMP_modules(fileobj, read_module_data=True):
Expand Down Expand Up @@ -543,7 +560,17 @@ class MPRfile:
enddate - The date when the experiment finished
"""

def __init__(self, file_or_path):
def __init__(self, file_or_path, error_on_unknown_column: bool = True):
"""Pass an EC-lab .mpr file to be parsed.

Parameters:
file_or_path: Either the open file data or a path to it.
error_on_unknown_column: Whether or not to raise an error if an
unknown column ID is encountered. A warning will be emited and
the column will be added 'unknown_<colID>', with an attempt to read
it with a few different dtypes.

"""
self.loop_index = None
if isinstance(file_or_path, str):
mpr_file = open(file_or_path, "rb")
Expand Down Expand Up @@ -595,8 +622,50 @@ def __init__(self, file_or_path):

assert not any(remaining_headers)

self.dtype, self.flags_dict = VMPdata_dtype_from_colIDs(column_types)
self.data = np.frombuffer(main_data, dtype=self.dtype)
dtypes, self.flags_dict = VMPdata_dtype_from_colIDs(
column_types, error_on_unknown_column=error_on_unknown_column
)

unknown_cols = []
# Iteratively work through the unknown columns and try to read them
if not error_on_unknown_column:
for col, _ in dtypes:
if col.startswith("unknown_colID"):
unknown_cols.append(col)
if len(unknown_cols) > 3:
raise RuntimeError(
"Too many unknown columns to attempt to read combinatorially: %s"
% unknown_cols
)

if unknown_cols:
# create a list of all possible combinations of dtypes
# for the unknown columns
from itertools import product
perms = product(UNKNOWN_COLUMN_TYPE_HIERARCHY, repeat=len(unknown_cols))
for perm in perms:
for unknown_col_ind, c in enumerate(unknown_cols):
for ind, (col, _) in enumerate(dtypes):
if c == col:
dtypes[ind] = (col, perm[unknown_col_ind])

try:
self.dtype = np.dtype(dtypes)
self.data = np.frombuffer(main_data, dtype=self.dtype)
break
except ValueError:
continue
else:
raise RuntimeError(
"Unable to read data for unknown columns %s with any of the common dtypes %s",
unknown_cols,
UNKNOWN_COLUMN_TYPE_HIERARCHY
)

else:
self.dtype = np.dtype(dtypes)
self.data = np.frombuffer(main_data, dtype=self.dtype)

assert self.data.shape[0] == n_data_points

# No idea what these 'column types' mean or even if they are actually
Expand Down
2 changes: 1 addition & 1 deletion tests/test_BioLogic.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def test_colID_to_dtype(colIDs, expected):
return
expected_dtype = np.dtype(expected)
dtype, flags_dict = BioLogic.VMPdata_dtype_from_colIDs(colIDs)
assert dtype == expected_dtype
assert np.dtype(dtype) == expected_dtype


@pytest.mark.parametrize(
Expand Down