Source code for ecom.serializer

import struct

from abc import ABC
from enum import Enum
from typing import TYPE_CHECKING, Optional, Any, Dict

from ecom.message import DependantTelecommandDatapointType, TelecommandType, MessageType, MessageDatapointType, \
    TelecommandDatapointType, TelemetryType
from ecom.datatypes import StructType, CommunicationDatabaseAccessor, TypeInfo, ArrayType, DynamicSizeError
from ecom.verification import MessageVerifier

if TYPE_CHECKING:
    from ecom.database import CommunicationDatabase


[docs]class Serializer(CommunicationDatabaseAccessor, ABC): """ A serializer of messages. """ def __init__(self, database: 'CommunicationDatabase', headerType: str, verifier: Optional[MessageVerifier] = None): """ Initialize a new telecommand serializer. :param database: The communication database. :param headerType: The name of the message header type. :param verifier: An optional verifier that will be used to add verification data to the serialized messages. """ super().__init__(database) self._verifier = verifier self._headerTypeInfo = database.getTypeInfo(headerType) # type: TypeInfo[StructType] if not self._headerTypeInfo: raise RuntimeError(f'Database is missing the "{headerType}" type') if not issubclass(self._headerTypeInfo.type, StructType): raise RuntimeError(f'Invalid header type: "{headerType}" type must be a struct type, ' f'but is {type(self._headerTypeInfo.type)}') # TODO: Once we remove support for Python 3.7, change the signature to # def _serialize(self, message: MessageType, /, **kwargs) -> bytes: def _serialize(self, message: MessageType, kwargs) -> bytes: """ Serialize the message and its data into bytes. :param message: The message type to serialize. :param kwargs: Data for the message. :return: The serialized message. """ argumentData = b'' processedArguments = {} for datapoint in message.data: argumentValue = kwargs.pop(datapoint.name, self._getDatapointDefaultValue(datapoint)) if argumentValue is None: for dynamicallySizedDatapoint in message.data: if dynamicallySizedDatapoint.type is not None and \ issubclass(dynamicallySizedDatapoint.type.type, ArrayType): try: len(dynamicallySizedDatapoint.type.type) except DynamicSizeError as error: if error.sizeMember == datapoint.name: argumentValue = len(kwargs[dynamicallySizedDatapoint.name]) break else: raise ValueError(f'Missing value for message "{message.id.name}" datapoint "{datapoint.name}"') processedArguments[datapoint.name] = argumentValue if isinstance(datapoint, DependantTelecommandDatapointType): typeInfo = datapoint.configureWith( processedArguments[datapoint.provider.name]).type else: typeInfo = datapoint.type argumentData += self._packType(typeInfo, argumentValue, processedArguments) if kwargs: raise ValueError(f'Unexpected values for message "{message.id.name}": {kwargs}') telecommandValues = self._buildHeaderData(message) if self._verifier is not None: self._verifier.addPlaceholderVerificationData(telecommandValues) payload = self._packType(self._headerTypeInfo, telecommandValues, processedArguments) + argumentData if self._verifier is not None: payload = self._verifier.addVerificationData(payload, message, self._headerTypeInfo) return payload def _getDatapointDefaultValue(self, datapoint: MessageDatapointType) -> Optional[Any]: """ Get the value for a datapoint that should be used when no value is specified explicitly. If there is no default, return `None` instead. :param datapoint: The datapoint whose default value should be returned. :return: The default value of the datapoint. """ return None def _buildHeaderData(self, message: MessageType) -> Dict[str, Any]: """ Build the data that makes up the header of the given message type. :param message: A message type. :return: The name to value mapping of the header data for the given message. """ return {'type': message.id.value} def _packType(self, typeInfo, value, previousValues): """ Serialize the given value into bytes. :param typeInfo: The type of the value. :param value: The value. :param previousValues: A name to value mapping of previously used values that can be used to determine the size of the type. :return: The serialized value. """ if issubclass(typeInfo.type, StructType): if not isinstance(value, dict): raise TypeError(f'Value "{value}" is of invalid type ' f'"{type(value)}" for type "{typeInfo.name}"') packedData = b'' value = value.copy() for name, childTypeInfo in typeInfo.type: childValue = value.pop(name, None) if childValue is None: if childTypeInfo.default is None: raise ValueError( f'Missing value for child "{name}" of type "{typeInfo.name}"') childValue = childTypeInfo.default.value packedData += self._packType(childTypeInfo, childValue, previousValues) if value: raise ValueError(f'Unexpected value for type "{typeInfo.name}": {value}') return packedData elif issubclass(typeInfo.type, ArrayType) and not isinstance(value, bytes): if not isinstance(value, list): raise TypeError(f'Value "{value}" is of invalid type ' f'"{type(value)}" for type "{typeInfo.name}"') try: size = len(typeInfo.type) except DynamicSizeError as error: if previousValues is None or error.sizeMember not in previousValues: raise size = previousValues[error.sizeMember] if len(value) != size: raise ValueError(f'Value "{value}" has an invalid length: Got {len(value)}, expected {size}') itemTypeInfo = typeInfo.type.getElementTypeInfo() packedData = b'' for item in value: packedData += self._packType(itemTypeInfo, item, previousValues) return packedData elif issubclass(typeInfo.type, Enum): if not isinstance(value, Enum): value = typeInfo.type(value) value = value.value try: return struct.pack('<' + typeInfo.getFormat(self._database, previousValues), value) except struct.error as error: raise ValueError(f'Failed to serialize value {value!r} (type {type(value).__name__}) ' f'to type {typeInfo.type.__name__}: {error}')
[docs]class TelecommandSerializer(Serializer): """ A serializer of telecommands. """ def __init__(self, database: 'CommunicationDatabase', telecommandHeaderType='TelecommandMessageHeader', counterMemberName='counter', **kwargs): """ Initialize a new telecommand serializer. :param database: The communication database. :param telecommandHeaderType: The name of the telecommand message header data type. :param counterMemberName: The name of the member of the telecommand message header which is used to assign an id to each telecommand. """ super().__init__(database, telecommandHeaderType, **kwargs) self._counterMemberName = counterMemberName self._addCounter = counterMemberName in self._headerTypeInfo.type if self._addCounter: self._upperCounterLimit = self._headerTypeInfo.type[counterMemberName].getMaxNumericValue(database) + 1 else: self._upperCounterLimit = 0 self._telecommandCounter = 0 # TODO: Once we remove support for Python 3.7, change the signature to # def serialize(self, telecommand: TelecommandType, /, **kwargs) -> bytes:
[docs] def serialize(self, telecommand: TelecommandType, **kwargs) -> bytes: """ Serialize the telecommand and its data into bytes. :param telecommand: The telecommand. :param kwargs: Data for the telecommand. :return: The serialized telecommand. """ return self._serialize(telecommand, kwargs)
@property
[docs] def nextTelecommandCounter(self) -> int: """ :return: The next telecommand counter that will be used when serializing a telecommand. """ return self._telecommandCounter
def _getDatapointDefaultValue(self, datapoint: TelecommandDatapointType): return datapoint.default def _buildHeaderData(self, message: MessageType): data = super()._buildHeaderData(message) if self._addCounter: data[self._counterMemberName] = self._telecommandCounter self._telecommandCounter += 1 self._telecommandCounter %= self._upperCounterLimit return data
[docs]class TelemetrySerializer(Serializer): """ A serializer of telemetry messages. """ def __init__(self, database: 'CommunicationDatabase', telemetryHeaderType='TelemetryMessageHeader', **kwargs): """ Initialize a new telemetry serializer. :param database: The communication database. :param telemetryHeaderType: The name of the telemetry message header data type. """ super().__init__(database, telemetryHeaderType, **kwargs) # TODO: Once we remove support for Python 3.7, change the signature to # def serialize(self, telemetry: TelemetryType, /, **kwargs) -> bytes:
[docs] def serialize(self, telemetry: TelemetryType, **kwargs) -> bytes: """ Serialize the telemetry and its data into bytes. :param telemetry: The telemetry. :param kwargs: Data for the telemetry. :return: The serialized telemetry. """ return self._serialize(telemetry, kwargs)