binda

Read and write binary data to and from Pandas DataFrames.

Created on Wed Sep 20 15:27:01 2023

@author: Jamie Cash

  1# -*- coding: utf-8 -*-
  2"""
  3Read and write binary data to and from Pandas DataFrames.
  4
  5Created on Wed Sep 20 15:27:01 2023
  6
  7@author: Jamie Cash
  8"""
  9
 10from enum import Enum
 11import numpy as np
 12import pandas as pd
 13from typing import List, Dict
 14import struct
 15
 16
 17class ByteOrder(Enum):
 18    """
 19    The order of bytes in a variable. Can be ByteOrder.LITTLE for little endian
 20    or ByteOrder.BIG for big endian.
 21    """
 22    LITTLE = 'little'
 23    BIG = 'big'
 24
 25    
 26class Variable:
 27  """
 28  The metadata for a single variable.
 29
 30  Arguments:
 31    name (str): The name of the variable.
 32    size (int): The size of the variable in bytes.
 33    datatype (type): The datatye of the variable.
 34    offset (int, optional): The byte offset where the variable starts.
 35      If not specified, this will be calculated by Structure.
 36    byteorder (ByteOrder, optional): Whether the variable has a  little or big
 37      endian byte order. Default is ByteOrder.LITTLE.
 38    signed (bool, optional): Whether the variable is signed. Default is false.
 39  """
 40  name: str
 41  size: int
 42  datatype: type
 43  offset: int
 44  byteorder: ByteOrder
 45  signed: bool
 46
 47  def __init__(self, name:str, size:int, datatype:type, offset:int=None,
 48               byteorder:ByteOrder=ByteOrder.LITTLE, signed:bool=False):
 49    self.name = name
 50    self.size = size
 51    self.datatype = datatype
 52    self.offset = offset
 53    self.byteorder = byteorder
 54    self.signed = signed
 55
 56  @property
 57  def next_offset(self):
 58    """
 59    Gets the offset of the next variable.
 60    """
 61    return self.offset + self.size
 62
 63  def __len__(self):
 64    """
 65    The length of the variable.
 66    """
 67    return self.size
 68
 69  def __repr__(self):
 70    return f"name: {self.name}, offset: {self.offset}, size: {self.size}, " \
 71      f"datatype: {self.datatype}, byteorder: {self.byteorder}, " \
 72      f"signed: {self.signed}"
 73      
 74      
 75class Structure:
 76  """
 77  The definition of a data structure.
 78  
 79  Arguments:
 80    start (int): The start position of the data structure.
 81    variables (List of Variable): The definitions of all variables in the
 82      structure.
 83    rows (int, optional): If the data structure is repeating, specifies the
 84      number of times that it repeats. Default is 1 for a non repeating
 85      structure.
 86  """
 87  start: int
 88  variables:  List[Variable]
 89  rows: int
 90
 91  def __init__(self, start:int, variables:List[Variable], rows:int=1):
 92    self.start = start
 93    self.variables = variables
 94    self.rows = rows
 95
 96    # Calculate any missing variable offsets.
 97    last = None
 98    for variable in self.variables:
 99      if variable.offset is None:
