Source code for ecom.checksum

import struct

from typing import Dict, Any, TYPE_CHECKING, Tuple

from ecom.message import MessageType
from ecom.datatypes import TypeInfo, StructType
from ecom.verification import MessageVerifier, VerificationError

if TYPE_CHECKING:
    from ecom.database import CommunicationDatabase


def calculateChecksum(data: bytes, polynomial: int = 0x8811) -> int:
    """
    Calculate a CRC16 checksum from the data.

    The register starts at 0xFFFF, every byte is processed most significant bit first,
    and no final XOR is applied to the result.

    :param data: The data to calculate the checksum from.
    :param polynomial: The polynomial to use for the crc calculation, in the
                       most-significant-bit-first form (it is XORed into the register
                       after each left shift that moves a set top bit out).
    :return: The calculated checksum, in the range 0x0000 to 0xFFFF.
    """
    crc = 0xFFFF
    for byte in data:
        # Fold the next byte into the upper half of the 16 bit register.
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = (crc << 1) ^ polynomial
            else:
                crc <<= 1
            # Truncate to 16 bits on every step. Bits above bit 15 never influence the
            # result, but leaving them in place lets the integer grow by 8 bits per input
            # byte, which makes the calculation quadratic in the length of the data.
            crc &= 0xFFFF
    return crc
class ChecksumVerifier(MessageVerifier):
    """
    A message verifier that writes a CRC16 checksum into a header datapoint and checks it again
    on received messages.
    """

    def __init__(self, database: 'CommunicationDatabase', checksumDatapointName='checksum'):
        """
        Create a new checksum verifier.

        :param database: The communication database.
        :param checksumDatapointName: The name of the datapoint in the message headers
                                      that will contain the checksum.
        """
        super().__init__()
        self._checksumDatapointName = checksumDatapointName
        self._database = database

    def verify(self, data: bytes, message: MessageType, header: TypeInfo[StructType], messageSize: int):
        """
        Recalculate the checksum of the message and compare it with the one stored in the data.

        :raises VerificationError: if the stored checksum bytes differ from the calculated ones.
        :param data: The data of the message. May also include other trailing data.
        :param message: The type of the message (not used by this verifier).
        :param header: Type information about the header that contains the checksum.
        :param messageSize: The size of the message, as calculated with the unverified data.
        """
        expected, offset = self._calculateChecksumAndOffset(data, header, messageSize)
        received = data[offset:offset + len(expected)]
        if expected != received:
            raise VerificationError('Invalid checksum')

    def addVerificationData(self, data: bytes, message: MessageType, header: TypeInfo[StructType]) -> bytes:
        """
        Return a copy of the message data with the checksum written into its checksum field.

        :param data: The serialized message; its whole length is used as the message size.
        :param message: The type of the message (not used by this verifier).
        :param header: Type information about the header that contains the checksum.
        :return: The message data with the checksum bytes replaced by the calculated checksum.
        """
        checksum, offset = self._calculateChecksumAndOffset(data, header, len(data))
        beforeChecksum = data[:offset]
        afterChecksum = data[offset + len(checksum):]
        return beforeChecksum + checksum + afterChecksum

    def addPlaceholderVerificationData(self, data: Dict[str, Any]):
        """
        Store a zero placeholder for the checksum datapoint in the given data.

        :param data: The dictionary that receives the placeholder; it is modified in place.
        """
        data[self._checksumDatapointName] = 0

    def _calculateChecksumAndOffset(
            self, data: bytes, header: TypeInfo[StructType], messageSize: int) -> Tuple[bytes, int]:
        """
        Calculate the serialized checksum for the data and find where it belongs in the data.

        :raises TypeError: if looking up the checksum datapoint in the header raises
                           an AttributeError.
        :param data: The data to calculate the checksum from. May also include other trailing data.
        :param header: Type information about the header that contains the checksum.
        :param messageSize: The size of the message, as calculated with the unverified data.
        :return: The bytes of the calculated checksum and the offset into the data.
        """
        try:
            fieldInfo = header.type[self._checksumDatapointName]
        except AttributeError:
            raise TypeError(f'Header {header.type} does not have the expected '
                            f'checksum attribute "{self._checksumDatapointName}"')
        offset = header.type.offsetOf(self._database, self._checksumDatapointName)
        protectedData = self._getChecksumProtectedData(data, header, fieldInfo, offset, messageSize)
        return self._calculateChecksum(fieldInfo, protectedData), offset

    # noinspection PyUnusedLocal
    def _getChecksumProtectedData(self, data: bytes, header: TypeInfo[StructType], checksumTypeInfo: TypeInfo,
                                  checksumOffset: int, messageSize: int) -> bytes:
        """
        Select the part of the message that the checksum covers.

        The protected range starts directly after the checksum member and ends at the message size.

        :param data: The data of the entire message and everything after it.
        :param header: Type information about the header of the message.
        :param checksumTypeInfo: Type information about the checksum member of the header.
        :param checksumOffset: The offset in bytes of the checksum member in the header.
        :param messageSize: The size of the message, as calculated with the unverified data.
        :return: The part of the message data that is protected by the checksum.
        """
        protectedStart = checksumOffset + checksumTypeInfo.getSize(self._database)
        return data[protectedStart:messageSize]

    def _calculateChecksum(self, checksumTypeInfo: TypeInfo, data: bytes) -> bytes:
        """
        Calculate the checksum of the data and serialize it.

        Only checksums with the base type UINT16 are supported; they are packed little endian.

        :raises TypeError: if the checksum type is not supported.
        :param checksumTypeInfo: Type information about how the checksum is represented in the message.
        :param data: The data to generate the checksum for.
        :return: The checksum for that data serialized according to the checksum type info.
        """
        baseType = checksumTypeInfo.getBaseType(self._database)
        if baseType is not TypeInfo.BaseType.UINT16:
            raise TypeError(f'Unsupported checksum data type: {baseType.value}')
        packFormat = '<' + checksumTypeInfo.getFormat(self._database)
        return struct.pack(packFormat, calculateChecksum(data))