binda
Read and write binary data to and from Pandas DataFrames.
Created on Wed Sep 20 15:27:01 2023
@author: Jamie Cash
# -*- coding: utf-8 -*-
"""
Read and write binary data to and from Pandas DataFrames.

Created on Wed Sep 20 15:27:01 2023

@author: Jamie Cash
"""

from enum import Enum
import numpy as np
import pandas as pd
from typing import List, Dict
import struct


class ByteOrder(Enum):
    """
    The order of bytes in a variable. Can be ByteOrder.LITTLE for little endian
    or ByteOrder.BIG for big endian.
    """
    LITTLE = 'little'
    BIG = 'big'


class Variable:
    """
    The metadata for a single variable.

    Arguments:
      name (str): The name of the variable.
      size (int): The size of the variable in bytes.
      datatype (type): The data type of the variable.
      offset (int, optional): The byte offset where the variable starts.
        If not specified, this will be calculated by Structure.
      byteorder (ByteOrder, optional): Whether the variable has a little or
        big endian byte order. Default is ByteOrder.LITTLE.
      signed (bool, optional): Whether the variable is signed. Default is
        False.
    """
    name: str
    size: int
    datatype: type
    offset: int
    byteorder: ByteOrder
    signed: bool

    def __init__(self, name: str, size: int, datatype: type,
                 offset: int = None,
                 byteorder: ByteOrder = ByteOrder.LITTLE,
                 signed: bool = False):
        self.name = name
        self.size = size
        self.datatype = datatype
        self.offset = offset
        self.byteorder = byteorder
        self.signed = signed

    @property
    def next_offset(self):
        """
        Gets the offset of the next variable (this offset plus this size).
        """
        return self.offset + self.size

    def __len__(self):
        """
        The length of the variable in bytes.
        """
        return self.size

    def __repr__(self):
        return f"name: {self.name}, offset: {self.offset}, size: {self.size}, " \
               f"datatype: {self.datatype}, byteorder: {self.byteorder}, " \
               f"signed: {self.signed}"


class Structure:
    """
    The definition of a data structure.

    Arguments:
      start (int): The start position of the data structure.
      variables (List of Variable): The definitions of all variables in the
        structure. Variables without an offset have one assigned in place.
      rows (int, optional): If the data structure is repeating, specifies the
        number of times that it repeats. Default is 1 for a non repeating
        structure.
    """
    start: int
    variables: List[Variable]
    rows: int

    def __init__(self, start: int, variables: List[Variable], rows: int = 1):
        self.start = start
        self.variables = variables
        self.rows = rows

        # Calculate any missing variable offsets: the first variable begins
        # at [start], every other one directly after its predecessor.
        last = None
        for variable in self.variables:
            if variable.offset is None:
                if last is None:
                    variable.offset = self.start
                else:
                    variable.offset = last.next_offset

            last = variable

    def __len__(self):
        """
        Length of the structure. This is the sum of the size of all variables
        multiplied by the number of rows.
        """
        return sum(len(i) for i in self.variables) * self.rows

    def __repr__(self):
        return f"start: {self.start}, rows: {self.rows}, variables: " + \
               f"{self.variables}"