100        if last is None:
101          variable.offset = self.start
102        else:
103          variable.offset = last.next_offset
104
105      last = variable
106
107  def __len__(self):
108    """
109    Length of the structure. This is the sum of the size of all variables
110    multipled by the number of rows.
111    """
112    return sum([len(i) for i in self.variables]) * self.rows
113
114  def __repr__(self):
115    return f"start: {self.start}, rows: {self.rows}, variables: " + \
116        f"{self.variables}"
117
118
119
class DataHandler:
  """
  Reads and writes binary data to and from pandas DataFrames using the
  specification provided in [structures].

  All validation failures are reported as AssertionError. The errors are
  raised explicitly rather than with ``assert`` statements so that the checks
  are still performed when Python runs with optimisation (``-O``).

  Arguments:
      data (bytes): The data.
      structures (dict of str, Structure, optional): The specification of any
        data structures. The dict key is the name of the structure that will
        be used when reading and updating. Additional structures can be added
        later using [add_structure]. If no structures are specified, then
        read_structure and write_structure cannot be used, however DataHandler
        can still be used to read and write variables.
      str_encode (str, optional): The string encoding. See
        https://docs.python.org/3/library/codecs.html#standard-encodings for
        a list of all encodings. Default is utf-8.
  """

  __data: bytes
  __structures: Dict[str, Structure] = None
  __str_encode: str

  # struct format characters for floats, keyed by the variable size in bytes.
  # Any other size falls back to 'f', the format that was always used before.
  __FLOAT_FORMATS = {2: 'e', 4: 'f', 8: 'd'}

  def __init__(self, data: bytes, structures: Dict[str, Structure]=None,
               str_encode='utf-8'):
    self.__data = data
    self.__str_encode = str_encode

    # Add the structures one by one so that each is bounds checked.
    if structures is not None:
      for structure_name, structure in structures.items():
        self.add_structure(structure_name, structure)

  @property
  def data(self):
    """
    The current contents of the binary data, including any writes made.
    """
    return self.__data

  def add_structure(self, name: str, structure: Structure):
    """
    Adds a structure.

    Arguments:
      name (str): The name of the structure to add. If a structure already
        exists with that name, it will be overwritten.
      structure (Structure): The structure to add.

    Raises:
      AssertionError: If the structure does not fit within the data.
    """
    # Check that the structure is within the bounds of the data.
    self.__check_bounds(structure.start, len(structure))

    # Create the registry on first use, then add the structure.
    if self.__structures is None:
      self.__structures = {}
    self.__structures[name] = structure

  def read_hex(self, start: int=0, length: int=None, seperator: str=':') -> str:
    """
    Returns data as an easily readable string of hexadecimal characters.

    Arguments:
      start (int, optional): The starting position to read from. Default is
        the first byte of the data.
      length (int, optional): The number of bytes to return. Default is the
        number of bytes available to read in the data given the specified
        [start].
      seperator (str, optional): The separator character for hex bytes.
        Default is ':'.

    Returns:
      (str): The bytes as a string of hexadecimal byte representations
        separated by [seperator].

    Raises:
      AssertionError: If [start] and [length] are outside of the data.
    """
    # Default to everything from [start] to the end of the data.
    if length is None:
      length = len(self.data) - start

    self.__check_bounds(start, length)

    return self.data[start:start+length].hex(seperator)

  def read_structure(self, name: str) -> pd.DataFrame:
    """
    Reads this instance's [data] into a dataframe using the specification
    provided by [structures].

    Arguments:
      name (str): The name of the structure to read.

    Returns:
      (DataFrame): The dataframe containing the converted data from the
      structure. It has one column per variable and one row per structure
      row.

    Raises:
      AssertionError: If the structure does not exist or a variable lies
        outside of the data.
    """
    structure = self.__get_structure(name, "reading")

    # Build the column names up front so that they exist even when the
    # structure has no rows.
    columns = [var.name for var in structure.variables]
    row_size = self.__row_size(structure)

    data = []
    for row in range(structure.rows):
      # Every variable in a row is shifted by the same whole number of rows.
      row_offset = row * row_size
      data.append([self.read_variable(var, row_offset)
                   for var in structure.variables])

    return pd.DataFrame(data=data, columns=columns)

  def read_variable(self, variable:Variable, offset:int=None) -> object:
    """
    Reads a variable from [data] and converts it to the data type specified in
    the [variable].

    int and bool are decoded with the variable's byte order and signedness,
    str is decoded with the handler's string encoding, and float is unpacked
    with struct. Any other datatype is returned as the raw bytes.

    Arguments:
      variable (Variable): The definition of the variable to read.
      offset (int, optional): Offset to apply. Added to variable offset. Used
        to read repeating structures. If not specified, then no offset is
        applied.

    Returns:
      object: The data converted to the data type specified in [variable].

    Raises:
      AssertionError: If the variable lies outside of the data.
    """
    full_offset = variable.offset if offset is None \
        else variable.offset + offset

    # __read performs the bounds check.
    bytes_data = self.__read(full_offset, variable.size)

    if variable.datatype == int:
      return int.from_bytes(bytes_data,
                            byteorder=variable.byteorder.value,
                            signed=variable.signed)
    if variable.datatype == str:
      return bytes_data.decode(self.__str_encode)
    if variable.datatype == bool:
      return bool.from_bytes(bytes_data,
                             byteorder=variable.byteorder.value,
                             signed=variable.signed)
    if variable.datatype == float:
      return struct.unpack(self.__float_format(variable), bytes_data)[0]

    # For any other datatype, keep as bytes.
    return bytes_data

  def write_structure(self, name:str, df:pd.DataFrame):
    """
    Writes a dataframe back to this instance's [data] using the specification
    provided in [structures].

    Arguments:
      name (str): The name of the structure to write.
      df (pd.DataFrame): The dataframe to write. Its columns must match the
        structure's variables in name and order, and its number of rows must
        match the structure's rows.

    Raises:
      AssertionError: If the structure does not exist, the dataframe does not
        match the structure, or a value cannot be written.
    """
    structure = self.__get_structure(name, "writing to it")

    # The dataframe must have exactly one column per variable...
    if len(df.columns) != len(structure.variables):
      raise AssertionError(
        f"Number of columns in DataFrame ({len(df.columns)}) does not match "
        f"number of variables in structure ({len(structure.variables)}).")

    # ...with the same names, in the same order.
    for column, var in zip(df.columns, structure.variables):
      if column != var.name:
        raise AssertionError(
          f"Structure name '{var.name}' does not match "
          f"DataFrame column name '{column}'.")

    # Check that number of rows matches [structure.rows].
    if df.shape[0] != structure.rows:
      raise AssertionError(
        f"Number of rows in DataFrame ({df.shape[0]}) does not match number "
        f"of rows in structure ({structure.rows}).")

    row_size = self.__row_size(structure)
    for row in range(structure.rows):
      row_offset = row * row_size

      for col, var in enumerate(structure.variables):
        # Read each cell by position. Selecting the whole row first would
        # upcast mixed int/float rows to float and break the type check in
        # write_variable.
        self.write_variable(df.iat[row, col], var, row_offset)

  def write_variable(self, data:object, variable:Variable, offset:int=None):
    """
    Converts [data] back to bytes and writes it to the data.

    A str value whose encoded form is shorter than the variable only
    overwrites the leading bytes; the remaining bytes are left unchanged.

    Arguments:
      data (object): The data to write.
      variable (Variable): The definition of the variable to write.
      offset (int, optional): Offset to apply. Added to variable offset. Used
        to write repeating structures. If not specified, then no offset is
        applied.

    Raises:
      AssertionError: If the type of [data] does not match the variable's
        datatype, the variable lies outside of the data, or the encoded value
        is larger than the variable.
    """
    # Convert numpy scalars (as found in DataFrames) to native types.
    if isinstance(data, np.integer):
      data = int(data)
    elif type(data) in [bool, np.bool_]:
      data = bool(data)
    elif isinstance(data, str):
      data = str(data)
    elif isinstance(data, (float, np.floating)):
      data = float(data)

    # Confirm that the provided data type matches the type specified in the
    # [variable].
    if type(data) != variable.datatype:
      raise AssertionError(
        f"Datatype {type(data)} is not a subtype of {variable.datatype}")

    # The full offset is any offset passed to this method plus the variable
    # offset.
    full_offset = variable.offset if offset is None \
        else variable.offset + offset

    self.__check_bounds(full_offset, variable.size)

    # Convert the value to bytes.
    if variable.datatype == str:
      encoded = data.encode(self.__str_encode)
    elif variable.datatype in [int, bool]:
      encoded = data.to_bytes(variable.size,
                              byteorder=variable.byteorder.value,
                              signed=variable.signed)
    elif variable.datatype == float:
      encoded = struct.pack(self.__float_format(variable), data)
    else:
      encoded = data

    # Never write past the end of the variable into whatever follows it.
    if len(encoded) > variable.size:
      raise AssertionError(
        f"Encoded value of {len(encoded)} bytes does not fit in variable "
        f"'{variable.name}' of size {variable.size}.")

    self.__write(full_offset, encoded)

  def __get_structure(self, name:str, action:str) -> Structure:
    """
    Looks up a structure by name.

    Arguments:
      name (str): The name of the structure.
      action (str): What the caller is about to do; used in the error text.

    Returns:
      Structure: The structure registered under [name].

    Raises:
      AssertionError: If no structures are defined or [name] is unknown.
    """
    if self.__structures is None:
      raise AssertionError("There are no structures defined.")
    if name not in self.__structures:
      raise AssertionError(
        f"Structure with name '{name}' does not exist. Add it using "
        f"[add_structure] before {action}.")
    return self.__structures[name]

  @staticmethod
  def __row_size(structure:Structure) -> int:
    """
    The size in bytes of a single row: the combined size of its variables.
    """
    return sum(len(var) for var in structure.variables)

  def __float_format(self, variable:Variable) -> str:
    """
    Builds the struct format string for a float variable from its byte order
    and size (2, 4 or 8 bytes; any other size uses the 4 byte format).
    """
    endian_sign = '<' if variable.byteorder == ByteOrder.LITTLE else '>'
    return endian_sign + self.__FLOAT_FORMATS.get(variable.size, 'f')

  def __check_bounds(self, offset:int, length:int):
    """
    Checks that the offset and length are within the bounds of the data.

    Arguments:
      offset (int): The start position to test.
      length (int): The number of bytes to test.

    Raises:
      AssertionError: If [offset] is negative or not inside the data, or if
        [length] is negative or runs past the end of the data.
    """
    size = len(self.data)

    # A negative offset would silently index from the end of the data.
    if not 0 <= offset < size:
      raise AssertionError(
        f"Offset {offset} is out of bounds for data of length {size}.")

    if length < 0 or offset + length > size:
      raise AssertionError(
        f"Length {length} is out of bounds for data length {size} "
        f"starting at {offset}.")

  def __read(self, offset:int, length:int) -> bytes:
    """
    Gets the bytes from [data] at offset.

    Arguments:
      offset (int): The position of the first byte to read.
      length (int): The number of bytes to return.

    Returns:
      bytes: The [length] bytes starting at [offset].

    Raises:
      AssertionError: If the requested range is outside of the data.
    """
    self.__check_bounds(offset, length)
    return self.__data[offset:offset+length]

  def __write(self, offset:int, value:bytes):
    """
    Writes [value] to [data] at position [offset].

    The data is rebuilt from the bytes before [offset], [value], and the bytes
    after the written range, so its total length does not change.

    Arguments:
      offset (int): The position of the data to update. The byte at offset
        position and any following bytes up to the length of [value] will be
        updated.
      value (bytes): The value to write.

    Raises:
      AssertionError: If the written range is outside of the data.
    """
    self.__check_bounds(offset, len(value))
    self.__data = self.__data[:offset] + value \
        + self.__data[offset+len(value):]
