Source code for ecom.parser

import struct
from abc import ABC

from typing import TYPE_CHECKING, Iterator, List, Optional, Union, Callable, Tuple, Dict, Any, TypeVar, Generic
from logging import getLogger
from dataclasses import dataclass

from ecom.message import Message, MessageType, DependantTelecommandDatapointType, MessageDatapointType, Telemetry, \
    Telecommand
from ecom.datatypes import StructType, ArrayType, TypeInfo, DynamicSizeError, CommunicationDatabaseAccessor, EnumType
from ecom.verification import MessageVerifier, MissingDataForVerificationError, VerificationError

if TYPE_CHECKING:
    from ecom.database import CommunicationDatabase


class ParserError(RuntimeError):
    """
    Raised (or handed to an error handler) when data could not be parsed into a message.

    Besides the error message, the error keeps the parser buffer it was created with.
    """

    def __init__(self, message: str, buffer: bytes):
        """
        Create a parser error.

        :param message: A description of what went wrong.
        :param buffer: The parser buffer to attach to the error.
        """
        # Remember the buffer first; the message is handled by the RuntimeError base class.
        self._buffer = buffer
        super().__init__(message)

    @property
    def buffer(self) -> bytes:
        """
        :return: The parser buffer that was passed in when the error was created.
        """
        return self._buffer
# The concrete message class a parser produces (see ``Parser._createParsedMessage``).
M = TypeVar('M', bound=Message)