class DataHandler:
    """
    Reads and writes binary data to and from pandas DataFrames using the
    specification provided in [structures].

    Arguments:
      data (bytes): The data.
      structures (dict of str, Structure, optional): The specification of any
        data structures. The dict key is the name of the structure that will
        be used when reading and updating. Additional structures can be added
        later using [add_structure]. If no structures are specified, then
        read_structure and write_structure cannot be used, however
        DataHandler can still be used to read and write variables.
      str_encode (str, optional): The string encoding. See
        https://docs.python.org/3/library/codecs.html#standard-encodings for
        a list of all encodings. Default is utf-8.

    Validation failures raise AssertionError. The checks are raised
    explicitly so they still run when Python is started with -O.
    """

    # struct format characters for IEEE 754 floats, keyed by size in bytes.
    _FLOAT_CODES = {2: 'e', 4: 'f', 8: 'd'}

    __data: bytes
    __structures: Dict[str, Structure] = None
    __str_encode: str

    def __init__(self, data: bytes, structures: Dict[str, Structure] = None,
                 str_encode='utf-8'):
        self.__data = data
        self.__str_encode = str_encode

        # Add the structures individually to benefit from boundary checks.
        if structures is not None:
            for structure_name, structure in structures.items():
                self.add_structure(structure_name, structure)

    @property
    def data(self):
        """
        The current data, including everything written so far.
        """
        return self.__data

    def add_structure(self, name: str, structure: Structure):
        """
        Adds a structure.

        Arguments:
          name (str): The name of the structure to add. If a structure already
            exists with that name, it will be overwritten.
          structure (Structure): The structure to add.

        Raises:
          AssertionError: If the structure does not fit inside the data.
        """
        # Check that the structure is within the bounds of the data.
        self.__check_bounds(structure.start, len(structure))

        # Create the registry on first use; it is stored on the instance so
        # structures are never shared between handlers.
        if self.__structures is None:
            self.__structures = {}
        self.__structures[name] = structure

    def read_hex(self, start: int = 0, length: int = None,
                 seperator: str = ':') -> str:
        """
        Returns data as an easily readable string of hexadecimal characters.

        Arguments:
          start (int, optional): The starting position to read from. Default
            is the first byte of the data.
          length (int, optional): The number of bytes to return. Default is
            the number of bytes available from [start] to the end of the data.
          seperator (str, optional): The separator character for hex bytes.
            Default is ':'.

        Returns:
          (str): The bytes as hexadecimal byte representations separated by
          [seperator].
        """
        # Calculate and set length if not specified.
        if length is None:
            length = len(self.data) - start

        self.__check_bounds(start, length)
        return self.data[start:start + length].hex(seperator)

    def read_structure(self, name: str) -> pd.DataFrame:
        """
        Reads this instance's [data] into a dataframe using the specification
        provided by [structures].

        Arguments:
          name (str): The name of the structure to read.

        Returns:
          (DataFrame): One row per structure row and one column per variable,
          named after the variable.
        """
        if self.__structures is None:
            raise AssertionError("There are no structures defined.")
        if name not in self.__structures:
            raise AssertionError(
                f"Structure with name '{name}' does not "
                + "exist. Add it using [add_structure] before reading.")

        structure = self.__structures[name]

        # Size of a single row in bytes; row N starts N * row_size bytes after
        # each variable's own offset. Integer arithmetic avoids float rounding.
        row_size = sum(len(var) for var in structure.variables)

        data = [
            [self.read_variable(var, row * row_size)
             for var in structure.variables]
            for row in range(structure.rows)
        ]
        columns = [var.name for var in structure.variables]
        return pd.DataFrame(data=data, columns=columns)

    def read_variable(self, variable: Variable, offset: int = None) -> object:
        """
        Reads a variable from [data] and converts it to the data type
        specified in the [variable].

        Arguments:
          variable (Variable): The definition of the variable to read.
          offset (int, optional): Offset to apply. Added to variable offset.
            Used to read repeating structures. If not specified, then no
            offset is applied.

        Returns:
          object: The data converted to the data type specified in
          [variable]. int, str, bool and float are converted; any other
          datatype is returned as the raw bytes.
        """
        full_offset = variable.offset if offset is None \
            else variable.offset + offset

        # __read performs the bounds check.
        bytes_data = self.__read(full_offset, variable.size)

        if variable.datatype == int:
            return int.from_bytes(bytes_data,
                                  byteorder=variable.byteorder.value,
                                  signed=variable.signed)
        if variable.datatype == str:
            return bytes_data.decode(self.__str_encode)
        if variable.datatype == bool:
            return bool(int.from_bytes(bytes_data,
                                       byteorder=variable.byteorder.value,
                                       signed=variable.signed))
        if variable.datatype == float:
            return struct.unpack(self.__float_format(variable), bytes_data)[0]

        # For any other datatype, keep as bytes.
        return bytes_data

    def write_structure(self, name: str, df: pd.DataFrame):
        """
        Writes a dataframe back to this instance's [data] using the
        specification provided in [structures].

        Arguments:
          name (str): The name of the structure to write.
          df (pd.DataFrame): The dataframe to write. Its columns must match
            the structure's variable names in order, and its row count must
            match the structure's rows.

        Raises:
          AssertionError: If the structure is unknown or the dataframe does
            not match it.
        """
        if self.__structures is None:
            raise AssertionError("There are no structures defined.")
        if name not in self.__structures:
            raise AssertionError(
                f"Structure with name '{name}' does not "
                + "exist. Add it using [add_structure] before writing to it.")

        structure = self.__structures[name]

        # Check that the dataframe columns match the variables one to one.
        if len(df.columns) != len(structure.variables):
            raise AssertionError(
                f"Number of columns in DataFrame ({len(df.columns)}) does not "
                + "match number of variables in structure "
                + f"({len(structure.variables)}).")
        for column, var in zip(df.columns, structure.variables):
            if column != var.name:
                raise AssertionError(
                    f"Structure name '{var.name}' does not match "
                    + f"DataFrame column name '{column}'.")

        # Check that number of rows matches [structure.rows].
        if df.shape[0] != structure.rows:
            raise AssertionError(
                f"Number of rows in DataFrame ({df.shape[0]}) does not match "
                + f"number of rows in structure ({structure.rows}).")

        row_size = sum(len(var) for var in structure.variables)
        for row in range(structure.rows):
            row_offset = row * row_size
            for column, var in enumerate(structure.variables):
                # Read cell by cell: taking a whole row (df.iloc[row]) would
                # upcast mixed int/float rows to float and break the type
                # check in write_variable.
                self.write_variable(df.iat[row, column], var, row_offset)

    def write_variable(self, data: object, variable: Variable,
                       offset: int = None):
        """
        Writes a variable to the data, converting it back to bytes.

        Arguments:
          data (object): The data to write.
          variable (Variable): The definition of the variable to write.
          offset (int, optional): Offset to apply. Added to variable offset.
            Used to write repeating structures. If not specified, then no
            offset is applied.

        Raises:
          AssertionError: If the type of [data] does not match the variable's
            datatype, the variable is out of bounds, or the converted bytes
            are longer than the variable.

        Note: a str or bytes value shorter than the variable only overwrites
        its own length; the remaining bytes of the variable are left as is.
        """
        # Convert numpy scalars (as found in DataFrames) to native types.
        if isinstance(data, np.integer):
            data = int(data)
        elif type(data) in [bool, np.bool_]:
            data = bool(data)
        elif isinstance(data, str):
            data = str(data)
        elif isinstance(data, (float, np.floating)):
            data = float(data)

        # Confirm that the provided data type matches the type specified in
        # the [variable].
        if type(data) != variable.datatype:
            raise AssertionError(
                f"Datatype {type(data)} is not a subtype of {variable.datatype}")

        full_offset = variable.offset if offset is None \
            else variable.offset + offset
        self.__check_bounds(full_offset, variable.size)

        # Convert to bytes.
        if variable.datatype == str:
            payload = data.encode(self.__str_encode)
        elif variable.datatype in [int, bool]:
            payload = data.to_bytes(variable.size,
                                    byteorder=variable.byteorder.value,
                                    signed=variable.signed)
        elif variable.datatype == float:
            payload = struct.pack(self.__float_format(variable), data)
        else:
            payload = data

        # Never spill into whatever follows the variable.
        if len(payload) > variable.size:
            raise AssertionError(
                f"Data of {len(payload)} bytes does not fit in variable "
                + f"'{variable.name}' of size {variable.size}.")

        self.__write(full_offset, payload)

    def __float_format(self, variable: Variable) -> str:
        """
        Builds the struct format for a float variable from its byte order and
        size. Sizes without a matching float type fall back to 'f'.
        """
        endian_sign = '<' if variable.byteorder == ByteOrder.LITTLE else '>'
        return endian_sign + self._FLOAT_CODES.get(variable.size, 'f')

    def __check_bounds(self, offset: int, length: int):
        """
        Checks that the offset and length is within the bounds of the data.

        Arguments:
          offset (int): The start position to test.
          length (int): The number of bytes to test.

        Raises:
          AssertionError: If the range is negative or outside the data.
        """
        size = len(self.data)

        # Negative values are rejected too: slicing would silently count
        # them from the end of the data.
        if offset < 0 or offset >= size:
            raise AssertionError(
                f"Offset {offset} is out of bounds for "
                + f"data of length {size}.")

        if length < 0 or offset + length > size:
            raise AssertionError(
                f"Length {length} is out of "
                + f"bounds for data length {size} starting at {offset}.")

    def __read(self, offset: int, length: int) -> bytes:
        """
        Gets the bytes from [data] at offset.

        Arguments:
          offset (int): The position of the first byte to read.
          length (int): The number of bytes to return.

        Returns:
          bytes: The [length] bytes starting at [offset].
        """
        self.__check_bounds(offset, length)
        return self.__data[offset:offset + length]

    def __write(self, offset: int, value: bytes):
        """
        Writes [value] to [data] at position [offset].

        Arguments:
          offset (int): The position of the data to update. The byte at
            offset position and any following bytes up to the length of
            [value] will be updated.
          value (bytes): The value to write.
        """
        self.__check_bounds(offset, len(value))
        # The data is rebuilt rather than modified in place, so the overall
        # length never changes.
        self.__data = self.__data[:offset] + value \
            + self.__data[offset + len(value):]