class ByteOrder(enum.Enum):
18class ByteOrder(Enum):
19    """
20    The order of bytes in a variable. Can be ByteOrder.LITTLE for little endian
21    or ByteOrder.BIG for big endian.
22    """
23    LITTLE = 'little'
24    BIG = 'big'

The order of bytes in a variable. Can be ByteOrder.LITTLE for little endian or ByteOrder.BIG for big endian.

LITTLE = <ByteOrder.LITTLE: 'little'>
BIG = <ByteOrder.BIG: 'big'>
Inherited Members
enum.Enum
name
value
class Variable:
27class Variable:
28  """
29  The metadata for a single variable.
30
31  Arguments:
32    name (str): The name of the variable.
33    size (int): The size of the variable in bytes.
34    datatype (type): The datatye of the variable.
35    offset (int, optional): The byte offset where the variable starts.
36      If not specified, this will be calculated by Structure.
37    byteorder (ByteOrder, optional): Whether the variable has a  little or big
38      endian byte order. Default is ByteOrder.LITTLE.
39    signed (bool, optional): Whether the variable is signed. Default is false.
40  """
41  name: str
42  size: int
43  datatype: type
44  offset: int
45  byteorder: ByteOrder
46  signed: bool
47
48  def __init__(self, name:str, size:int, datatype:type, offset:int=None,
49               byteorder:ByteOrder=ByteOrder.LITTLE, signed:bool=False):
50    self.name = name
51    self.size = size
52    self.datatype = datatype
53    self.offset = offset
54    self.byteorder = byteorder
55    self.signed = signed
56
57  @property
58  def next_offset(self):
59    """
60    Gets the offset of the next variable.
61    """
62    return self.offset + self.size
63
64  def __len__(self):
65    """
66    The length of the variable.
67    """
68    return self.size
69
70  def __repr__(self):
71    return f"name: {self.name}, offset: {self.offset}, size: {self.size}, " \
72      f"datatype: {self.datatype}, byteorder: {self.byteorder}, " \
73      f"signed: {self.signed}"

