Source code for ecom.response

from abc import ABC, abstractmethod
from typing import Optional, Callable, Iterator, Dict, Any

from ecom.datatypes import TypeInfo
from ecom.parser import TelemetryParser, ParserError
from ecom.message import DependantTelecommandResponseType, Message, TelecommandType, MessageDatapointType, Telecommand
from ecom.serializer import TelecommandSerializer, TelemetrySerializer


class ResponseTelemetryParser(TelemetryParser, TelecommandSerializer):
    """
    A combined telemetry parser and telecommand serializer.

    On top of its base classes it post-processes response and acknowledge telemetry:

    * Both gain a 'command' data point holding the telecommand that was cached by
      :meth:`serialize` under the reported command number.
    * The 'value' data point of a response message is parsed into its Python representation.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize a new ResponseTelemetryParser.

        :param args: Positional arguments forwarded to the base classes.
        :param kwargs: Keyword arguments forwarded to the base classes.
        """
        super().__init__(*args, **kwargs)
        # Maps a command number to the (telecommand, expected response) pair stored by serialize().
        self._telecommandCache = {}
        telemetryTypes = self._database.telemetryTypeEnum
        self._TELEMETRY_RESPONSE = telemetryTypes['RESPONSE']
        self._TELEMETRY_ACKNOWLEDGE = telemetryTypes['ACKNOWLEDGE']

    def serialize(self, telecommand: TelecommandType, **kwargs) -> bytes:
        """
        Serialize a telecommand and remember it, so that a later response can be matched to it.

        :param telecommand: The telecommand type to serialize.
        :param kwargs: The telecommand arguments, forwarded to the base serializer.
        :return: The serialized telecommand as returned by the base serializer.
        """
        expectedResponse = telecommand.response
        if isinstance(expectedResponse, DependantTelecommandResponseType):
            # The response type depends on the value of one of the telecommand arguments.
            expectedResponse = expectedResponse.configureWith(kwargs[expectedResponse.provider.name])
        # The entry is stored before delegating to the base serializer, keyed by the current counter.
        self._telecommandCache[self._telecommandCounter] = (telecommand, expectedResponse)
        return super().serialize(telecommand, **kwargs)

    def parse(self, message: bytes, errorHandler: Optional[Callable[[ParserError], None]] = None,
              ignoredBytesHandler: Optional[Callable[[bytes, int], None]] = None) -> Iterator[Message]:
        """
        Parse telemetry and enrich response and acknowledge messages with their telecommand.

        :param message: The bytes to parse, forwarded to the base parser.
        :param errorHandler: Called with a ParserError for every problem detected here.
                             If not given, the default error handler of this parser is used.
        :param ignoredBytesHandler: Forwarded unchanged to the base parser.
        :return: An iterator over the parsed telemetry messages.
        """
        def report(error: ParserError) -> None:
            # Prefer the caller supplied handler and fall back to the default one.
            if errorHandler:
                errorHandler(error)
            else:
                self._defaultErrorHandler(error)

        pending = message
        for packet in super().parse(message, errorHandler=errorHandler, ignoredBytesHandler=ignoredBytesHandler):
            # Errors are reported with the buffer as it was remembered before this message.
            # NOTE(review): presumably self._buffer holds the not yet consumed bytes - confirm.
            errorBuffer = pending
            pending = self._buffer

            isResponse = packet.type is self._TELEMETRY_RESPONSE
            if not (isResponse or packet.type is self._TELEMETRY_ACKNOWLEDGE):
                yield packet
                continue

            number = packet.data['command number']
            try:
                # The cache entry is consumed: each command number can only be matched once.
                telecommand, expectedResponse = self._telecommandCache.pop(number)
            except KeyError:
                report(ParserError(f'Received {packet.type.name} with an '
                                   f'invalid command number {number}', errorBuffer))
                continue
            packet.data['command'] = telecommand

            if isResponse:
                if expectedResponse is None:
                    # The error is reported, but the message is still yielded with its raw value.
                    report(ParserError(f'Received {packet.type.name} for telecommand '
                                       f'{telecommand.name} which does not expect a response', errorBuffer))
                else:
                    rawValue = packet.data['value']
                    try:
                        valueDatapoint = MessageDatapointType(name='value', type=expectedResponse.typeInfo)
                        parseInfo = self._createParseInfo([valueDatapoint])
                        parsedValue = self._parse(parseInfo, rawValue)[0]['value']
                    except ValueError as cause:
                        report(ParserError(
                            f'Received telecommand response with an invalid value: {cause}', errorBuffer))
                        continue
                    packet.data['value'] = parsedValue
            yield packet