class ByteOrder(Enum):
    """
    Byte order of a variable.

    ByteOrder.LITTLE means little endian and ByteOrder.BIG means big endian.
    Each member's value is the matching byte order name ('little' / 'big').
    """
    LITTLE = 'little'
    BIG = 'big'
The order of bytes in a variable. Can be ByteOrder.LITTLE for little endian or ByteOrder.BIG for big endian.
Inherited Members
- enum.Enum
- name
- value
class Variable:
    """
    Describes one variable inside a block of binary data.

    Arguments:
      name (str): The name of the variable.
      size (int): The size of the variable in bytes.
      datatype (type): The data type of the variable.
      offset (int, optional): The byte offset where the variable starts.
        If not specified, this will be calculated by Structure.
      byteorder (ByteOrder, optional): Whether the variable has a little or
        big endian byte order. Default is ByteOrder.LITTLE.
      signed (bool, optional): Whether the variable is signed. Default is
        False.
    """
    name: str
    size: int
    datatype: type
    offset: int
    byteorder: ByteOrder
    signed: bool

    def __init__(self, name:str, size:int, datatype:type, offset:int=None,
                 byteorder:ByteOrder=ByteOrder.LITTLE, signed:bool=False):
        # Layout first, then interpretation.
        self.name, self.offset, self.size = name, offset, size
        self.datatype, self.byteorder, self.signed = datatype, byteorder, signed

    @property
    def next_offset(self):
        """
        Offset of the first byte after this variable, i.e. where the next
        variable starts.
        """
        return self.size + self.offset

    def __len__(self):
        """
        The size of the variable in bytes.
        """
        return self.size

    def __repr__(self):
        return (
            f"name: {self.name}, offset: {self.offset}, size: {self.size}, "
            f"datatype: {self.datatype}, byteorder: {self.byteorder}, "
            f"signed: {self.signed}"
        )