The metadata for a single variable.

Arguments:
  • name (str): The name of the variable.
  • size (int): The size of the variable in bytes.
  • datatype (type): The datatype of the variable.
  • offset (int, optional): The byte offset where the variable starts. If not specified, this will be calculated by Structure.
  • byteorder (ByteOrder, optional): Whether the variable has a little or big endian byte order. Default is ByteOrder.LITTLE.
  • signed (bool, optional): Whether the variable is signed. Default is false.
Variable( name: str, size: int, datatype: type, offset: int = None, byteorder: ByteOrder = <ByteOrder.LITTLE: 'little'>, signed: bool = False)
48  def __init__(self, name:str, size:int, datatype:type, offset:int=None,
49               byteorder:ByteOrder=ByteOrder.LITTLE, signed:bool=False):
50    self.name = name
51    self.size = size
52    self.datatype = datatype
53    self.offset = offset
54    self.byteorder = byteorder
55    self.signed = signed
name: str
size: int
datatype: type
offset: int
byteorder: ByteOrder
signed: bool
next_offset

Gets the offset of the next variable.

class Structure:
 76class Structure:
 77  """
 78  The definition of a data structure.
 79  
 80  Arguments:
 81    start (int): The start position of the data structure.
 82    variables (List of Variable): The definitions of all variables in the
 83      structure.
 84    rows (int, optional): If the data structure is repeating, specifies the
 85      number of times that it repeats. Default is 1 for a non repeating
 86      structure.
 87  """
 88  start: int
 89  variables:  List[Variable]
 90  rows: int
 91
 92  def __init__(self, start:int, variables:List[Variable], rows:int=1):
 93    self.start = start
 94    self.variables = variables
 95    self.rows = rows
 96
 97    # Calculate any missing variable offsets.
 98    last = None
 99    for variable in self.variables:
100      if variable.offset is None:
101        if last is None:
102          variable.offset = self.start
103        else:
104          variable.offset = last.next_offset
105
106      last = variable
107
108  def __len__(self):
109    """
110    Length of the structure. This is the sum of the size of all variables
111    multipled by the number of rows.
112    """
113    return sum([len(i) for i in self.variables]) * self.rows
114
115  def __repr__(self):
116    return f"start: {self.start}, rows: {self.rows}, variables: " + \
117        f"{self.variables}"

The definition of a data structure.

Arguments:
  • start (int): The start position of the data structure.
  • variables (List of Variable): The definitions of all variables in the structure.
  • rows (int, optional): If the data structure is repeating, specifies the number of times that it repeats. Default is 1 for a non repeating structure.
Structure(start: int, variables: List[Variable], rows: int = 1)
 92  def __init__(self, start:int, variables:List[Variable], rows:int=1):
 93    self.start = start
 94    self.variables = variables
 95    self.rows = rows
 96
 97    # Calculate any missing variable offsets.
 98    last = None
 99    for variable in self.variables:
