import struct
from typing import Dict, Any, TYPE_CHECKING, Tuple
from ecom.message import MessageType
from ecom.datatypes import TypeInfo, StructType
from ecom.verification import MessageVerifier, VerificationError
if TYPE_CHECKING:
from ecom.database import CommunicationDatabase
def calculateChecksum(data: bytes, polynomial: int = 0x8811) -> int:
    """
    Calculate a CRC16 checksum from the data.

    The register starts at 0xFFFF. Each byte is XORed into the upper eight bits of the register,
    which is then shifted left eight times; whenever the bit shifted out was set, the polynomial
    is XORed into the register. No final XOR is applied.

    :param data: The data to calculate the checksum from.
    :param polynomial: The polynomial to use for the crc calculation in nominal form.
    :return: The calculated checksum, always in the range 0 to 0xFFFF.
    """
    crc = 0xFFFF
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            shifted = crc << 1
            if crc & 0x8000:
                shifted ^= polynomial
            # Truncate to 16 bits on every step. Bits above bit 15 never influence the result,
            # but leaving them in place makes the integer grow with every input byte and turns
            # the calculation quadratic in the length of the data.
            crc = shifted & 0xFFFF
    return crc
class ChecksumVerifier(MessageVerifier):
    """ A checksum verifier that can add and verify a CRC16 checksum. """

    def __init__(self, database: 'CommunicationDatabase', checksumDatapointName: str = 'checksum'):
        """
        Initialize a new checksum verifier.

        :param database: The communication database.
        :param checksumDatapointName: The name of the datapoint in the message headers that will contain the checksum.
        """
        super().__init__()
        self._database = database
        self._checksumDatapointName = checksumDatapointName

    def verify(self, data: bytes, message: MessageType, header: TypeInfo[StructType], messageSize: int) -> None:
        """
        Verify that the checksum stored in the data matches the checksum calculated from the data.

        :raises VerificationError: if the stored checksum bytes differ from the calculated ones.
        :raises TypeError: if the header has no checksum member or the checksum type is not supported.
        :param data: The data of the message. May also include other trailing data.
        :param message: The type of the message. Not used by this implementation.
        :param header: Type information about the header that contains the checksum.
        :param messageSize: The size of the message, as calculated with the unverified data.
        """
        checksumBytes, checksumOffset = self._calculateChecksumAndOffset(data, header, messageSize)
        # If the data ends before the checksum member is complete, the slice is shorter
        # than the calculated checksum and the comparison fails as well.
        if checksumBytes != data[checksumOffset:checksumOffset + len(checksumBytes)]:
            raise VerificationError('Invalid checksum')

    def addVerificationData(self, data: bytes, message: MessageType, header: TypeInfo[StructType]) -> bytes:
        """
        Write the calculated checksum into the checksum member of the serialized message.

        The whole of the data is treated as the message.

        :raises TypeError: if the header has no checksum member or the checksum type is not supported.
        :param data: The serialized message.
        :param message: The type of the message. Not used by this implementation.
        :param header: Type information about the header that contains the checksum.
        :return: A copy of the data with the bytes at the checksum offset replaced by the checksum.
        """
        checksumBytes, checksumOffset = self._calculateChecksumAndOffset(data, header, len(data))
        return data[:checksumOffset] + checksumBytes + data[checksumOffset + len(checksumBytes):]

    def addPlaceholderVerificationData(self, data: Dict[str, Any]) -> None:
        """
        Store a zero placeholder for the checksum in the given values.

        :param data: The mapping that is modified in place; the checksum datapoint name is set to 0.
        """
        data[self._checksumDatapointName] = 0

    def _calculateChecksumAndOffset(
            self, data: bytes, header: TypeInfo[StructType], messageSize: int) -> Tuple[bytes, int]:
        """
        Calculate the checksum of the data and the offset into the data where the checksum should be placed.

        :raises TypeError: if the header has no checksum member or the checksum type is not supported.
        :param data: The data to calculate the checksum from. May also include other trailing data.
        :param header: Type information about the header that contains the checksum.
        :param messageSize: The size of the message, as calculated with the unverified data.
        :return: The bytes of the calculated checksum and the offset into the data.
        """
        try:
            checksumTypeInfo = header.type[self._checksumDatapointName]
        except AttributeError as error:
            # Chain the original error so the traceback still shows the failed lookup.
            raise TypeError(f'Header {header.type} does not have the expected '
                            f'checksum attribute "{self._checksumDatapointName}"') from error
        checksumOffset = header.type.offsetOf(self._database, self._checksumDatapointName)
        checksumProtectedData = self._getChecksumProtectedData(
            data, header, checksumTypeInfo, checksumOffset, messageSize)
        checksumBytes = self._calculateChecksum(checksumTypeInfo, checksumProtectedData)
        return checksumBytes, checksumOffset

    # noinspection PyUnusedLocal
    def _getChecksumProtectedData(self, data: bytes, header: TypeInfo[StructType], checksumTypeInfo: TypeInfo,
                                  checksumOffset: int, messageSize: int) -> bytes:
        """
        Get the data that is protected by the checksum from the data of the entire message.

        This implementation protects everything from the end of the checksum member
        up to the end of the message.

        :param data: The data of the entire message and everything after it.
        :param header: Type information about the header of the message. Not used by this implementation.
        :param checksumTypeInfo: Type information about the checksum member of the header.
        :param checksumOffset: The offset in bytes of the checksum member in the header.
        :param messageSize: The size of the message, as calculated with the unverified data.
        :return: The part of the message data that is protected by the checksum.
        """
        protectedStart = checksumOffset + checksumTypeInfo.getSize(self._database)
        return data[protectedStart:messageSize]

    def _calculateChecksum(self, checksumTypeInfo: TypeInfo, data: bytes) -> bytes:
        """
        Calculate the checksum for the given data.

        :raises TypeError: if the checksum type is not supported.
        :param checksumTypeInfo: Type information about how the checksum is represented in the message.
        :param data: The data to generate the checksum for.
        :return: The checksum for that data serialized according to the checksum type info.
        """
        checksumBaseType = checksumTypeInfo.getBaseType(self._database)
        if checksumBaseType is TypeInfo.BaseType.UINT16:
            # The '<' prefix serializes the checksum in little-endian byte order.
            return struct.pack('<' + checksumTypeInfo.getFormat(self._database), calculateChecksum(data))
        raise TypeError(f'Unsupported checksum data type: {checksumBaseType.value}')