class ResponseTelemetrySerializer(TelemetrySerializer, ABC):
    """
    A serializer that can serialize a response to a received telecommand.

    This handles logic for responses whose content depend on a received telecommand message.
    """

    def __init__(self, *args, responseTelemetryName: str = 'RESPONSE', acknowledgeTelemetryName: str = 'ACKNOWLEDGE',
                 responseValueDatapointName: str = 'value', **kwargs):
        """
        Initialize a new ResponseTelemetrySerializer.

        :param args: Arguments for the TelemetrySerializer.
        :param responseTelemetryName: The name of the telemetry type that is used
                                      to describe response messages.
        :param acknowledgeTelemetryName: The name of the telemetry type that is used
                                         to describe acknowledgement messages.
        :param responseValueDatapointName: The name of the datapoint in the response telemetry
                                           that contains the response value.
        :param kwargs: Keyword arguments for the TelemetrySerializer.
        :raises ValueError: If the response telemetry has no datapoint
                            named responseValueDatapointName.
        """
        super().__init__(*args, **kwargs)
        self._responseValueDatapointName = responseValueDatapointName
        self._ACKNOWLEDGE_TELEMETRY = self._database.getTelemetryByName(acknowledgeTelemetryName)
        self._RESPONSE_TELEMETRY = self._database.getTelemetryByName(responseTelemetryName)
        responseDatapointNames = [datapoint.name for datapoint in self._RESPONSE_TELEMETRY.data]
        if responseValueDatapointName not in responseDatapointNames:
            # Report the datapoint name that was searched for, not the telemetry name.
            raise ValueError(f"The response telemetry doesn't have a response value datapoint "
                             f"with the name {responseValueDatapointName!r}")
        # NOTE(review): this flag is derived from the response telemetry only, but it is also
        # applied when serializing acknowledgements - confirm both always agree.
        self._addCommandNumber = 'command number' in responseDatapointNames

    def serializeTelecommandResponse(self, telecommand: Telecommand, value: Any,
                                     additionalData: Optional[Dict[str, Any]] = None) -> bytes:
        """
        Serialize a response for the given telecommand with the given value.

        :param telecommand: The telecommand message that this response is responding to.
        :param value: The response value to the telecommand.
        :param additionalData: Optional additional data that is required by the response telemetry package.
                               The dictionary is copied and never modified.
        :return: The serialized response.
        """
        responseData = {} if additionalData is None else additionalData.copy()
        if self._addCommandNumber:
            responseData['command number'] = telecommand.header['counter']
        telecommandType = self._database.getTelecommand(telecommand.type)
        response = telecommandType.response
        if isinstance(response, DependantTelecommandResponseType):
            # The response type depends on the value of one of the received telecommand arguments.
            response = response.configureWith(telecommand.data[response.provider.name])
        # NOTE(review): a telecommand type without a response would leave `response` as None
        # and fail on `.typeInfo` below - confirm callers only respond to such telecommands.
        self._serializeResponseValue(value, response.typeInfo, responseData)
        return self.serialize(self._RESPONSE_TELEMETRY, **responseData)

    def serializeTelecommandAcknowledge(
            self, telecommand: Telecommand, additionalData: Optional[Dict[str, Any]] = None) -> bytes:
        """
        Serialize an acknowledgement for the given telecommand.

        :param telecommand: The telecommand message that this response is acknowledging.
        :param additionalData: Optional additional data that is required by the acknowledgement telemetry package.
                               The dictionary is copied and never modified.
        :return: The serialized acknowledgement.
        """
        responseData = {} if additionalData is None else additionalData.copy()
        if self._addCommandNumber:
            responseData['command number'] = telecommand.header['counter']
        return self.serialize(self._ACKNOWLEDGE_TELEMETRY, **responseData)

    @abstractmethod
    def _serializeResponseValue(self, value: Any, typeInfo: TypeInfo, responseDatapoints: Dict[str, Any]):
        """
        Serialize the response value and add it to the response data points.

        :param value: The value to serialize.
        :param typeInfo: The type information of the value.
        :param responseDatapoints: The datapoints that are sent with the response.
                                   Implementations add their entries to this dictionary.
        """
        pass