100      if variable.offset is None:
101        if last is None:
102          variable.offset = self.start
103        else:
104          variable.offset = last.next_offset
105
106      last = variable
start: int
variables: List[Variable]
rows: int
class DataHandler:
121class DataHandler:
122  """
123  Reads and writes binary data to and from pandas DataFrames using the 
124  specification provided in [structures].
125
126  Arguments:
127      data (bytes): The data.
128      structures (dict of str, Structure, optional): The specification of any 
129        data structures. The dict key is the name of the structure that will 
130        be used when reading and updating. Additional structures can be addes
131        later using [add_structure]. If no structures are specified, then 
132        read_structure and write_structure cannot be used, however DataHandler
133        can still be used to read and write variables.
134      str_encode (str, optional): The string encoding. See 
135        https://docs.python.org/3/library/codecs.html#standard-encodings for 
136        a list of all encodings. Default is utf-8.
137  """
138
139  __data: bytes
140  __structures: Dict[str, Structure] = None
141  __str_encode: str
142
143  def __init__(self, data: bytes, structures: Dict[str, Structure]=None, 
144               str_encode='utf-8'):
145    self.__data = data
146    self.__str_encode = str_encode
147
148    # Add the structures individually to benifit for boundary checks.
149    if structures is not None:
150      for structure_name in structures.keys():
151        self.add_structure(structure_name, structures[structure_name])
152
153
154  @property
155  def data(self):
156    return self.__data
157  
158  def add_structure(self, name: str, structure: Structure):
159    """
160    Adds a structure.
161
162    Arguments:
163      name (str): The name of the structure to add. If structure already exists 
164        with that name, it will be overwritten.
165      structure (Structure): The structure to add.
166    """
167    # Check that the structure is within the bounds of the data
168    self.__check_bounds(structure.start, len(structure))
169
170    # Create the structure if not already created and add the structure.
171    if self.__structures is None:
172      self.__structures = {name: structure}
173    else:
174      self.__structures[name] = structure
175
176  def read_hex(self, start: int=0, length: int=None, seperator: str=':') -> str:
177    """
178    Returns data as a easily readable string of hexadecimal 
179    characters.
180
181    Arguments:
182      start (int, optional): The starting postion to read from. Default is the
183        first byte of the data.
184      length (int, optional): The number of bytes to return. Default is the 
185        number of bytes available to read in the data given the specified 
186        [start].
187      seperator (str, optional): The seperator character for hex bytes. Default 
188        is ':'.
189
190    Returns:
191      (str): The bytes as a string of hexadecimal byte representations seperated 
192        by [seperator].
193    """
194    # Calculate and set length if not specified.
195    if length is None:
196      length = len(self.data) - start
197
198    # Check that bounds
199    self.__check_bounds(start, length)
200    
201    # Return the data as a hex string
202    return self.data[start:start+length].hex(seperator)
203
204  def read_structure(self, name: str) -> pd.DataFrame:
205    """
206    Reads this instances [data] into a dataframe using the specification 
207    provided by [structures].
208
209    Arguments:
210      name (str): The name of the structure to read.
211
212    Returns:
213      (DataFrame): The dataframe containing the converted data from the
214      structure.
215    """
216    # Assert that structure exists.
217    assert self.__structures is not None, "There are no structures defined."
218    assert name in self.__structures, f"Structure with name '{name}' does not "\
219      + "exist. Add it using [add_structure] before reading."
220
221    # Get the structure
222    structure = self.__structures[name]
223
224    # Iterate rows and variables, building up the data
225    data = []
226    for row in range(structure.rows):
227      row_data = []
228      for var in structure.variables:
229        # Get data for the variable and add to row
230
231        # Get the offset for the row. This is the size of the row * the row 
232        # num.
233        offset = int(row * len(structure) / structure.rows)
234
235        # Get the data for the row
236        row_data.append(self.read_variable(var, offset))
237      data.append(row_data)
238
239      # Get the column names
240      columns = []
241      for var in structure.variables:
242        columns.append(var.name)
243
244    # Create the dataframe and return it
245    df = pd.DataFrame(data=data, columns=columns)
246    return df
247    
248  def read_variable(self, variable:Variable, offset:int=None) -> object:
249    """
250    Reads a variable from [data] and converts it to the data type specified in
251    the [variable].
252
253    Arguments:
254      variable (Variable): The name of the variable to read.
255      offset (int, optional): Offset to apply. Added to variable offset. Used
256        to read repeating structures. If not specified, then no offset is 
257        applied.
258
259    Returns:
260      object: The data converted to the data type specified in [variable].
261    """
262
263    # Get the data as bytes
264    full_offset = variable.offset if offset is None else variable.offset + \
265        offset
266    self.__check_bounds(full_offset, variable.size)
267    bytes_data = self.__read(full_offset, variable.size)
268
269    # Convert it to the correct data type
270    converted_data = None
271    #print(f"variable.byteorder as str: {str(variable.byteorder)}")
272    if variable.datatype == int:
273      converted_data = int.from_bytes(bytes_data,
274                                      byteorder=variable.byteorder.value,
275                                      signed=variable.signed)
276    elif variable.datatype == str:
277      converted_data = (str( bytes_data.decode(self.__str_encode)))
278    elif variable.datatype == bool:
279      converted_data = bool.from_bytes(bytes_data,
280                                       byteorder=variable.byteorder.value,
281                                       signed=variable.signed)
282    elif variable.datatype == float:
283      endian_sign = '<' if variable.byteorder == ByteOrder.LITTLE else '>'
284      converted_data = struct.unpack(f'{endian_sign}f', bytes_data)[0]
285    else:
286       # For any other datatype, keep as bytes.
287       converted_data = bytes_data
288
289    # Return the converted data
290    return converted_data
291
292  def write_structure(self, name:str, df:pd.DataFrame):
293    """
294    Writes a dataframe back to this instances [data] using the specification 
295    provided in [structures].
296
297    Arguments:
298      name (str): The name of the structure to write.
299      df (pd.DataFrame): The dataframe to write.
300    """
301     # Assert that structure exists.
302    assert self.__structures is not None, "There are no structures defined."
303    assert name in self.__structures, f"Structure with name '{name}' does not "\
304      + "exist. Add it using [add_structure] before writing to it."
305
306    # Get the structure
307    structure = self.__structures[name]
308
309    # Check that the column names of the dataframe matches the variable names.
310    for i in range(len(df.columns)):
311      assert df.columns[i] == structure.variables[i].name, \
312        f"Structure name '{structure.variables[i].name}' does not match " \
313        + f"DataFrame column name 'df.columns[i]'."
314
315    # Check that number of rows matches [structure.rows]
316    assert df.shape[0] == structure.rows, \
317      f"Number of rows in DataFrame ({df.shape[0]}) does not match number of "\
318        + f"rows in structure ({structure.rows})."
319
320    # Iterate the rows of the dataframe saving its contents to [data]
321    for row in range(structure.rows):
322      row_data = df.iloc[row]
323
324      for var in structure.variables:
325        # Calculate the offset as the offset of the row + the offset of the
326        # variable.
327        offset = int(row * len(structure) / structure.rows)
328
329        # Get the data
330        item = row_data[var.name]
331
332        # Write the variable
333        self.write_variable(item, var, offset)
334        
335  def write_variable(self, data:object, variable:Variable, offset:int=None):
336    """
337    Writes a variable to the data, converts it back to bytes.
338
339    Arguments:
340      data (object): The data to write.
341      variable (Variable): The definition of the variable to write.
342      offset (int, optional): Offset to apply. Added to variable offset. Used
343        to write repeating structures. If not specified, then no offset is 
344        applied.
345    """
346    # Convert to native types.
347    if isinstance(data, np.integer):
348      data = int(data)
349    elif type(data) in [bool, np.bool_]:
350      data = bool(data)
351    elif isinstance(data, str):
352      data = str(data)
353    elif isinstance(data, float):
354      data = float(data)
355      
356    # Confirm that the provided data type matches the type specified in the
357    # [variable].
358    assert type(data) == variable.datatype, \
359      f"Datatype {type(data)} is not a subtype of {variable.datatype}"
360
361    # Calculate the full offset. Any offset passed to this method + the 
362    # variable offset.
363    full_offset = variable.offset if offset is None else variable.offset \
364        + offset
365    
366    # Check the bounds
367    self.__check_bounds(full_offset, variable.size)
368
369     # Write the data.
370    if variable.datatype == str:
371      self.__write(full_offset, data.encode(self.__str_encode))
372    elif variable.datatype in [int, bool]:
373      self.__write(full_offset,
374                   data.to_bytes(variable.size,
375                                 byteorder=variable.byteorder.value,
376                                 signed=variable.signed))
377    elif variable.datatype == float:
378      endian_sign = '<' if variable.byteorder == ByteOrder.LITTLE else '>'
379      self.__write(full_offset, struct.pack(f'{endian_sign}f', data))
380    else:
381      self.__write(full_offset, data)
382
383  def __check_bounds(self, offset:int, length:int):
384    """
385    Checks that the offset and length is within the bounds of the data.
386
387     Arguments:
388      offset (int): The start position to test.
389      length: The number of bytes to test.
390    """
391    assert offset < len(self.data), f"Offset {offset} is out of bounds for "\
392      + f"data of length {len(self.data)}."
393    
394    assert offset + length <= len(self.data), f"Length {length} is out of "\
395      + f"bounds for data length {len(self.data)} starting at {offset}."
396    
397
398  def __read(self, offset:int, length:int) -> bytes:
399    """
400    Gets the bytes from [data] at offset.
401
402    Arguments:
403      offset (int): The position of the data to read. The byte at offset
404        position and [length] following bytes will be returned.
405      length: The number of bytes to return.
406
407    Returns:
408      bytes: The [length] bytes starting at [offset]
409    """
410    self.__check_bounds(offset, length)
411    return self.__data[offset:offset+length]
412
413
414  def __write(self, offset:int, value:bytes):
415    """
416    Writes [value] to [data] at position [offset].
417
418    Arguments:
419      offset (int): The position of the data to update. The byte at offset
420        position and any following bytes up to the length of [value] will be
421        updated.
422      value: The value to write.
423    """
424    self.__check_bounds(offset, len(value))
425    self.__data = self.__data[:offset] + value \
426        + self.__data[offset+len(value):]

