from abc import ABC, abstractmethod
from typing import Optional, Any, List, Dict, Iterator
from dataclasses import dataclass
from collections import OrderedDict
from ecom.datatypes import TypeInfo, EnumType, ArrayType, DynamicSizeError
# Frozen dataclass that acts as the base for the datapoint descriptions of messages
# (TelemetryDatapointType and TelecommandDatapointType in this file derive from it).
# NOTE(review): the docstrings about "name" and "type information" below are not preceded
# by any field declaration in this listing, and the body has lost its indentation.
# The "[docs]" prefixes are not Python syntax; presumably residue from rendered
# documentation - TODO confirm and restore the declarations from the original module.
@dataclass(frozen=True)
[docs]class MessageDatapointType:
""" A base for all message datapoint types. """
""" The name of this datapoint. """
""" The type information of this datapoint. """
# Optional free-text description; None when no description is given.
[docs] description: Optional[str] = None
""" The description of this datapoint. """
# Frozen dataclass that acts as the base for message type descriptions
# (TelemetryType and TelecommandType in this file derive from it).
# NOTE(review): the docstring about the "enum value representing this message type" has no
# field declaration in front of it in this listing; the declaration and the indentation
# appear to have been lost, and the "[docs]" prefixes are not Python syntax
# (presumably residue from rendered documentation) - TODO restore from the original module.
@dataclass(frozen=True)
[docs]class MessageType:
""" A base for all message types. """
""" An enum value representing this message type. """
# Datapoint descriptions of the message; this field has no default value.
[docs] data: List[MessageDatapointType]
""" A list of datapoints that will be transmitted with this message. """
# Frozen dataclass describing the value a telecommand responds with.
# NOTE(review): the docstrings about the response "name" and "type" below have no field
# declaration in front of them in this listing; the declarations and the indentation appear
# to have been lost, and the "[docs]" prefixes are not Python syntax (presumably residue
# from rendered documentation) - TODO restore from the original module.
@dataclass(frozen=True)
[docs]class TelecommandResponseType:
""" A type of telecommand response. """
""" The name of the response value. """
""" The type of the response value. """
# Optional free-text description; None when no description is given.
[docs] description: Optional[str] = None
""" A description of the response value. """
# Abstract frozen dataclass for telecommand responses whose type is determined by another
# datapoint of the same telecommand (the "provider").
# NOTE(review): the body has lost its indentation and the "[docs]" prefixes are not Python
# syntax (presumably residue from rendered documentation) - TODO confirm.
@dataclass(frozen=True)
[docs]class DependantTelecommandResponseType(TelecommandResponseType, ABC):
""" A telecommand response whose type depends on the value of a datapoint of the telecommand. """
# The annotation is a string forward reference because TelecommandDatapointType is
# defined further down in this file. The default is None although the annotation is
# not Optional.
[docs] provider: 'TelecommandDatapointType' = None
""" The datapoint that this response is dependent on. """
# NOTE(review): this decorator is not followed by a method definition in this listing;
# the abstract method it belongs to appears to be missing - restore it from the
# original module.
@abstractmethod
@dataclass(frozen=True)
class TelemetryDatapointType(MessageDatapointType):
    """
    The description of a single datapoint that is carried by a telemetry message.

    No fields are declared here; the class only specializes MessageDatapointType by name.
    """
# Frozen dataclass describing one kind of telemetry message; specializes MessageType.
# NOTE(review): the docstring about the "enum value representing this telemetry type" has no
# field declaration in front of it in this listing; the declaration and the indentation
# appear to have been lost, and the "[docs]" prefixes are not Python syntax
# (presumably residue from rendered documentation) - TODO restore from the original module.
@dataclass(frozen=True)
[docs]class TelemetryType(MessageType):
""" A type of telemetry message. """
""" An enum value representing this telemetry type. See CommunicationDatabase.telemetryTypeEnum. """
# Re-declares the inherited "data" field with the telemetry-specific datapoint type.
[docs] data: List[TelemetryDatapointType]
""" A list of datapoints that will be transmitted with the telemetry. """
# Frozen dataclass for a generic message; Telemetry and Telecommand in this file derive from it.
# NOTE(review): none of the three attribute docstrings below (message type, data, header
# data) is preceded by a field declaration in this listing, so the class as shown declares
# no fields at all. The declarations and the indentation appear to have been lost, and the
# "[docs]" prefix is not Python syntax (presumably residue from rendered documentation)
# - TODO restore from the original module.
@dataclass(frozen=True)
[docs]class Message:
""" A generic message. """
"""
The type of the message.
See CommunicationDatabase.telemetryTypeEnum and CommunicationDatabase.telecommandTypeEnum.
"""
""" The data of this message. """
""" The data from the header of this message. """
class Telemetry(Message):
    """
    A message travelling towards the base, i.e. a telemetry message.

    Declares nothing of its own here; it only specializes Message by name.
    """