The metadata for a single variable.
Arguments:
- name (str): The name of the variable.
- size (int): The size of the variable in bytes.
- datatype (type): The data type of the variable.
- offset (int, optional): The byte offset where the variable starts. If not specified, this will be calculated by Structure.
- byteorder (ByteOrder, optional): Whether the variable has a little or big endian byte order. Default is ByteOrder.LITTLE.
- signed (bool, optional): Whether the variable is signed. Default is false.
class Structure:
    """
    Describes a group of variables that are stored together.

    Arguments:
      start (int): The start position of the data structure.
      variables (List of Variable): The definitions of all variables in the
        structure. Variables without an offset get one assigned in place.
      rows (int, optional): If the data structure is repeating, specifies the
        number of times that it repeats. Default is 1 for a non repeating
        structure.
    """
    start: int
    variables: List[Variable]
    rows: int

    def __init__(self, start:int, variables:List[Variable], rows:int=1):
        self.start = start
        self.variables = variables
        self.rows = rows

        # Fill in missing offsets: the first variable sits at [start], any
        # later one directly behind the variable that precedes it.
        previous = None
        for current in self.variables:
            if current.offset is None:
                current.offset = (self.start if previous is None
                                  else previous.next_offset)
            previous = current

    def __len__(self):
        """
        Total size in bytes: the combined size of all variables times the
        number of rows.
        """
        row_size = 0
        for variable in self.variables:
            row_size += len(variable)
        return row_size * self.rows

    def __repr__(self):
        return (f"start: {self.start}, rows: {self.rows}, variables: "
                f"{self.variables}")
The definition of a data structure.
Arguments:
- start (int): The start position of the data structure.
- variables (List of Variable): The definitions of all variables in the structure.
- rows (int, optional): If the data structure is repeating, specifies the number of times that it repeats. Default is 1 for a non repeating structure.
def __init__(self, start:int, variables:List[Variable], rows:int=1):
    # Store the definition as given.
    self.start = start
    self.variables = variables
    self.rows = rows

    # Fill in missing offsets: the first variable sits at [start], any later
    # one directly behind the variable that precedes it. Offsets that were
    # set explicitly are left untouched.
    previous = None
    for current in self.variables:
        if current.offset is None:
            current.offset = (self.start if previous is None
                              else previous.next_offset)
        previous = current