Reads and writes binary data to and from pandas DataFrames using the specification provided in [structures].

Arguments:
  • data (bytes): The data.
  • structures (dict of str, Structure, optional): The specification of any data structures. The dict key is the name of the structure that will be used when reading and updating. Additional structures can be added later using [add_structure]. If no structures are specified, then read_structure and write_structure cannot be used, however DataHandler can still be used to read and write variables.
  • str_encode (str, optional): The string encoding. See https://docs.python.org/3/library/codecs.html#standard-encodings for a list of all encodings. Default is utf-8.
DataHandler( data: bytes, structures: Dict[str, Structure] = None, str_encode='utf-8')
143  def __init__(self, data: bytes, structures: Dict[str, Structure]=None, 
144               str_encode='utf-8'):
145    self.__data = data
146    self.__str_encode = str_encode
147
148    # Add the structures individually to benifit for boundary checks.
149    if structures is not None:
150      for structure_name in structures.keys():
151        self.add_structure(structure_name, structures[structure_name])
data
def add_structure(self, name: str, structure: Structure):
158  def add_structure(self, name: str, structure: Structure):
159    """
160    Adds a structure.
161
162    Arguments:
163      name (str): The name of the structure to add. If structure already exists 
164        with that name, it will be overwritten.
165      structure (Structure): The structure to add.
166    """
167    # Check that the structure is within the bounds of the data
168    self.__check_bounds(structure.start, len(structure))
169
170    # Create the structure if not already created and add the structure.
171    if self.__structures is None:
172      self.__structures = {name: structure}
173    else:
174      self.__structures[name] = structure