class Telecommand(Message):
    """
    A message travelling away from the base, i.e. a telecommand message.

    Declares nothing of its own here; it only specializes Message by name.
    """
@dataclass(frozen=True)
class TelecommandDatapointType(MessageDatapointType):
    """
    A datapoint of a telecommand.

    Extends MessageDatapointType with an optional default value for the datapoint.
    """

    # A bare ``Optional`` carries no type argument and is rejected by static type
    # checkers; ``Optional[Any]`` states the same intent ("any value, or None").
    default: Optional[Any] = None
    """ The default value for the datapoint. """
# Abstract frozen dataclass for telecommand datapoints whose type is determined by the
# value of another datapoint (the "provider").
# NOTE(review): the body has lost its indentation and the "[docs]" prefixes are not Python
# syntax (presumably residue from rendered documentation) - TODO confirm.
@dataclass(frozen=True)
[docs]class DependantTelecommandDatapointType(TelecommandDatapointType, ABC):
""" A datapoint whose type depends on the value of another datapoint. """
# The default is None although the annotation is not Optional.
[docs] provider: TelecommandDatapointType = None
""" The argument that this datapoint is dependent on. """
# NOTE(review): this decorator is not followed by a method definition in this listing;
# the abstract method it belongs to appears to be missing - restore it from the
# original module.
@abstractmethod
# Frozen dataclass describing one kind of telecommand; specializes MessageType.
# NOTE(review): the first attribute docstring ("enum value representing this telecommand
# type") and the last one ("Whether the telecommand is a debugging command.") have no field
# declaration in front of them in this listing. The declarations and the indentation appear
# to have been lost, and the "[docs]" prefixes are not Python syntax (presumably residue
# from rendered documentation) - TODO restore from the original module.
@dataclass(frozen=True)
[docs]class TelecommandType(MessageType):
""" A telecommand message. """
""" An enum value representing this telecommand type. See CommunicationDatabase.telecommandTypeEnum. """
# Re-declares the inherited "data" field with the telecommand-specific datapoint type.
[docs] data: List[TelecommandDatapointType]
""" The datapoints of the telecommand. """
# Optional by annotation, but declared without a default value.
[docs] response: Optional[TelecommandResponseType]
""" The type information of the return value of the telecommand. """
# Optional by annotation, but declared without a default value.
[docs] description: Optional[str]
""" A description of the telecommand. """
""" Whether the telecommand is a debugging command. """
def iterateRequiredDatapoints(telecommand: TelecommandType) -> Iterator[TelecommandDatapointType]:
    """
    Iterate over the datapoints that have to be supplied to serialize the given telecommand.

    A datapoint that holds the size of a dynamically sized array is left out of the result,
    because its value can be deduced from the array.

    :param telecommand: The telecommand type whose required datapoints should be iterated.
    :return: An iterator over the required datapoints of the telecommand type.
    """
    required = OrderedDict()
    for datapoint in telecommand.data:
        required[datapoint.name] = datapoint
        # Dependant datapoints are always kept and never inspected for array sizes.
        if isinstance(datapoint, DependantTelecommandDatapointType):
            continue
        valueType = datapoint.type.type
        if not issubclass(valueType, ArrayType):
            continue
        try:
            # Asking for the length raises for arrays whose size comes from another member.
            len(valueType)
        except DynamicSizeError as sizeError:
            # Drop the size member. This only has an effect if it was listed before the
            # array; a size member listed afterwards is added again by a later iteration.
            required.pop(sizeError.sizeMember, None)
    return (datapoint for datapoint in required.values())