class DataHandler:
    """
    Reads and writes binary data to and from pandas DataFrames using the
    specification provided in [structures].

    Arguments:
      data (bytes): The data.
      structures (dict of str, Structure, optional): The specification of any
        data structures. The dict key is the name of the structure that will
        be used when reading and updating. Additional structures can be added
        later using [add_structure]. If no structures are specified, then
        read_structure and write_structure cannot be used, however
        DataHandler can still be used to read and write variables.
      str_encode (str, optional): The string encoding. See
        https://docs.python.org/3/library/codecs.html#standard-encodings for
        a list of all encodings. Default is utf-8.

    Validation failures raise AssertionError. The checks are raised
    explicitly so they still run when Python is started with -O.
    """

    # struct format characters for IEEE 754 floats, keyed by size in bytes.
    _FLOAT_CODES = {2: 'e', 4: 'f', 8: 'd'}

    __data: bytes
    __structures: Dict[str, Structure] = None
    __str_encode: str

    def __init__(self, data: bytes, structures: Dict[str, Structure] = None,
                 str_encode='utf-8'):
        self.__data = data
        self.__str_encode = str_encode

        # Add the structures individually to benefit from boundary checks.
        if structures is not None:
            for structure_name, structure in structures.items():
                self.add_structure(structure_name, structure)

    @property
    def data(self):
        """
        The current data, including everything written so far.
        """
        return self.__data

    def add_structure(self, name: str, structure: Structure):
        """
        Adds a structure.

        Arguments:
          name (str): The name of the structure to add. If a structure already
            exists with that name, it will be overwritten.
          structure (Structure): The structure to add.

        Raises:
          AssertionError: If the structure does not fit inside the data.
        """
        # Check that the structure is within the bounds of the data.
        self.__check_bounds(structure.start, len(structure))

        # Create the registry on first use; it is stored on the instance so
        # structures are never shared between handlers.
        if self.__structures is None:
            self.__structures = {}
        self.__structures[name] = structure

    def read_hex(self, start: int = 0, length: int = None,
                 seperator: str = ':') -> str:
        """
        Returns data as an easily readable string of hexadecimal characters.

        Arguments:
          start (int, optional): The starting position to read from. Default
            is the first byte of the data.
          length (int, optional): The number of bytes to return. Default is
            the number of bytes available from [start] to the end of the data.
          seperator (str, optional): The separator character for hex bytes.
            Default is ':'.

        Returns:
          (str): The bytes as hexadecimal byte representations separated by
          [seperator].
        """
        # Calculate and set length if not specified.
        if length is None:
            length = len(self.data) - start

        self.__check_bounds(start, length)
        return self.data[start:start + length].hex(seperator)

    def read_structure(self, name: str) -> pd.DataFrame:
        """
        Reads this instance's [data] into a dataframe using the specification
        provided by [structures].

        Arguments:
          name (str): The name of the structure to read.

        Returns:
          (DataFrame): One row per structure row and one column per variable,
          named after the variable.
        """
        if self.__structures is None:
            raise AssertionError("There are no structures defined.")
        if name not in self.__structures:
            raise AssertionError(
                f"Structure with name '{name}' does not "
                + "exist. Add it using [add_structure] before reading.")

        structure = self.__structures[name]

        # Size of a single row in bytes; row N starts N * row_size bytes after
        # each variable's own offset. Integer arithmetic avoids float rounding.
        row_size = sum(len(var) for var in structure.variables)

        data = [
            [self.read_variable(var, row * row_size)
             for var in structure.variables]
            for row in range(structure.rows)
        ]
        columns = [var.name for var in structure.variables]
        return pd.DataFrame(data=data, columns=columns)

    def read_variable(self, variable: Variable, offset: int = None) -> object:
        """
        Reads a variable from [data] and converts it to the data type
        specified in the [variable].

        Arguments:
          variable (Variable): The definition of the variable to read.
          offset (int, optional): Offset to apply. Added to variable offset.
            Used to read repeating structures. If not specified, then no
            offset is applied.

        Returns:
          object: The data converted to the data type specified in
          [variable]. int, str, bool and float are converted; any other
          datatype is returned as the raw bytes.
        """
        full_offset = variable.offset if offset is None \
            else variable.offset + offset

        # __read performs the bounds check.
        bytes_data = self.__read(full_offset, variable.size)

        if variable.datatype == int:
            return int.from_bytes(bytes_data,
                                  byteorder=variable.byteorder.value,
                                  signed=variable.signed)
        if variable.datatype == str:
            return bytes_data.decode(self.__str_encode)
        if variable.datatype == bool:
            return bool(int.from_bytes(bytes_data,
                                       byteorder=variable.byteorder.value,
                                       signed=variable.signed))
        if variable.datatype == float:
            return struct.unpack(self.__float_format(variable), bytes_data)[0]

        # For any other datatype, keep as bytes.
        return bytes_data

    def write_structure(self, name: str, df: pd.DataFrame):
        """
        Writes a dataframe back to this instance's [data] using the
        specification provided in [structures].

        Arguments:
          name (str): The name of the structure to write.
          df (pd.DataFrame): The dataframe to write. Its columns must match
            the structure's variable names in order, and its row count must
            match the structure's rows.

        Raises:
          AssertionError: If the structure is unknown or the dataframe does
            not match it.
        """
        if self.__structures is None:
            raise AssertionError("There are no structures defined.")
        if name not in self.__structures:
            raise AssertionError(
                f"Structure with name '{name}' does not "
                + "exist. Add it using [add_structure] before writing to it.")

        structure = self.__structures[name]

        # Check that the dataframe columns match the variables one to one.
        if len(df.columns) != len(structure.variables):
            raise AssertionError(
                f"Number of columns in DataFrame ({len(df.columns)}) does not "
                + "match number of variables in structure "
                + f"({len(structure.variables)}).")
        for column, var in zip(df.columns, structure.variables):
            if column != var.name:
                raise AssertionError(
                    f"Structure name '{var.name}' does not match "
                    + f"DataFrame column name '{column}'.")

        # Check that number of rows matches [structure.rows].
        if df.shape[0] != structure.rows:
            raise AssertionError(
                f"Number of rows in DataFrame ({df.shape[0]}) does not match "
                + f"number of rows in structure ({structure.rows}).")

        row_size = sum(len(var) for var in structure.variables)
        for row in range(structure.rows):
            row_offset = row * row_size
            for column, var in enumerate(structure.variables):
                # Read cell by cell: taking a whole row (df.iloc[row]) would
                # upcast mixed int/float rows to float and break the type
                # check in write_variable.
                self.write_variable(df.iat[row, column], var, row_offset)

    def write_variable(self, data: object, variable: Variable,
                       offset: int = None):
        """
        Writes a variable to the data, converting it back to bytes.

        Arguments:
          data (object): The data to write.
          variable (Variable): The definition of the variable to write.
          offset (int, optional): Offset to apply. Added to variable offset.
            Used to write repeating structures. If not specified, then no
            offset is applied.

        Raises:
          AssertionError: If the type of [data] does not match the variable's
            datatype, the variable is out of bounds, or the converted bytes
            are longer than the variable.

        Note: a str or bytes value shorter than the variable only overwrites
        its own length; the remaining bytes of the variable are left as is.
        """
        # Convert numpy scalars (as found in DataFrames) to native types.
        if isinstance(data, np.integer):
            data = int(data)
        elif type(data) in [bool, np.bool_]:
            data = bool(data)
        elif isinstance(data, str):
            data = str(data)
        elif isinstance(data, (float, np.floating)):
            data = float(data)

        # Confirm that the provided data type matches the type specified in
        # the [variable].
        if type(data) != variable.datatype:
            raise AssertionError(
                f"Datatype {type(data)} is not a subtype of {variable.datatype}")

        full_offset = variable.offset if offset is None \
            else variable.offset + offset
        self.__check_bounds(full_offset, variable.size)

        # Convert to bytes.
        if variable.datatype == str:
            payload = data.encode(self.__str_encode)
        elif variable.datatype in [int, bool]:
            payload = data.to_bytes(variable.size,
                                    byteorder=variable.byteorder.value,
                                    signed=variable.signed)
        elif variable.datatype == float:
            payload = struct.pack(self.__float_format(variable), data)
        else:
            payload = data

        # Never spill into whatever follows the variable.
        if len(payload) > variable.size:
            raise AssertionError(
                f"Data of {len(payload)} bytes does not fit in variable "
                + f"'{variable.name}' of size {variable.size}.")

        self.__write(full_offset, payload)

    def __float_format(self, variable: Variable) -> str:
        """
        Builds the struct format for a float variable from its byte order and
        size. Sizes without a matching float type fall back to 'f'.
        """
        endian_sign = '<' if variable.byteorder == ByteOrder.LITTLE else '>'
        return endian_sign + self._FLOAT_CODES.get(variable.size, 'f')

    def __check_bounds(self, offset: int, length: int):
        """
        Checks that the offset and length is within the bounds of the data.

        Arguments:
          offset (int): The start position to test.
          length (int): The number of bytes to test.

        Raises:
          AssertionError: If the range is negative or outside the data.
        """
        size = len(self.data)

        # Negative values are rejected too: slicing would silently count
        # them from the end of the data.
        if offset < 0 or offset >= size:
            raise AssertionError(
                f"Offset {offset} is out of bounds for "
                + f"data of length {size}.")

        if length < 0 or offset + length > size:
            raise AssertionError(
                f"Length {length} is out of "
                + f"bounds for data length {size} starting at {offset}.")

    def __read(self, offset: int, length: int) -> bytes:
        """
        Gets the bytes from [data] at offset.

        Arguments:
          offset (int): The position of the first byte to read.
          length (int): The number of bytes to return.

        Returns:
          bytes: The [length] bytes starting at [offset].
        """
        self.__check_bounds(offset, length)
        return self.__data[offset:offset + length]

    def __write(self, offset: int, value: bytes):
        """
        Writes [value] to [data] at position [offset].

        Arguments:
          offset (int): The position of the data to update. The byte at
            offset position and any following bytes up to the length of
            [value] will be updated.
          value (bytes): The value to write.
        """
        self.__check_bounds(offset, len(value))
        # The data is rebuilt rather than modified in place, so the overall
        # length never changes.
        self.__data = self.__data[:offset] + value \
            + self.__data[offset + len(value):]