Adds a structure.

Arguments:
  • name (str): The name of the structure to add. If structure already exists with that name, it will be overwritten.
  • structure (Structure): The structure to add.
def read_hex(self, start: int = 0, length: int = None, seperator: str = ':') -> str:
176  def read_hex(self, start: int=0, length: int=None, seperator: str=':') -> str:
177    """
178    Returns data as a easily readable string of hexadecimal 
179    characters.
180
181    Arguments:
182      start (int, optional): The starting postion to read from. Default is the
183        first byte of the data.
184      length (int, optional): The number of bytes to return. Default is the 
185        number of bytes available to read in the data given the specified 
186        [start].
187      seperator (str, optional): The seperator character for hex bytes. Default 
188        is ':'.
189
190    Returns:
191      (str): The bytes as a string of hexadecimal byte representations seperated 
192        by [seperator].
193    """
194    # Calculate and set length if not specified.
195    if length is None:
196      length = len(self.data) - start
197
198    # Check that bounds
199    self.__check_bounds(start, length)
200    
201    # Return the data as a hex string
202    return self.data[start:start+length].hex(seperator)

Returns data as an easily readable string of hexadecimal characters.

Arguments:
  • start (int, optional): The starting position to read from. Default is the first byte of the data.
  • length (int, optional): The number of bytes to return. Default is the number of bytes available to read in the data given the specified [start].
  • seperator (str, optional): The separator character for hex bytes. Default is ':'.
Returns:

(str): The bytes as a string of hexadecimal byte representations separated by [seperator].

def read_structure(self, name: str) -> pandas.core.frame.DataFrame:
204  def read_structure(self, name: str) -> pd.DataFrame:
205    """
206    Reads this instances [data] into a dataframe using the specification 
207    provided by [structures].
208
209    Arguments:
210      name (str): The name of the structure to read.
211
212    Returns:
213      (DataFrame): The dataframe containing the converted data from the
214      structure.
215    """
216    # Assert that structure exists.
217    assert self.__structures is not None, "There are no structures defined."
218    assert name in self.__structures, f"Structure with name '{name}' does not "\
219      + "exist. Add it using [add_structure] before reading."
220
221    # Get the structure
222    structure = self.__structures[name]
223
224    # Iterate rows and variables, building up the data
225    data = []
226    for row in range(structure.rows):
227      row_data = []
228      for var in structure.variables:
229        # Get data for the variable and add to row
230
231        # Get the offset for the row. This is the size of the row * the row 
232        # num.
233        offset = int(row * len(structure) / structure.rows)
234
235        # Get the data for the row
236        row_data.append(self.read_variable(var, offset))
237      data.append(row_data)
238
239      # Get the column names
240      columns = []
241      for var in structure.variables:
242        columns.append(var.name)
243
244    # Create the dataframe and return it
245    df = pd.DataFrame(data=data, columns=columns)
246    return df

Reads this instance's [data] into a dataframe using the specification provided by [structures].

Arguments:
  • name (str): The name of the structure to read.
Returns:

(DataFrame): The dataframe containing the converted data from the structure.