class FixedSizeResponseTelemetrySerializer(ResponseTelemetrySerializer):
    """
    A response serializer for telemetry whose response value datapoint
    is a list of bytes with a constant size.
    """

    def __init__(self, *args, maxResponseSize: Optional[int] = None,
                 maxResponseSizeConstant: str = 'MAX_TELECOMMAND_RESPONSE_SIZE', **kwargs):
        """
        Initialize a response serializer that works with a fixed sized bytes array in the response payload.

        :param args: Arguments for the ResponseTelemetrySerializer.
        :param maxResponseSize: The size in bytes that every response value is padded to.
                                If omitted, it is read from the database constant
                                named by maxResponseSizeConstant.
        :param maxResponseSizeConstant: The name of the database constant that provides the size.
                                        Only consulted when maxResponseSize is not given.
        :param kwargs: Keyword arguments for the ResponseTelemetrySerializer.
        :raises ValueError: If the constant does not exist or its value can not be converted to an integer.
        """
        super().__init__(*args, **kwargs)
        if maxResponseSize is None:
            # No explicit size was given: fall back to the constant from the database.
            try:
                sizeConstant = self._database.constants[maxResponseSizeConstant]
                maxResponseSize = int(sizeConstant.value)
            except (KeyError, ValueError):
                raise ValueError(f'{maxResponseSizeConstant!r} is not a valid maximum response size')
        self._maxResponseSize = maxResponseSize

    def _serializeResponseValue(self, value: Any, typeInfo: TypeInfo, responseDatapoints: Dict[str, Any]):
        """
        Pack the value and store it, padded with zero bytes, as the response value datapoint.

        :param value: The value to serialize.
        :param typeInfo: The type information of the value.
        :param responseDatapoints: The datapoints of the response, which receive the padded value.
        """
        packedValue = self._packType(typeInfo, value, responseDatapoints)
        # ljust only pads: a packed value longer than the maximum size is stored unchanged.
        paddedValue = packedValue.ljust(self._maxResponseSize, b'\x00')
        responseDatapoints[self._responseValueDatapointName] = paddedValue
class VariableSizedResponseTelemetrySerializer(ResponseTelemetrySerializer):
    """
    A response serializer for telemetry that carries the response value
    together with a datapoint holding its size.
    """

    def __init__(self, *args, sizeDatapointName: str = 'size', **kwargs):
        """
        Initialize a response serializer that will serialize a variable sized response.

        :param args: Arguments for the ResponseTelemetrySerializer.
        :param sizeDatapointName: The name of the datapoint in the response package
                                  that holds the size of the response value.
        :param kwargs: Keyword arguments for the ResponseTelemetrySerializer.
        :raises ValueError: If the response telemetry has no datapoint named sizeDatapointName.
        """
        super().__init__(*args, **kwargs)
        self._responseSizeDatapointName = sizeDatapointName
        hasSizeDatapoint = any(
            datapoint.name == sizeDatapointName for datapoint in self._RESPONSE_TELEMETRY.data)
        if not hasSizeDatapoint:
            raise ValueError(f"The response telemetry doesn't have a response size datapoint "
                             f"with the name {sizeDatapointName!r}")

    def _serializeResponseValue(self, value: Any, typeInfo: TypeInfo, responseDatapoints: Dict[str, Any]):
        """
        Pack the value and store it together with its length in the response datapoints.

        :param value: The value to serialize.
        :param typeInfo: The type information of the value.
        :param responseDatapoints: The datapoints of the response, which receive the size and the value.
        """
        packedValue = self._packType(typeInfo, value, responseDatapoints)
        payloadSize = len(packedValue)
        responseDatapoints[self._responseSizeDatapointName] = payloadSize
        responseDatapoints[self._responseValueDatapointName] = packedValue