Reads and writes binary data to and from pandas DataFrames using the specification provided in [structures].
Arguments:
- data (bytes): The data.
- structures (dict of str, Structure, optional): The specification of any data structures. The dict key is the name of the structure that will be used when reading and updating. Additional structures can be added later using [add_structure]. If no structures are specified, then read_structure and write_structure cannot be used; however, DataHandler can still be used to read and write variables.
- str_encode (str, optional): The string encoding. See https://docs.python.org/3/library/codecs.html#standard-encodings for a list of all encodings. Default is utf-8.
def __init__(self, data: bytes, structures: Dict[str, Structure]=None,
             str_encode='utf-8'):
    self.__str_encode = str_encode
    self.__data = data

    # Nothing to register when no structures were supplied.
    if structures is None:
        return

    # Register through add_structure so each one is bounds checked.
    for structure_name, structure in structures.items():
        self.add_structure(structure_name, structure)
def add_structure(self, name: str, structure: Structure):
    """
    Registers a structure under the given name.

    Arguments:
      name (str): The name of the structure to add. If a structure already
        exists with that name, it will be overwritten.
      structure (Structure): The structure to add.
    """
    # The structure has to fit inside the data.
    self.__check_bounds(structure.start, len(structure))

    # Create the registry the first time a structure is added.
    if self.__structures is None:
        self.__structures = {}
    self.__structures[name] = structure