def read_variable(self, variable: Variable, offset: int = None) -> object:
248  def read_variable(self, variable:Variable, offset:int=None) -> object:
249    """
250    Reads a variable from [data] and converts it to the data type specified in
251    the [variable].
252
253    Arguments:
254      variable (Variable): The name of the variable to read.
255      offset (int, optional): Offset to apply. Added to variable offset. Used
256        to read repeating structures. If not specified, then no offset is 
257        applied.
258
259    Returns:
260      object: The data converted to the data type specified in [variable].
261    """
262
263    # Get the data as bytes
264    full_offset = variable.offset if offset is None else variable.offset + \
265        offset
266    self.__check_bounds(full_offset, variable.size)
267    bytes_data = self.__read(full_offset, variable.size)
268
269    # Convert it to the correct data type
270    converted_data = None
271    #print(f"variable.byteorder as str: {str(variable.byteorder)}")
272    if variable.datatype == int:
273      converted_data = int.from_bytes(bytes_data,
274                                      byteorder=variable.byteorder.value,
275                                      signed=variable.signed)
276    elif variable.datatype == str:
277      converted_data = (str( bytes_data.decode(self.__str_encode)))
278    elif variable.datatype == bool:
279      converted_data = bool.from_bytes(bytes_data,
280                                       byteorder=variable.byteorder.value,
281                                       signed=variable.signed)
282    elif variable.datatype == float:
283      endian_sign = '<' if variable.byteorder == ByteOrder.LITTLE else '>'
284      converted_data = struct.unpack(f'{endian_sign}f', bytes_data)[0]
285    else:
286       # For any other datatype, keep as bytes.
287       converted_data = bytes_data
288
289    # Return the converted data
290    return converted_data

Reads a variable from [data] and converts it to the data type specified in the [variable].

Arguments:
  • variable (Variable): The definition of the variable to read.
  • offset (int, optional): Offset to apply. Added to variable offset. Used to read repeating structures. If not specified, then no offset is applied.
Returns:

object: The data converted to the data type specified in [variable].

def write_structure(self, name: str, df: pandas.core.frame.DataFrame):
292  def write_structure(self, name:str, df:pd.DataFrame):
293    """
294    Writes a dataframe back to this instances [data] using the specification 
295    provided in [structures].
296
297    Arguments:
298      name (str): The name of the structure to write.
299      df (pd.DataFrame): The dataframe to write.
300    """
301     # Assert that structure exists.
302    assert self.__structures is not None, "There are no structures defined."
303    assert name in self.__structures, f"Structure with name '{name}' does not "\
304      + "exist. Add it using [add_structure] before writing to it."
305
306    # Get the structure
307    structure = self.__structures[name]
308
309    # Check that the column names of the dataframe matches the variable names.
310    for i in range(len(df.columns)):
311      assert df.columns[i] == structure.variables[i].name, \
312        f"Structure name '{structure.variables[i].name}' does not match " \
313        + f"DataFrame column name 'df.columns[i]'."
314
315    # Check that number of rows matches [structure.rows]
316    assert df.shape[0] == structure.rows, \
317      f"Number of rows in DataFrame ({df.shape[0]}) does not match number of "\
318        + f"rows in structure ({structure.rows})."
319
320    # Iterate the rows of the dataframe saving its contents to [data]
321    for row in range(structure.rows):
322      row_data = df.iloc[row]
323
324      for var in structure.variables:
325        # Calculate the offset as the offset of the row + the offset of the
326        # variable.
327        offset = int(row * len(structure) / structure.rows)
328
329        # Get the data
330        item = row_data[var.name]
331
332        # Write the variable
333        self.write_variable(item, var, offset)

Writes a dataframe back to this instance's [data] using the specification provided in [structures].

Arguments:
  • name (str): The name of the structure to write.
  • df (pd.DataFrame): The dataframe to write.
def write_variable(self, data: object, variable: Variable, offset: int = None):
335  def write_variable(self, data:object, variable:Variable, offset:int=None):
336    """
337    Writes a variable to the data, converts it back to bytes.
338
339    Arguments:
340      data (object): The data to write.
341      variable (Variable): The definition of the variable to write.
342      offset (int, optional): Offset to apply. Added to variable offset. Used
343        to write repeating structures. If not specified, then no offset is 
344        applied.
345    """
346    # Convert to native types.
347    if isinstance(data, np.integer):
348      data = int(data)
349    elif type(data) in [bool, np.bool_]:
350      data = bool(data)
351    elif isinstance(data, str):
352      data = str(data)
353    elif isinstance(data, float):
354      data = float(data)
355      
356    # Confirm that the provided data type matches the type specified in the
357    # [variable].
358    assert type(data) == variable.datatype, \
359      f"Datatype {type(data)} is not a subtype of {variable.datatype}"
360
361    # Calculate the full offset. Any offset passed to this method + the 
362    # variable offset.
363    full_offset = variable.offset if offset is None else variable.offset \
364        + offset
365    
366    # Check the bounds
367    self.__check_bounds(full_offset, variable.size)
368
369     # Write the data.
370    if variable.datatype == str:
371      self.__write(full_offset, data.encode(self.__str_encode))
372    elif variable.datatype in [int, bool]:
373      self.__write(full_offset,
374                   data.to_bytes(variable.size,
375                                 byteorder=variable.byteorder.value,
376                                 signed=variable.signed))
377    elif variable.datatype == float:
378      endian_sign = '<' if variable.byteorder == ByteOrder.LITTLE else '>'
379      self.__write(full_offset, struct.pack(f'{endian_sign}f', data))
380    else:
381      self.__write(full_offset, data)

Writes a variable to the data, converting it back to bytes.

Arguments:
  • data (object): The data to write.
  • variable (Variable): The definition of the variable to write.
  • offset (int, optional): Offset to apply. Added to variable offset. Used to write repeating structures. If not specified, then no offset is applied.