class Parser(CommunicationDatabaseAccessor, ABC, Generic[M]):
    """
    A parser for messages.

    Raw bytes are fed into :meth:`parse`, which keeps incomplete data in an internal buffer between calls.
    The start of a message is located by searching for the synchronization bytes of the header type.
    """

    DEFAULT_MAX_DYNAMIC_MEMBER_SIZE = 512
    """ The default maximum size of a dynamically sized member that the parser will allow during parsing. """

    @dataclass(frozen=True)
    class _ParseInfo:
        """ Parsing info for a message type. """
        formats: List[Union[str, DependantTelecommandDatapointType]]
        """ A list of parsing formats or dependant data point types that need to be resolved with the data. """
        types: List[Tuple[str, TypeInfo]]
        """ A list of names to type info mappings of the data points. """

    def __init__(self, database: 'CommunicationDatabase', headerType: str, messageTypes: List[MessageType],
                 verifier: Optional[MessageVerifier] = None,
                 maxDynamicMemberSize: int = DEFAULT_MAX_DYNAMIC_MEMBER_SIZE):
        """
        Initialize a new message parser.

        :param database: The communication database.
        :param headerType: The name of the message header type.
        :param messageTypes: The message types that this parser is able to parse.
        :param verifier: An optional verifier that will be used to verify parsed messages.
        :param maxDynamicMemberSize: The maximum allowed size of a dynamically sized member.
        :raises RuntimeError: If the header type is missing from the database or is not a struct type.
        """
        super().__init__(database)
        self._logger = getLogger(self.__class__.__name__)
        self._verifier = verifier
        self._maxDynamicMemberSize = maxDynamicMemberSize
        self._buffer = b''
        self._parseInfos = {}
        self._numParsedBytes = 0
        self._numInvalidBytes = 0
        self._parserHeaderTypeInfo = database.getTypeInfo(headerType)  # type: TypeInfo[StructType]
        if not self._parserHeaderTypeInfo:
            raise RuntimeError(f'Database is missing the "{headerType}" type')
        if not issubclass(self._parserHeaderTypeInfo.type, StructType):
            # Report the offending type itself; type() of a class would only name its metaclass.
            raise RuntimeError(f'Invalid header type: "{headerType}" type must be a struct type, '
                               f'but is {self._parserHeaderTypeInfo.type}')
        self._syncBits = self._getSyncBytes(self._parserHeaderTypeInfo)
        self._syncBitsLength = len(self._syncBits)
        self._headerParseInfo = self._createParseInfo([
            MessageDatapointType(name=name, type=typeInfo)
            for name, typeInfo in self._parserHeaderTypeInfo.type
        ])
        self._createParserInfos(messageTypes)
        # Rebuild the per-message parse infos whenever the database reports a change.
        database.registerChangeListener(lambda: self._createParserInfos(messageTypes))

    def parse(self, message: bytes, errorHandler: Optional[Callable[[ParserError], None]] = None,
              ignoredBytesHandler: Optional[Callable[[bytes, int], None]] = None) -> Iterator[M]:
        """
        Parse messages from the data. Multiple messages might be parsed from the data.
        If the data contains an incomplete message, it will be added to an internal buffer
        and parsed once the rest of the data is available.

        This is a generator: the data is only added to the buffer and parsed once the result is iterated.

        :param message: The raw data to parse.
        :param errorHandler: An optional error handler for handling parser errors.
        :param ignoredBytesHandler: An optional handler for bytes that will get ignored during parsing.
                                    Takes the parser buffer and the number of bytes that will be ignored.
        :return: An iterator over all parsed messages.
        :raises ParserError: If the data contains a message with invalid data and no error handler is specified.
        """
        self._buffer += message
        if errorHandler is None:
            errorHandler = self._defaultErrorHandler
        if ignoredBytesHandler is None:
            ignoredBytesHandler = self._defaultIgnoredBytesHandler
        while True:
            startIndex = self._buffer.find(self._syncBits)
            if startIndex == -1:
                # Throw away everything except what could be the start of sync bytes from the next message.
                # The number of kept bytes is derived from the sync sequence length, so that a
                # one byte sync sequence keeps nothing (a "[-0:]" slice would keep everything).
                numKeptBytes = min(len(self._buffer), self._syncBitsLength - 1)
                numIgnoredBytes = len(self._buffer) - numKeptBytes
                if numIgnoredBytes > 0:
                    ignoredBytesHandler(self._buffer, numIgnoredBytes)
                    self._numInvalidBytes += numIgnoredBytes
                    self._buffer = self._buffer[numIgnoredBytes:]
                break
            if startIndex != 0:
                # Skip the garbage in front of the sync bytes
                ignoredBytesHandler(self._buffer, startIndex)
                self._buffer = self._buffer[startIndex:]
                self._numInvalidBytes += startIndex
            messageType = None
            try:
                header, headerSize = self._parse(self._headerParseInfo, self._buffer)
                if header is None:
                    break  # Not enough data for the header, wait for more to arrive
                messageType = header['type']
                parseInfo = self._parseInfos.get(messageType)
                if parseInfo is None:
                    # Treat an unknown type like any other invalid message. A KeyError would escape
                    # without consuming any bytes and every following call would fail the same way.
                    raise ValueError(f'No parse info registered for message type {messageType!r}')
                values, payloadSize = self._parse(parseInfo, self._buffer[headerSize:])
            except ValueError as error:
                errorBuffer = self._buffer
                self._buffer = self._buffer[1:]  # Remove the first of the invalid sync bits
                self._numInvalidBytes += 1
                if messageType is None:
                    errorHandler(ParserError(f'Failed to parse header: {error}', errorBuffer))
                else:
                    errorHandler(ParserError(f'Failed to parse {messageType.name} message: {error}', errorBuffer))
                continue
            if values is None:
                break  # Not enough data for the payload, wait for more to arrive
            messageSize = headerSize + payloadSize
            if self._verifier is not None:
                try:
                    verifiedBytes = self._verifier.verify(
                        self._buffer, message=messageType, header=self._parserHeaderTypeInfo,
                        messageSize=messageSize)
                except MissingDataForVerificationError:
                    return  # Not enough data, wait for more to arrive
                except VerificationError as error:
                    errorBuffer = self._buffer
                    self._buffer = self._buffer[1:]  # Remove the first of the invalid sync bits
                    self._numInvalidBytes += 1
                    errorHandler(ParserError(
                        f'Verification failed for {messageType.name} message: {error}', errorBuffer))
                    continue
                if verifiedBytes is not None:
                    # The verifier returned a replacement buffer, parse again from its start.
                    # NOTE(review): presumably this is corrected data - confirm against MessageVerifier.verify.
                    self._buffer = verifiedBytes
                    continue
            self._buffer = self._buffer[messageSize:]
            self._numParsedBytes += messageSize
            yield self._createParsedMessage(typ=messageType, data=values, header=header)

    @property
    def numParsedBytes(self) -> int:
        """
        :return: The number of bytes of all successfully parsed messages.
        """
        return self._numParsedBytes

    @property
    def numInvalidBytes(self) -> int:
        """
        :return: The number of bytes that could not be parsed into a message.
        """
        return self._numInvalidBytes

    @staticmethod
    def _createParsedMessage(typ: EnumType, data: Dict[str, Any], header: Dict[str, Any]) -> M:
        """
        Create the message object for a successfully parsed message.

        :param typ: The type of the message, as read from the header.
        :param data: The parsed payload values by datapoint name.
        :param header: The parsed header values by member name.
        :return: The message.
        """
        return Message(type=typ, data=data, header=header)

    @staticmethod
    def _getSyncBytes(headerType: TypeInfo[StructType]) -> bytearray:
        """
        Determine the synchronization bytes at the start of the header
        that are used to detect the start of a message in a byte stream.

        :param headerType: The type information about the message header.
        :return: The synchronization bytes to look for.
        """
        return bytearray([headerType.type['sync byte 1'].default.value,
                          headerType.type['sync byte 2'].default.value])

    def _createParserInfos(self, messageTypes: List[MessageType]):
        """
        Create all parser infos from the given list of message types.
        Any previously created parser infos are discarded.

        :param messageTypes: The types of message to generate parser info from.
        """
        self._parseInfos.clear()
        for messageType in messageTypes:
            self._parseInfos[messageType.id] = self._createParseInfo(messageType.data)

    def _createParseInfo(self, datapoints: List[MessageDatapointType]):
        """
        Create a parser info from the datapoints.

        The formats of consecutive datapoints are merged into one format string. A new format is started
        after every additional format that a type reports and around every dependant datapoint,
        whose format can only be resolved while parsing.

        :param datapoints: A list of datapoints.
        :return: The parser info for the datapoints.
        """
        formats = ['']
        datapointTypes = []
        for datapoint in datapoints:
            typeInfo = datapoint.type
            datapointTypes.append((datapoint.name, typeInfo))
            if isinstance(datapoint, DependantTelecommandDatapointType):
                # Keep the datapoint itself as a placeholder and start a fresh format string after it
                formats.extend([datapoint, ''])
                continue
            typeFormats = typeInfo.getFormats(self._database)
            formats[-1] += typeFormats[0]
            formats.extend(typeFormats[1:])
        # Drop the empty formats and prefix the remaining format strings with the little-endian marker
        formats = ['<' + formatStr if isinstance(formatStr, str) else formatStr
                   for formatStr in formats if formatStr != '']
        return self._ParseInfo(formats, datapointTypes)

    def _parse(self, parseInfo: _ParseInfo, buffer: bytes):
        """
        Parse the values indicated by the parse info from the buffer.

        :param parseInfo: Parsing information.
        :param buffer: The buffer to parse from.
        :return: The parsed values and the number of bytes consumed,
                 or (None, None) if the buffer is not large enough.
        :raises ValueError: If a dynamically sized member is larger than allowed.
        :raises TypeError: If a dependant datapoint resolves to a type with dynamically sized children.
        """
        value = {}
        size = 0
        if not parseInfo.types:
            return value, size
        # Work on a copy, the entries are consumed while the values are parsed
        valueTypeInfos = parseInfo.types.copy()
        for formatStr in parseInfo.formats:
            firstDatapointName, firstDatapointTypeInfo = valueTypeInfos[0]
            if isinstance(formatStr, DependantTelecommandDatapointType):
                # The type of this datapoint depends on the already parsed value of its provider
                firstDatapointTypeInfo = formatStr.configureWith(value[formatStr.provider.name]).type
                typeFormats = firstDatapointTypeInfo.getFormats(self._database)
                formatStr = '<' + typeFormats[0]
                if len(typeFormats) > 1:
                    raise TypeError('Dynamically sized child elements are currently not supported')
                valueTypeInfos[0] = firstDatapointName, firstDatapointTypeInfo
            if '{' in formatStr:
                # The format contains placeholders that are filled in from the already parsed values
                formatStr = formatStr.format(**value)
                self._checkDynamicMemberSize(
                    memberName=firstDatapointName, actualSize=struct.calcsize(formatStr))
            elif issubclass(firstDatapointTypeInfo.type, ArrayType) and \
                    not issubclass(firstDatapointTypeInfo.type.getElementTypeInfo().type, bytes):
                try:
                    len(firstDatapointTypeInfo.type)
                except DynamicSizeError as error:
                    # Repeat the element format (everything after the byte order marker)
                    # once for every element of the dynamically sized array
                    arraySize = value[error.sizeMember]
                    expandedFormat = formatStr[0] + formatStr[1:] * arraySize
                    self._checkDynamicMemberSize(
                        memberName=firstDatapointName, actualSize=struct.calcsize(expandedFormat))
                    formatStr = expandedFormat
            formatSize = struct.calcsize(formatStr)
            if len(buffer) < size + formatSize:
                return None, None
            rawValues = list(struct.unpack(formatStr, buffer[size:size + formatSize]))
            size += formatSize
            while rawValues:
                name, typeInfo = valueTypeInfos.pop(0)
                value[name] = self._parse_type(typeInfo, rawValues, value)
        return value, size

    def _checkDynamicMemberSize(self, memberName: str, actualSize: int):
        """
        Verify that the actual size of the dynamically sized member is allowed.

        :raises ValueError: if the size is not allowed.
        :param memberName: The name of the dynamically sized member.
        :param actualSize: The actual size of the dynamically sized member.
        """
        if actualSize > self._maxDynamicMemberSize:
            raise ValueError(f'Got variable sized member {memberName!r} with size {actualSize}B, '
                             f'which is larger than the allowed {self._maxDynamicMemberSize}B')

    @classmethod
    def _parse_type(cls, typeInfo: TypeInfo, rawValues: list, currentValue: dict):
        """
        Parse a value with the given type from the list of raw values.
        The raw values that are used for the value are removed from the front of the list.

        :param typeInfo: The type of the value to parse.
        :param rawValues: The list of raw values.
        :param currentValue: The current total value that is being parsed.
        :return: The parsed value.
        """
        if issubclass(typeInfo.type, StructType):
            childValues = {}
            for childName, childType in typeInfo.type:
                childValues[childName] = cls._parse_type(childType, rawValues, currentValue)
            return typeInfo.type(childValues)
        elif issubclass(typeInfo.type, ArrayType):
            if issubclass(typeInfo.type.getElementTypeInfo().type, bytes):
                # Arrays of bytes are unpacked as one raw value
                return rawValues.pop(0)
            else:
                try:
                    arraySize = len(typeInfo.type)
                except DynamicSizeError as error:
                    arraySize = currentValue[error.sizeMember]
                return typeInfo.type(cls._parse_type(
                    typeInfo.type.getElementTypeInfo(), rawValues, currentValue) for _ in range(arraySize))
        else:
            return typeInfo.type(rawValues.pop(0))

    @staticmethod
    def _defaultErrorHandler(parserError: ParserError):
        """
        The default handling of a parser error.

        :param parserError: The parser error.
        :raises ParserError: The error.
        """
        raise parserError

    def _defaultIgnoredBytesHandler(self, buffer: bytes, ignoredBytes: int):
        """
        The default handling of bytes that were ignored during parsing. Logs a warning.

        :param buffer: The current parsing buffer.
        :param ignoredBytes: The number of bytes that will get ignored.
        """
        self._logger.warning('Ignoring %s bytes', ignoredBytes)