Adds a structure.
Arguments:
- name (str): The name of the structure to add. If structure already exists with that name, it will be overwritten.
- structure (Structure): The structure to add.
def read_hex(self, start: int=0, length: int=None, seperator: str=':') -> str:
    """
    Returns data as an easily readable string of hexadecimal characters.

    Arguments:
      start (int, optional): The starting position to read from. Default is
        the first byte of the data.
      length (int, optional): The number of bytes to return. Default is the
        number of bytes available from [start] to the end of the data.
      seperator (str, optional): The separator character for hex bytes.
        Default is ':'.

    Returns:
      (str): The bytes as hexadecimal byte representations separated by
      [seperator].
    """
    # Without an explicit length, read everything from [start] onwards.
    if length is None:
        length = len(self.data) - start

    # Make sure the requested range lies within the data.
    self.__check_bounds(start, length)

    end = start + length
    chunk = self.data[start:end]
    return chunk.hex(seperator)
Returns data as an easily readable string of hexadecimal characters.
Arguments:
- start (int, optional): The starting position to read from. Default is the first byte of the data.
- length (int, optional): The number of bytes to return. Default is the number of bytes available to read in the data given the specified [start].
- seperator (str, optional): The separator character for hex bytes. Default is ':'.
Returns:
(str): The bytes as a string of hexadecimal byte representations separated by [seperator].
def read_structure(self, name: str) -> pd.DataFrame:
    """
    Reads this instance's [data] into a dataframe using the specification
    provided by [structures].

    Arguments:
      name (str): The name of the structure to read.

    Returns:
      (DataFrame): One row per structure row and one column per variable,
      named after the variable.

    Raises:
      AssertionError: If no structure with that name has been added.
    """
    # Raised explicitly (rather than with assert) so the checks still run
    # when Python is started with -O.
    if self.__structures is None:
        raise AssertionError("There are no structures defined.")
    if name not in self.__structures:
        raise AssertionError(
            f"Structure with name '{name}' does not "
            + "exist. Add it using [add_structure] before reading.")

    structure = self.__structures[name]

    # Size of a single row in bytes; row N starts N * row_size bytes after
    # each variable's own offset. Computed once, with integer arithmetic,
    # instead of a float division per variable.
    row_size = sum(len(var) for var in structure.variables)

    data = [
        [self.read_variable(var, row * row_size)
         for var in structure.variables]
        for row in range(structure.rows)
    ]
    columns = [var.name for var in structure.variables]

    return pd.DataFrame(data=data, columns=columns)
Reads this instance's [data] into a dataframe using the specification provided by [structures].
Arguments:
- name (str): The name of the structure to read.
Returns:
(DataFrame): The dataframe containing the converted data from the structure.
def read_variable(self, variable: "Variable", offset: int = None) -> object:
    """
    Reads a variable from [data] and converts it to the data type specified in
    the [variable].

    Arguments:
      variable (Variable): The definition of the variable to read.
      offset (int, optional): Offset to apply. Added to variable offset. Used
        to read repeating structures. If not specified, then no offset is
        applied.

    Returns:
      object: The data converted to the data type specified in [variable].
        int and bool are decoded using the variable's byte order and sign,
        str is decoded with this instance's string encoding, float is
        unpacked as a 2, 4 or 8 byte floating point value, and any other
        datatype is returned as the raw bytes.

    Raises:
      struct.error: If a float variable has a size other than 2, 4 or 8.
    """
    # Get the data as bytes
    full_offset = variable.offset
    if offset is not None:
        full_offset += offset
    self.__check_bounds(full_offset, variable.size)
    bytes_data = self.__read(full_offset, variable.size)

    # Convert it to the correct data type
    datatype = variable.datatype
    if datatype == int:
        return int.from_bytes(bytes_data,
                              byteorder=variable.byteorder.value,
                              signed=variable.signed)
    if datatype == str:
        return str(bytes_data.decode(self.__str_encode))
    if datatype == bool:
        return bool.from_bytes(bytes_data,
                               byteorder=variable.byteorder.value,
                               signed=variable.signed)
    if datatype == float:
        # The struct format must match the number of bytes read: half ('e'),
        # single ('f') or double ('d') precision. Always using 'f' would
        # reject every float that is not exactly 4 bytes wide.
        float_format = {2: 'e', 4: 'f', 8: 'd'}.get(variable.size)
        if float_format is None:
            raise struct.error(
                f"Unsupported float size {variable.size}; expected 2, 4 or 8 "
                "bytes.")
        # ByteOrder values are the strings 'little' / 'big', the same strings
        # that int.from_bytes receives above.
        endian_sign = '<' if variable.byteorder.value == 'little' else '>'
        return struct.unpack(f'{endian_sign}{float_format}', bytes_data)[0]

    # For any other datatype, keep as bytes.
    return bytes_data
