import struct
from abc import ABC
from enum import Enum
from typing import TYPE_CHECKING, Optional, Any, Dict
from ecom.message import DependantTelecommandDatapointType, TelecommandType, MessageType, MessageDatapointType, \
TelecommandDatapointType, TelemetryType
from ecom.datatypes import StructType, CommunicationDatabaseAccessor, TypeInfo, ArrayType, DynamicSizeError
from ecom.verification import MessageVerifier
if TYPE_CHECKING:
from ecom.database import CommunicationDatabase
[docs]class Serializer(CommunicationDatabaseAccessor, ABC):
""" A serializer of messages. """
def __init__(self, database: 'CommunicationDatabase', headerType: str, verifier: Optional[MessageVerifier] = None):
"""
Initialize a new telecommand serializer.
:param database: The communication database.
:param headerType: The name of the message header type.
:param verifier: An optional verifier that will be used to add verification data to the serialized messages.
"""
super().__init__(database)
self._verifier = verifier
self._headerTypeInfo = database.getTypeInfo(headerType) # type: TypeInfo[StructType]
if not self._headerTypeInfo:
raise RuntimeError(f'Database is missing the "{headerType}" type')
if not issubclass(self._headerTypeInfo.type, StructType):
raise RuntimeError(f'Invalid header type: "{headerType}" type must be a struct type, '
f'but is {type(self._headerTypeInfo.type)}')
# TODO: Once we remove support for Python 3.7, change the signature to
# def _serialize(self, message: MessageType, /, **kwargs) -> bytes:
def _serialize(self, message: MessageType, kwargs) -> bytes:
"""
Serialize the message and its data into bytes.
:param message: The message type to serialize.
:param kwargs: Data for the message.
:return: The serialized message.
"""
argumentData = b''
processedArguments = {}
for datapoint in message.data:
argumentValue = kwargs.pop(datapoint.name, self._getDatapointDefaultValue(datapoint))
if argumentValue is None:
for dynamicallySizedDatapoint in message.data:
if dynamicallySizedDatapoint.type is not None and \
issubclass(dynamicallySizedDatapoint.type.type, ArrayType):
try:
len(dynamicallySizedDatapoint.type.type)
except DynamicSizeError as error:
if error.sizeMember == datapoint.name:
argumentValue = len(kwargs[dynamicallySizedDatapoint.name])
break
else:
raise ValueError(f'Missing value for message "{message.id.name}" datapoint "{datapoint.name}"')
processedArguments[datapoint.name] = argumentValue
if isinstance(datapoint, DependantTelecommandDatapointType):
typeInfo = datapoint.configureWith(
processedArguments[datapoint.provider.name]).type
else:
typeInfo = datapoint.type
argumentData += self._packType(typeInfo, argumentValue, processedArguments)
if kwargs:
raise ValueError(f'Unexpected values for message "{message.id.name}": {kwargs}')
telecommandValues = self._buildHeaderData(message)
if self._verifier is not None:
self._verifier.addPlaceholderVerificationData(telecommandValues)
payload = self._packType(self._headerTypeInfo, telecommandValues, processedArguments) + argumentData
if self._verifier is not None:
payload = self._verifier.addVerificationData(payload, message, self._headerTypeInfo)
return payload
def _getDatapointDefaultValue(self, datapoint: MessageDatapointType) -> Optional[Any]:
"""
Get the value for a datapoint that should be used when no value is specified explicitly.
If there is no default, return `None` instead.
:param datapoint: The datapoint whose default value should be returned.
:return: The default value of the datapoint.
"""
return None
def _buildHeaderData(self, message: MessageType) -> Dict[str, Any]:
"""
Build the data that makes up the header of the given message type.
:param message: A message type.
:return: The name to value mapping of the header data for the given message.
"""
return {'type': message.id.value}
def _packType(self, typeInfo, value, previousValues):
"""
Serialize the given value into bytes.
:param typeInfo: The type of the value.
:param value: The value.
:param previousValues: A name to value mapping of previously used values
that can be used to determine the size of the type.
:return: The serialized value.
"""
if issubclass(typeInfo.type, StructType):
if not isinstance(value, dict):
raise TypeError(f'Value "{value}" is of invalid type '
f'"{type(value)}" for type "{typeInfo.name}"')
packedData = b''
value = value.copy()
for name, childTypeInfo in typeInfo.type:
childValue = value.pop(name, None)
if childValue is None:
if childTypeInfo.default is None:
raise ValueError(
f'Missing value for child "{name}" of type "{typeInfo.name}"')
childValue = childTypeInfo.default.value
packedData += self._packType(childTypeInfo, childValue, previousValues)
if value:
raise ValueError(f'Unexpected value for type "{typeInfo.name}": {value}')
return packedData
elif issubclass(typeInfo.type, ArrayType) and not isinstance(value, bytes):
if not isinstance(value, list):
raise TypeError(f'Value "{value}" is of invalid type '
f'"{type(value)}" for type "{typeInfo.name}"')
try:
size = len(typeInfo.type)
except DynamicSizeError as error:
if previousValues is None or error.sizeMember not in previousValues:
raise
size = previousValues[error.sizeMember]
if len(value) != size:
raise ValueError(f'Value "{value}" has an invalid length: Got {len(value)}, expected {size}')
itemTypeInfo = typeInfo.type.getElementTypeInfo()
packedData = b''
for item in value:
packedData += self._packType(itemTypeInfo, item, previousValues)
return packedData
elif issubclass(typeInfo.type, Enum):
if not isinstance(value, Enum):
value = typeInfo.type(value)
value = value.value
try:
return struct.pack('<' + typeInfo.getFormat(self._database, previousValues), value)
except struct.error as error:
raise ValueError(f'Failed to serialize value {value!r} (type {type(value).__name__}) '
f'to type {typeInfo.type.__name__}: {error}')
[docs]class TelecommandSerializer(Serializer):
""" A serializer of telecommands. """
def __init__(self, database: 'CommunicationDatabase', telecommandHeaderType='TelecommandMessageHeader',
counterMemberName='counter', **kwargs):
"""
Initialize a new telecommand serializer.
:param database: The communication database.
:param telecommandHeaderType: The name of the telecommand message header data type.
:param counterMemberName: The name of the member of the telecommand message header which is used
to assign an id to each telecommand.
"""
super().__init__(database, telecommandHeaderType, **kwargs)
self._counterMemberName = counterMemberName
self._addCounter = counterMemberName in self._headerTypeInfo.type
if self._addCounter:
self._upperCounterLimit = self._headerTypeInfo.type[counterMemberName].getMaxNumericValue(database) + 1
else:
self._upperCounterLimit = 0
self._telecommandCounter = 0
# TODO: Once we remove support for Python 3.7, change the signature to
# def serialize(self, telecommand: TelecommandType, /, **kwargs) -> bytes:
[docs] def serialize(self, telecommand: TelecommandType, **kwargs) -> bytes:
"""
Serialize the telecommand and its data into bytes.
:param telecommand: The telecommand.
:param kwargs: Data for the telecommand.
:return: The serialized telecommand.
"""
return self._serialize(telecommand, kwargs)
@property
[docs] def nextTelecommandCounter(self) -> int:
"""
:return: The next telecommand counter that will be used when serializing a telecommand.
"""
return self._telecommandCounter
def _getDatapointDefaultValue(self, datapoint: TelecommandDatapointType):
return datapoint.default
def _buildHeaderData(self, message: MessageType):
data = super()._buildHeaderData(message)
if self._addCounter:
data[self._counterMemberName] = self._telecommandCounter
self._telecommandCounter += 1
self._telecommandCounter %= self._upperCounterLimit
return data
[docs]class TelemetrySerializer(Serializer):
""" A serializer of telemetry messages. """
def __init__(self, database: 'CommunicationDatabase', telemetryHeaderType='TelemetryMessageHeader', **kwargs):
"""
Initialize a new telemetry serializer.
:param database: The communication database.
:param telemetryHeaderType: The name of the telemetry message header data type.
"""
super().__init__(database, telemetryHeaderType, **kwargs)
# TODO: Once we remove support for Python 3.7, change the signature to
# def serialize(self, telemetry: TelemetryType, /, **kwargs) -> bytes:
[docs] def serialize(self, telemetry: TelemetryType, **kwargs) -> bytes:
"""
Serialize the telemetry and its data into bytes.
:param telemetry: The telemetry.
:param kwargs: Data for the telemetry.
:return: The serialized telemetry.
"""
return self._serialize(telemetry, kwargs)