class TelemetryParser(Parser[Telemetry]):
    """ A parser that turns raw data into telemetry messages. """

    def __init__(self, database: 'CommunicationDatabase', **kwargs):
        """
        Create a parser for the telemetry types of the database.

        :param database: The communication database.
        :param kwargs: Additional keyword arguments that are forwarded to the base parser.
        """
        telemetryTypes = database.telemetryTypes
        super().__init__(
            database,
            messageTypes=telemetryTypes,
            headerType='TelemetryMessageHeader',
            **kwargs,
        )

    @staticmethod
    def _createParsedMessage(typ: EnumType, data: Dict[str, Any], header: Dict[str, Any]) -> Telemetry:
        """
        Wrap the parsed values into a telemetry message.

        :param typ: The type of the message.
        :param data: The parsed payload values.
        :param header: The parsed header values.
        :return: The telemetry message.
        """
        telemetry = Telemetry(header=header, data=data, type=typ)
        return telemetry
class TelecommandParser(Parser[Telecommand]):
    """ A parser that turns raw data into telecommand messages. """

    def __init__(self, database: 'CommunicationDatabase', **kwargs):
        """
        Create a parser for the telecommand types of the database.

        :param database: The communication database.
        :param kwargs: Additional keyword arguments that are forwarded to the base parser.
        """
        telecommandTypes = database.telecommandTypes
        super().__init__(
            database,
            messageTypes=telecommandTypes,
            headerType='TelecommandMessageHeader',
            **kwargs,
        )

    @staticmethod
    def _createParsedMessage(typ: EnumType, data: Dict[str, Any], header: Dict[str, Any]) -> Telecommand:
        """
        Wrap the parsed values into a telecommand message.

        :param typ: The type of the message.
        :param data: The parsed payload values.
        :param header: The parsed header values.
        :return: The telecommand message.
        """
        telecommand = Telecommand(header=header, data=data, type=typ)
        return telecommand