Reads a variable from [data] and converts it to the data type specified in the [variable].
Arguments:
- variable (Variable): The definition of the variable to read.
- offset (int, optional): Offset to apply. Added to variable offset. Used to read repeating structures. If not specified, then no offset is applied.
Returns:
object: The data converted to the data type specified in [variable].
def write_structure(self, name: str, df: pd.DataFrame):
    """
    Writes a dataframe back to this instance's [data] using the specification
    provided in [structures].

    The dataframe is validated completely before anything is written: its
    columns must match the structure's variables in number, name and order,
    and its row count must equal the structure's row count.

    Arguments:
      name (str): The name of the structure to write.
      df (pd.DataFrame): The dataframe to write.

    Raises:
      AssertionError: If no structures are defined, [name] is unknown, or the
        dataframe's columns or row count do not match the structure.
    """
    # Assert that structure exists.
    assert self.__structures is not None, "There are no structures defined."
    assert name in self.__structures, f"Structure with name '{name}' does not "\
        + "exist. Add it using [add_structure] before writing to it."

    # Get the structure
    structure = self.__structures[name]
    variables = structure.variables

    # Check the column count up front so that a mismatch is reported before
    # any data has been written, rather than failing part-way through.
    assert len(df.columns) == len(variables), \
        f"Number of columns in DataFrame ({len(df.columns)}) does not match " \
        + f"number of variables in structure ({len(variables)})."

    # Check that the column names of the dataframe match the variable names.
    for column, var in zip(df.columns, variables):
        assert column == var.name, \
            f"Structure name '{var.name}' does not match " \
            + f"DataFrame column name '{column}'."

    # Check that number of rows matches [structure.rows]
    assert df.shape[0] == structure.rows, \
        f"Number of rows in DataFrame ({df.shape[0]}) does not match number of "\
        + f"rows in structure ({structure.rows})."

    # Iterate the rows of the dataframe saving its contents to [data]
    total_size = len(structure)
    for row in range(structure.rows):
        # Offset of this row: the size of one row (len(structure) / rows)
        # times the row number, computed once per row with integer
        # arithmetic.
        row_offset = row * total_size // structure.rows

        for col, var in enumerate(variables):
            # Read the cell by position so that it keeps its column's dtype.
            # Extracting the whole row first would upcast mixed int/float
            # rows to float and make int variables fail the datatype check.
            self.write_variable(df.iat[row, col], var, row_offset)
Writes a dataframe back to this instance's [data] using the specification provided in [structures].
Arguments:
- name (str): The name of the structure to write.
- df (pd.DataFrame): The dataframe to write.
def write_variable(self, data: object, variable: "Variable", offset: int = None):
    """
    Writes a variable to the data, converting it back to bytes.

    Arguments:
      data (object): The data to write. NumPy integer, boolean and floating
        point scalars are converted to their native Python types first.
      variable (Variable): The definition of the variable to write.
      offset (int, optional): Offset to apply. Added to variable offset. Used
        to write repeating structures. If not specified, then no offset is
        applied.

    Raises:
      AssertionError: If the type of [data] does not match the datatype
        specified in [variable].
      struct.error: If a float variable has a size other than 2, 4 or 8.
    """
    # Convert to native types.
    if isinstance(data, np.integer):
        data = int(data)
    elif type(data) in [bool, np.bool_]:
        data = bool(data)
    elif isinstance(data, str):
        data = str(data)
    elif isinstance(data, (float, np.floating)):
        # np.floating also covers scalars such as np.float32, which are not
        # subclasses of the built-in float.
        data = float(data)

    # Confirm that the provided data type matches the type specified in the
    # [variable].
    assert type(data) == variable.datatype, \
        f"Datatype {type(data)} is not a subtype of {variable.datatype}"

    # Calculate the full offset. Any offset passed to this method + the
    # variable offset.
    full_offset = variable.offset
    if offset is not None:
        full_offset += offset

    # Check the bounds
    self.__check_bounds(full_offset, variable.size)

    # Write the data.
    if variable.datatype == str:
        # NOTE(review): the encoded string is written as-is; its length is
        # not compared with variable.size here.
        self.__write(full_offset, data.encode(self.__str_encode))
    elif variable.datatype in [int, bool]:
        self.__write(full_offset,
                     data.to_bytes(variable.size,
                                   byteorder=variable.byteorder.value,
                                   signed=variable.signed))
    elif variable.datatype == float:
        # The struct format must produce exactly variable.size bytes: half
        # ('e'), single ('f') or double ('d') precision. Always packing 'f'
        # would write 4 bytes whatever size the variable declares.
        float_format = {2: 'e', 4: 'f', 8: 'd'}.get(variable.size)
        if float_format is None:
            raise struct.error(
                f"Unsupported float size {variable.size}; expected 2, 4 or 8 "
                "bytes.")
        # ByteOrder values are the strings 'little' / 'big', the same strings
        # that int.to_bytes receives above.
        endian_sign = '<' if variable.byteorder.value == 'little' else '>'
        self.__write(full_offset,
                     struct.pack(f'{endian_sign}{float_format}', data))
    else:
        # Any other datatype is written unchanged.
        self.__write(full_offset, data)
Writes a variable to the data, converting it back to bytes.
Arguments:
- data (object): The data to write.
- variable (Variable): The definition of the variable to write.
- offset (int, optional): Offset to apply. Added to variable offset. Used to write repeating structures. If not specified, then no offset is applied.