#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import csv
import json
import dataclasses
from typing import List, Dict, Any, Optional, Generic, Type, Callable, Union, Tuple
from argparse import ArgumentParser
from collections import OrderedDict
try:
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from ecom.message import TelemetryDatapointType, DependantTelecommandResponseType, TelemetryType, MessageType, \
TelecommandDatapointType, DependantTelecommandDatapointType, TelecommandResponseType, TelecommandType, \
MessageDatapointType
from ecom.datatypes import TypeInfo, StructType, ArrayType, EnumType, DefaultValueInfo, V, DynamicSizeError, \
loadTypedValue
except ImportError:
raise
@dataclasses.dataclass(frozen=True)
class Unit(TypeInfo):
    """ Type information for a value that carries a physical unit. """

    @classmethod
    def fromTypeInfo(cls, name: str, typeInfo: TypeInfo,
                     description: Optional[str] = None) -> 'Unit':
        """
        Derive a new unit type from an existing base type.

        :param name: The name of the unit.
        :param typeInfo: The base type; its type, name and default are reused.
        :param description: The description of the unit; surrounding whitespace is removed.
        :return: The new unit.
        """
        cleanedDescription = description
        if cleanedDescription is not None:
            cleanedDescription = cleanedDescription.strip()
        return cls(
            typeInfo.type,
            name,
            baseTypeName=typeInfo.name,
            description=cleanedDescription,
            default=typeInfo.default,
        )
@dataclasses.dataclass(frozen=True)
class Configuration(Generic[V]):
    """
    A configuration item of the secondary device.

    The field order matters: instances are created positionally as
    ``Configuration(id, name, type, defaultValue, description)``.
    """

    id: EnumType
    """ The id of the configuration item. """

    name: str
    """ The name of the configuration item. """

    type: TypeInfo
    """ The type of the configuration. """

    defaultValue: V
    """ The default value of the configuration. """

    description: Optional[str] = None
    """ A description of the configuration. """
class CommunicationDatabaseError(RuntimeError):
    """ Signals an error in the communication database. """
class UnknownTypeError(CommunicationDatabaseError):
    """ Raised when information about an unknown shared datatype is requested. """

    def __init__(self, typ: str):
        """
        :param typ: The name of the type that is not known.
        """
        message = f'Unknown type "{typ}"'
        super().__init__(message)
        # Kept on the instance so that handlers can inspect which type was missing.
        self.type = typ
class UnknownConstantError(ValueError):
    """ Raised when information about an unknown shared constant is requested. """

    def __init__(self, constant: str):
        """
        :param constant: The name of the constant that is not known.
        """
        message = f'Unknown constant "{constant}"'
        super().__init__(message)
        # Kept on the instance so that handlers can inspect which constant was missing.
        self.constant = constant
class UnknownDatapointError(CommunicationDatabaseError):
    """ Raised when information about an unknown datapoint is requested. """

    def __init__(self, datapointName: str):
        """
        :param datapointName: The name of the datapoint that is not known.
        """
        message = f'Unknown argument "{datapointName}"'
        super().__init__(message)
        self._datapointName = datapointName

    @property
    def datapointName(self):
        """
        :return: The name of the datapoint that is not known.
        """
        return self._datapointName
@dataclasses.dataclass(frozen=True)
class _DbContainer:
    """ Helper dataclass that stores a reference to a communication database. """

    # The field is excluded from repr, hashing and comparisons.
    _database: 'CommunicationDatabase' = dataclasses.field(compare=False, hash=False, repr=False)
    """ The communication database that contains the configurations. """
@dataclasses.dataclass(frozen=True)
class ConfigurationValueDatapoint(DependantTelecommandDatapointType, _DbContainer):
    """ A telecommand datapoint whose value is the value of a configuration. """
class ConfigurationValueResponseType(DependantTelecommandResponseType):
    """ A telecommand response whose value is the value of a configuration. """

    _database: 'CommunicationDatabase'
    """ The communication database that contains the configurations. """

    def __init__(self, database: 'CommunicationDatabase', *args, **kwargs):
        """
        :param database: The communication database that contains the configurations.
        :param args: Positional arguments forwarded to the base class initializer.
        :param kwargs: Keyword arguments forwarded to the base class initializer.
        """
        super().__init__(*args, **kwargs)
        setattr(self, '_database', database)
@dataclasses.dataclass
class Constant(Generic[V]):
    """
    A constant in the communication database.

    Instances are created with the keywords ``name``, ``value``, ``type`` and ``description``
    and are updated via ``dataclasses.replace(constant, type=..., value=...)``.
    """

    name: str
    """ The name of the constant. """

    value: V
    """ The value of the constant. """

    type: TypeInfo
    """ Information about the type of the constant. """

    # A default is provided so that the restored declaration accepts every call
    # that either passes or omits a description.
    description: Optional[str] = None
    """ A description of the constant. """
[docs]class CommunicationDatabase:
""" The shared communication database. Contains all information about the communication. """
def __init__(self, dataDirectory: str):
    """
    Load the shared communication database from its data directory.

    The files are read in a fixed order, because later loaders look up types and
    constants that earlier loaders register.

    :param dataDirectory: The directory in which the database is stored.
    """
    super().__init__()

    def databaseEntry(entryName: str) -> str:
        """ Build the path of a file or folder inside the data directory. """
        return os.path.join(dataDirectory, entryName)

    self._path = dataDirectory
    self._units = OrderedDict()  # type: Dict[str, List[Unit]]
    self._constants = OrderedDict()  # type: Dict[str, Constant]
    self._typeMapping = OrderedDict()  # type: Dict[str, TypeInfo]
    self._telecommandTypes = []  # type: List[TelecommandType]
    self._telemetryTypes = []  # type: List[TelemetryType]
    self._configurations = []  # type: List[Configuration]
    self._changeListeners = []  # type: List[Callable[[], None]]

    # The commands file is read twice: once for the telecommands and once for their responses.
    commandsFilePath = databaseEntry('commands.csv')

    self._loadUnits(databaseEntry('units.csv'))
    self._loadConstants(databaseEntry('sharedConstants.csv'))
    self._loadTelecommands(commandsFilePath)
    self._loadTelemetry(databaseEntry('telemetry.csv'))
    deferredTypes = self._loadTypes(databaseEntry('sharedDataTypes.json'))
    self._loadConfigurations(databaseEntry('configuration.csv'))
    self._loadTelecommandArguments(databaseEntry('commandArguments'))

    # Types whose loading was deferred are loaded once the telecommand arguments are known.
    for typeName, typeDescription in deferredTypes.items():
        self._typeMapping[typeName] = self._loadType(typeName, typeDescription)

    self._loadTelecommandResponses(commandsFilePath)
    self._loadTelemetryArguments(databaseEntry('telemetryArguments'))
[docs] def __eq__(self, o: object) -> bool:
return isinstance(o, CommunicationDatabase) \
and o.units == self.units \
and o.constants == self.constants \
and o.dataTypes == self.dataTypes \
and o.telemetryTypes == self.telemetryTypes \
and o.telecommandTypes == self.telecommandTypes \
and o.configurations == self.configurations
[docs] def __repr__(self) -> str:
return f'{self.__class__.__name__}({self._path!r})'
@property
[docs] def constants(self) -> Dict[str, Constant]:
"""
:return: A name to value, description and type information mapping for all shared constants.
"""
return self._constants
@property
[docs] def telecommandTypes(self) -> List[TelecommandType]:
"""
:return: All telecommand types.
"""
return self._telecommandTypes
@property
[docs] def telecommandTypeEnum(self) -> Optional[Type[EnumType]]:
"""
:return: The enum class that is used for the telecommand types or None if no telecommand types exist.
"""
for telecommandType in self.telecommandTypes:
return telecommandType.id.__class__
return None
@property
[docs] def telemetryTypes(self) -> List[TelemetryType]:
"""
:return: All telemetry types.
"""
return self._telemetryTypes
@property
[docs] def telemetryTypeEnum(self) -> Optional[Type[EnumType]]:
"""
:return: The enum class that is used for the telemetry types or None if no telemetry types exist.
"""
for telemetryType in self.telemetryTypes:
return telemetryType.id.__class__
return None
@property
[docs] def dataTypes(self) -> Dict[str, TypeInfo]:
"""
:return: All shared data types.
"""
return self._typeMapping
@property
[docs] def units(self) -> Dict[str, List[Unit]]:
"""
:return: All mapping of names to all known units and their variants.
"""
return self._units
@property
[docs] def configurations(self) -> List[Configuration]:
"""
:return: All configuration items of the secondary device.
"""
return self._configurations
@property
[docs] def configurationEnum(self) -> Optional[Type[EnumType]]:
"""
:return: The enum class that is used for the configurations or None if no configurations exist.
"""
for configuration in self.configurations:
return configuration.id.__class__
return None
def getTypeInfo(self, typeName: str) -> TypeInfo:
    """
    Get the type info for a known datatype by its name.

    Everything from the first ``[`` onwards is ignored, so the element type of an array
    type string is returned. The name is then looked up in this order: base types,
    shared data types, ``<type> (<unit>)`` unit variants and finally plain unit names.

    :param typeName: The name of the type.
    :return: The type info for the data type.
    :raises UnknownTypeError: If the name does not match any known type or unit.
    """
    typeName = typeName.strip()
    bracketIndex = typeName.find('[')
    if bracketIndex != -1:
        # Strip again, otherwise whitespace in front of the bracket ("uint8 [4]")
        # stays part of the name and the lookups below fail.
        typeName = typeName[:bracketIndex].strip()
    try:
        return TypeInfo.lookupBaseType(typeName)
    except KeyError:
        pass  # Not a base type, try the shared data types next.
    try:
        return self._typeMapping[typeName]
    except KeyError:
        pass  # Not a shared data type, try the units next.
    if typeName.endswith(')'):
        parenthesesIndex = typeName.rfind('(')
        if parenthesesIndex != -1:
            # "<base type> (<unit>)" requests a variant of the unit with a specific base type.
            unitName = typeName[parenthesesIndex + 1:-1].strip()
            baseTypeName = typeName[:parenthesesIndex].strip()
            return self._getOrCreateUnitVariant(unitName, baseTypeName)
    try:
        # A plain unit name refers to the first registered variant of that unit.
        return self._units[typeName][0]
    except KeyError:
        raise UnknownTypeError(typeName) from None
[docs] def getTelecommandByName(self, name: str) -> TelecommandType:
"""
:param name: The name of a telecommand type.
:return: The telecommand type with the specified name.
"""
for telecommand in self.telecommandTypes:
if telecommand.id.name == name:
return telecommand
raise ValueError(f'Unknown telecommand "{name}"')
[docs] def getTelecommand(self, telecommandId: EnumType) -> TelecommandType:
"""
:param telecommandId: The id enum value of a telecommand type.
:return: The telecommand type with the specified id.
"""
for telecommand in self.telecommandTypes:
if telecommand.id is telecommandId:
return telecommand
raise ValueError(f'Unknown telecommand "{telecommandId.name}"')
[docs] def getTelemetryByName(self, name: str) -> TelemetryType:
"""
:param name: The name of a telemetry type.
:return: The telemetry type with the specified name.
"""
for telemetry in self.telemetryTypes:
if telemetry.id.name == name:
return telemetry
raise ValueError(f'Unknown telemetry "{name}"')
[docs] def getTelemetry(self, telemetryId: EnumType) -> TelemetryType:
"""
:param telemetryId: The id enum value of a telemetry type.
:return: The telemetry type with the specified id.
"""
for telemetry in self.telemetryTypes:
if telemetry.id is telemetryId:
return telemetry
raise ValueError(f'Unknown telemetry "{telemetryId.name}"')
def parseKnownTypeInfo(self, typeStr: str, name: Optional[str] = None, documentation: Optional[str] = None):
    """
    Get the type info for a known datatype.

    Besides plain type names, type strings that declare an array of a known datatype
    (``<type>[<size>]``) are supported.

    :param typeStr: The type string to parse.
    :param name: The name of the array data type, if the type is an array. If not provided, the typeStr is used.
    :param documentation: The documentation of the data type, if the type is an array.
    :return: The type information for the parsed type.
    """
    bracketIndex = typeStr.find('[')
    if bracketIndex == -1:
        # Not an array, the type string is a plain type name.
        return self.getTypeInfo(typeStr)

    arrayName = typeStr if name is None else name
    # The size is everything between the opening bracket and the last character of the string.
    sizeDescription = typeStr[bracketIndex + 1:-1]
    if sizeDescription.startswith('.'):
        # A size with a leading dot is not parsed as a number; it is kept as a string without the dot.
        size = sizeDescription[1:]
    else:
        size = self._parseValue(sizeDescription, int)
    elementType = self.getTypeInfo(typeStr[:bracketIndex])
    arrayType = ArrayType(arrayName, elementType, size, documentation)
    return TypeInfo(arrayType, typeStr, typeStr, documentation)
def replaceType(self, typ: Type[V], name: Optional[str] = None):
    """
    Replace a type registered in the database with another Python class.

    This allows to define own types like enums for types defined in the communication database,
    make sure that the two types match and keep up to date with each other,
    and use the types on the Python side with IDE autocompletion support.

    The replacement is applied to the registered type itself, to children of registered
    struct and array types, to constants, to telemetry and telecommand datapoints and to
    telecommand responses. Afterwards all registered change listeners are called.

    :param typ: The new type to replace the old type.
    :param name: The name of the type to replace, if omitted the class name of the new type is used.
    :raises TypeError: If the new type does not compare equal to the registered type.
    """
    if name is None:
        name = typ.__name__
    existingTypeInfo = self.getTypeInfo(name)
    replacedType = existingTypeInfo.type
    if typ != replacedType:
        raise TypeError(f'The new type {typ!r} (from {typ.__module__}) does not match the existing type '
                        f'{existingTypeInfo.type!r} (from {existingTypeInfo.type.__module__})')

    updatedTypeInfo = existingTypeInfo.copyWithType(typ)
    if isinstance(existingTypeInfo, Unit):
        # noinspection PyTypeChecker
        self._units[name][0] = updatedTypeInfo
    else:
        self._typeMapping[name] = updatedTypeInfo

    # Walk through all registered types and replace the type where it is used as
    # a struct child or as an array element; other children are searched in turn.
    for registeredTypeInfo in self._typeMapping.values():
        pending = [registeredTypeInfo]
        while pending:
            current = pending.pop(0)
            if issubclass(current.type, StructType):
                for childName, childType in current.type:
                    if childType.type is replacedType:
                        current.type.__children__[childName] = childType.copyWithType(typ)
                    else:
                        pending.append(childType)
            elif issubclass(current.type, ArrayType):
                elementType = current.type.getElementTypeInfo()
                if elementType.type is replacedType:
                    current.type.__type__ = elementType.copyWithType(typ)
                else:
                    pending.append(elementType)

    for constantName, constant in self.constants.items():
        constantType = constant.type.type
        if constantType is replacedType:
            self._constants[constantName] = dataclasses.replace(
                constant, type=constant.type.copyWithType(typ), value=loadTypedValue(constant.value, typ))
        elif issubclass(constantType, (ArrayType, StructType)):
            # These types can contain the changed type deeply nested within them,
            # so their values are loaded again in order to use the new type.
            self._constants[constantName] = dataclasses.replace(
                constant, value=loadTypedValue(constant.value, constantType))

    for telemetry in self.telemetryTypes:
        for index, datapoint in enumerate(telemetry.data):
            if datapoint.type.type is replacedType:
                telemetry.data[index] = dataclasses.replace(datapoint, type=datapoint.type.copyWithType(typ))

    for telecommandIndex, telecommand in enumerate(self.telecommandTypes):
        for index, datapoint in enumerate(telecommand.data):
            # Datapoints without type information are skipped.
            if datapoint.type is not None and datapoint.type.type is replacedType:
                newDefault = None if datapoint.default is None else loadTypedValue(datapoint.default, typ)
                telecommand.data[index] = dataclasses.replace(
                    datapoint, type=datapoint.type.copyWithType(typ), default=newDefault)
        response = telecommand.response
        if response is not None and response.typeInfo is not None and response.typeInfo.type is replacedType:
            newResponse = dataclasses.replace(response, typeInfo=response.typeInfo.copyWithType(typ))
            self.telecommandTypes[telecommandIndex] = dataclasses.replace(telecommand, response=newResponse)

    for notifyListener in self._changeListeners:
        notifyListener()
[docs] def registerChangeListener(self, listener: Callable[[], None]):
"""
Register a listener that will be called every time when the communication database has been changed.
:param listener: The callable that will be called when a change occurs.
"""
self._changeListeners.append(listener)
def _loadTypes(self, typesFilePath):
    """
    Load the shared datatype information.

    A type is deferred instead of loaded if it needs the ``MAX_TELECOMMAND_DATA_SIZE``
    constant, or if it refers to a type that has already been deferred.

    :param typesFilePath: The path to the shared data types file.
    :return: A mapping of names to descriptions of the types that must be initialized
             after the telecommands are loaded.
    """
    deferredTypes = {}
    try:
        with open(typesFilePath, encoding='utf-8') as typesFile:
            # Keep the order of the entries as they appear in the file.
            typeDescriptions = json.load(typesFile, object_pairs_hook=OrderedDict)
            for typeName, typeDescription in typeDescriptions.items():
                try:
                    self._typeMapping[typeName] = self._loadType(typeName, typeDescription)
                except UnknownConstantError as error:
                    if error.constant != 'MAX_TELECOMMAND_DATA_SIZE':
                        raise
                    deferredTypes[typeName] = typeDescription
                except UnknownTypeError as error:
                    if error.type not in deferredTypes:
                        raise
                    deferredTypes[typeName] = typeDescription
    except IOError as error:
        raise CommunicationDatabaseError(f'Error parsing {typesFilePath}: {error}')
    return deferredTypes
def _loadConstants(self, sharedConstantsFilePath):
    """
    Load the shared constants.

    Nothing is loaded if the file does not exist.

    :param sharedConstantsFilePath: The path to the shared constants file.
    :raises CommunicationDatabaseError: If the file cannot be read or has invalid content.
    """
    errorPrefix = f'Error parsing {sharedConstantsFilePath}: '
    try:
        with open(sharedConstantsFilePath, newline='', encoding='utf-8') as constantsFile:
            try:
                rows = csv.reader(constantsFile)
                header = next(rows, None)
                if header is not None:
                    if len(header) != 4:
                        raise CommunicationDatabaseError(errorPrefix + 'Invalid number of csv headers')
                    if header[0] != 'Name':
                        raise CommunicationDatabaseError(errorPrefix + 'Invalid content')
                for row in rows:
                    name, rawValue, typeName, description = row
                    try:
                        typeInfo = self.getTypeInfo(typeName)
                        value = self._parseValue(rawValue, typeInfo.type)
                    except KeyError:
                        raise CommunicationDatabaseError(
                            errorPrefix + f'Invalid type "{typeName}" for constant {name}')
                    except ValueError:
                        raise CommunicationDatabaseError(
                            errorPrefix + f'Invalid value "{rawValue}" for type "{typeName}" for constant {name}')
                    self._constants[name] = Constant(
                        name=name, value=value, type=typeInfo, description=description)
            except csv.Error as error:
                raise CommunicationDatabaseError(f'{errorPrefix}{error}')
    except FileNotFoundError:
        # The constants file is optional.
        return
    except IOError as error:
        raise CommunicationDatabaseError(f'{errorPrefix}{error}')
def _loadConfigurations(self, configurationsFilePath):
    """
    Load the secondary device configuration items.

    Nothing is loaded if the file does not exist. Otherwise this registers the
    ``ConfigurationId`` and ``Configuration`` types, the configuration items and the constants
    ``NUM_CONFIGURATIONS``, ``MAX_CONFIG_VALUE_SIZE`` and ``DEFAULT_CONFIGURATION``.

    :param configurationsFilePath: The path to the configurations file.
    :raises CommunicationDatabaseError: If the file cannot be read or has invalid content.
    """
    errorPrefix = f'Error parsing {configurationsFilePath}: '
    parsedItems = []
    largestValueSize = 0
    try:
        with open(configurationsFilePath, newline='', encoding='utf-8') as configurationsFile:
            try:
                rows = csv.reader(configurationsFile)
                header = next(rows, None)
                if header is not None:
                    if len(header) != 4:
                        raise CommunicationDatabaseError(errorPrefix + 'Invalid number of csv headers')
                    if header[0] != 'Name':
                        raise CommunicationDatabaseError(errorPrefix + 'Invalid content')
                for row in rows:
                    name, typeName, rawDefault, description = row
                    try:
                        typeInfo = self.parseKnownTypeInfo(typeName)
                    except KeyError:
                        raise CommunicationDatabaseError(
                            errorPrefix + f'Invalid type "{typeName}" for configuration {name}')
                    try:
                        defaultValue = self._parseValue(rawDefault, typeInfo.type)
                    except ValueError:
                        raise CommunicationDatabaseError(
                            errorPrefix + f'Invalid default value "{rawDefault}" '
                                          f'for type "{typeName}" for configuration {name}')
                    cleanedDescription = description.strip() if description else None
                    parsedItems.append((name, typeInfo, defaultValue, cleanedDescription))
                    largestValueSize = max(largestValueSize, typeInfo.getSize(self))
            except csv.Error as error:
                raise CommunicationDatabaseError(f'{errorPrefix}{error}')
    except FileNotFoundError:
        # The configurations file is optional.
        return
    except IOError as error:
        raise CommunicationDatabaseError(f'{errorPrefix}{error}')

    numConfigurations = len(parsedItems)
    try:
        idBaseType = self._getBaseTypeForNumber(numConfigurations)
    except ValueError:
        raise CommunicationDatabaseError('Too many configuration items')

    self._constants['NUM_CONFIGURATIONS'] = Constant(
        name='NUM_CONFIGURATIONS',
        value=numConfigurations,
        type=self.getTypeInfo(idBaseType),
        description='The number of registered configuration items.',
    )
    self._constants['MAX_CONFIG_VALUE_SIZE'] = Constant(
        name='MAX_CONFIG_VALUE_SIZE',
        value=largestValueSize,
        type=self.getTypeInfo(self._getBaseTypeForNumber(largestValueSize)),
        description='The maximum size of any configuration item in bytes.',
    )

    # Build the id enum with one value per configuration item, in file order.
    idValues = {}
    for name, _, _, description in parsedItems:
        idValues[name] = {'__doc__': description} if description else {}
    configIdEnum = self._loadType('ConfigurationId', {
        '__doc__': 'Ids of all possible configuration items.',
        '__type__': idBaseType,
        '__values__': idValues,
    })
    self._typeMapping['ConfigurationId'] = configIdEnum

    for name, typeInfo, defaultValue, description in parsedItems:
        self._configurations.append(
            Configuration(configIdEnum.type[name], name, typeInfo, defaultValue, description))

    # Build a struct type that has one member per configuration item.
    configStructItems = OrderedDict()
    for configuration in self._configurations:
        member = {'__type__': configuration.type.name}
        if configuration.description:
            member['__doc__'] = configuration.description
        configStructItems[configuration.name] = member
    configStructItems['__doc__'] = 'The configuration of the secondary device.'
    configurationType = self._loadType('Configuration', configStructItems)
    self._typeMapping['Configuration'] = configurationType

    defaultValues = {configuration.name: configuration.defaultValue for configuration in self._configurations}
    self._constants['DEFAULT_CONFIGURATION'] = Constant(
        name='DEFAULT_CONFIGURATION',
        value=configurationType.type(defaultValues),
        type=configurationType,
        description='The values of the default configuration.',
    )
def _loadUnits(self, unitsFilePath):
    """
    Load the unit types.

    Nothing is loaded if the file does not exist. Every row adds a variant to the list
    of variants that is stored for the name of the unit.

    :param unitsFilePath: The path to the units file.
    :raises CommunicationDatabaseError: If the file cannot be read or has invalid content.
    """
    try:
        with open(unitsFilePath, newline='', encoding='utf-8') as unitsFile:
            try:
                for i, row in enumerate(csv.reader(unitsFile)):
                    if i == 0:
                        # The first row is the header and is only validated.
                        if len(row) != 3:
                            raise CommunicationDatabaseError(
                                f'Error parsing {unitsFilePath}: '
                                f'Invalid number of csv headers')
                        if row[0] != 'Name':
                            raise CommunicationDatabaseError(
                                f'Error parsing {unitsFilePath}: Invalid content')
                        continue
                    name, typ, description = row
                    try:
                        typeInfo = self.parseKnownTypeInfo(typ)
                    except KeyError:
                        # This message used to say "constant", although a unit is being loaded.
                        raise CommunicationDatabaseError(
                            f'Error parsing {unitsFilePath}: '
                            f'Invalid type "{typ}" for unit {name}')
                    # An empty description is stored as None.
                    self._units.setdefault(name, []).append(
                        Unit.fromTypeInfo(name, typeInfo, description.strip() or None))
            except csv.Error as error:
                raise CommunicationDatabaseError(
                    f'Error parsing {unitsFilePath}: {error}')
    except FileNotFoundError:
        # The units file is optional.
        return
    except IOError as error:
        raise CommunicationDatabaseError(f'Error parsing {unitsFilePath}: {error}')
def _getOrCreateUnitVariant(self, unitName, typeName):
"""
Retrieve or create a variant of a unit.
:param unitName: The name of the unit.
:param typeName: The variant base type.
:return: The variant unit.
"""
try:
unitVariants = self._units[unitName]
except KeyError:
raise UnknownTypeError(unitName) from None
for variant in unitVariants:
if variant.baseTypeName == typeName:
return variant
else:
try:
variantType = self.parseKnownTypeInfo(typeName)
except KeyError:
raise UnknownTypeError(typeName) from None
variant = Unit.fromTypeInfo(unitName, variantType, unitVariants[0].description)
unitVariants.append(variant)
return variant
def _loadTelecommands(self, telecommandsFilePath):
    """
    Load the telecommands.

    Nothing is loaded if the file does not exist. Otherwise the ``TelecommandType`` enum is
    created and registered, and one telecommand type without data and response is added
    for every row of the file.

    :param telecommandsFilePath: The path to the file containing
                                 information about the telecommands.
    :raises CommunicationDatabaseError: If the file cannot be read or has invalid content.
    """
    parsedTelecommands = []
    try:
        with open(telecommandsFilePath, newline='', encoding='utf-8') as telecommandsFile:
            for rowIndex, row in enumerate(csv.reader(telecommandsFile)):
                if rowIndex == 0:
                    headerIsValid = len(row) == 6 and row[0] == 'Name'
                    if not headerIsValid:
                        raise CommunicationDatabaseError(
                            f'Error parsing {telecommandsFilePath}: '
                            f'Invalid content in "commands" sheet')
                    continue
                debugIndication = row[1].strip().lower()
                if debugIndication not in ('true', 'false'):
                    raise CommunicationDatabaseError(
                        f'Error parsing {telecommandsFilePath}: Debug indication in line '
                        f'{rowIndex} is neither "true" nor "false".')
                # An empty description is stored as None.
                parsedTelecommands.append((row[0], row[2] or None, debugIndication == 'true'))
    except FileNotFoundError:
        # The telecommands file is optional.
        return
    except (IOError, csv.Error) as error:
        raise CommunicationDatabaseError(f'Error parsing {telecommandsFilePath}: {error}')

    telecommandDoc = 'Ids of all possible telecommands.'
    # The telecommands are numbered in file order, starting at zero.
    enumMembers = [[name, index] for index, (name, _, _) in enumerate(parsedTelecommands)]
    enumMembers.append(['__doc__', telecommandDoc])
    try:
        telecommandTypeEnum = EnumType('TelecommandType', enumMembers, start=0)
    except TypeError as error:
        raise CommunicationDatabaseError(f'Error parsing {telecommandsFilePath}: {error}')

    for enumValue, (_, description, isDebug) in zip(telecommandTypeEnum, parsedTelecommands):
        enumValue.__doc__ = description
        telecommandType = TelecommandType(
            id=enumValue,
            data=[],
            response=None,
            description=description,
            isDebug=isDebug,
        )
        self._telecommandTypes.append(telecommandType)

    try:
        baseType = self._getBaseTypeForNumber(len(self._telecommandTypes))
    except ValueError:
        raise CommunicationDatabaseError('Too many telecommands')
    self._typeMapping['TelecommandType'] = TypeInfo(
        telecommandTypeEnum, 'TelecommandType', baseType, telecommandDoc)
@staticmethod
def _getBaseTypeForNumber(number):
"""
Get a base type name suitable to represent a given number of integer values.
:param number: The number of values that the type needs to be able to represent.
:return: The name of the suitable type.
"""
if number <= 255:
return TypeInfo.BaseType.UINT8
if number <= 65535:
return TypeInfo.BaseType.UINT16
if number <= 4294967295:
return TypeInfo.BaseType.UINT32
raise ValueError('Number is too large')
def _loadTelecommandArguments(self, telecommandArgumentsFolder):
    """
    Load the telecommand arguments. The telecommands must have been loaded before.

    For every telecommand the file ``<telecommand name>.csv`` is read from the folder;
    a telecommand without such a file keeps an empty list of arguments. Finally, the
    ``MAX_TELECOMMAND_DATA_SIZE`` constant is registered.

    :param telecommandArgumentsFolder: The path to the folder containing
                                       the datafiles for the telecommand arguments.
    :raises CommunicationDatabaseError: If a file cannot be read or has invalid content.
    """
    if not self.telecommandTypes:
        return
    largestPayloadSize = 0
    for telecommand in self._telecommandTypes:
        payloadSize = 0
        argumentFilePath = os.path.join(telecommandArgumentsFolder, telecommand.id.name + '.csv')
        try:
            with open(argumentFilePath, newline='', encoding='utf-8') as argumentFile:
                for rowIndex, row in enumerate(csv.reader(argumentFile)):
                    if rowIndex == 0:
                        headerIsValid = len(row) == 4 and row[0] == 'Name'
                        if not headerIsValid:
                            raise CommunicationDatabaseError(
                                f'Error parsing {argumentFilePath}: '
                                f'Invalid content in "{telecommand.id.name}" sheet')
                        continue
                    name = row[0]
                    typeName = row[1]
                    # An empty documentation is stored as None.
                    documentation = row[3] or None
                    try:
                        typeInfo = self.parseKnownTypeInfo(typeName, documentation=documentation)
                    except UnknownTypeError:
                        # The type might instead describe a dependency on a previous argument.
                        try:
                            provider, dependencyInfo = self._parseDatapointDependencyInfo(
                                typeName, telecommand.data)
                        except UnknownDatapointError as error:
                            raise CommunicationDatabaseError(
                                f'Failed to load dependant datapoint "{name}": {error}')
                        if provider is None:
                            # Not a dependency description, so the type really is unknown.
                            raise
                        if dependencyInfo:
                            raise CommunicationDatabaseError(
                                f'Failed to load dependant datapoint "{name}": '
                                f'Dynamic dependent arguments are not implemented yet')
                        elif provider.type == self._typeMapping['ConfigurationId']:
                            if row[2]:
                                raise CommunicationDatabaseError('It is not possible to have a default value for a '
                                                                 'ConfigurationId telecommand datapoint')
                            payloadSize += self.constants['MAX_CONFIG_VALUE_SIZE'].value
                            # It is ok to pass None as the type info, because the type will be resolved when
                            # the ConfigurationValueArgument is configured (configureWith is called).
                            # noinspection PyTypeChecker
                            telecommand.data.append(ConfigurationValueDatapoint(
                                _database=self,
                                name=name,
                                type=None,
                                default=None,
                                description=documentation,
                                provider=provider,
                            ))
                        else:
                            raise CommunicationDatabaseError(
                                f'Failed to load dependant datapoint "{name}": No dependency info given')
                    else:
                        defaultValue = self._parseValue(row[2], typeInfo.type) if row[2] else None
                        try:
                            payloadSize += typeInfo.getSize(self)
                        except DynamicSizeError:
                            pass  # The size of dynamically sized arguments is ignored when calculating the maximum.
                        telecommand.data.append(TelecommandDatapointType(
                            name=name,
                            type=typeInfo,
                            default=defaultValue,
                            description=documentation,
                        ))
            largestPayloadSize = max(largestPayloadSize, payloadSize)
        except FileNotFoundError:
            pass  # The telecommand has no arguments file.
        except (IOError, csv.Error, UnknownTypeError) as error:
            raise CommunicationDatabaseError(f'Error parsing {argumentFilePath}: {error}')
    self._constants['MAX_TELECOMMAND_DATA_SIZE'] = Constant(
        name='MAX_TELECOMMAND_DATA_SIZE',
        value=largestPayloadSize,
        type=self.getTypeInfo(self._getBaseTypeForNumber(largestPayloadSize)),
        description='The maximum size in bytes of a telecommand data payload.',
    )
def _loadTelecommandResponses(self, telecommandsFilePath):
    """
    Load the telecommand responses. The telecommands must have been loaded before.

    The response of a telecommand is described by the columns after the third one of its row.
    Rows without a response name are skipped. Finally, the ``MAX_TELECOMMAND_RESPONSE_SIZE``
    constant is registered.

    :param telecommandsFilePath: The path to the file containing
                                 information about the telecommands.
    :raises CommunicationDatabaseError: If the file cannot be read or has invalid content.
    """
    if not self.telecommandTypes:
        return
    largestResponseSize = 0
    try:
        with open(telecommandsFilePath, newline='', encoding='utf-8') as telecommandsFile:
            rows = csv.reader(telecommandsFile)
            next(rows, None)  # Skip the header row.
            # The telecommands were created from the same rows, so the indexes match.
            for telecommandIndex, row in enumerate(rows):
                name, typeName, description = row[3:]
                if not name:
                    continue
                description = description or None
                telecommand = self._telecommandTypes[telecommandIndex]
                try:
                    typeInfo = self.parseKnownTypeInfo(typeName, documentation=description)
                    largestResponseSize = max(largestResponseSize, typeInfo.getSize(self))
                    response = TelecommandResponseType(
                        name=name,
                        typeInfo=typeInfo,
                        description=description,
                    )
                except UnknownTypeError:
                    # The type might instead describe a dependency on an argument of the telecommand.
                    try:
                        provider, dependencyInfo = self._parseDatapointDependencyInfo(
                            typeName, telecommand.data)
                    except UnknownDatapointError as error:
                        raise CommunicationDatabaseError(
                            f'Failed to load dependant response "{name}": {error}')
                    if provider is None:
                        # Not a dependency description, so the type really is unknown.
                        raise
                    if dependencyInfo:
                        raise CommunicationDatabaseError(
                            f'Failed to load dependant response "{name}": '
                            f'Dynamic dependent responses are not implemented yet')
                    elif provider.type == self._typeMapping['ConfigurationId']:
                        largestConfigurationSize = max(
                            config.type.getSize(self) for config in self._configurations)
                        largestResponseSize = max(largestResponseSize, largestConfigurationSize)
                        response = ConfigurationValueResponseType(
                            self, name, None, description, provider)
                    else:
                        raise CommunicationDatabaseError(
                            f'Failed to load dependant response "{name}": '
                            f'No dependency info given')
                self._telecommandTypes[telecommandIndex] = dataclasses.replace(telecommand, response=response)
    except (IOError, csv.Error, UnknownTypeError) as error:
        raise CommunicationDatabaseError(f'Error parsing {telecommandsFilePath}: {error}')
    self._constants['MAX_TELECOMMAND_RESPONSE_SIZE'] = Constant(
        name='MAX_TELECOMMAND_RESPONSE_SIZE',
        value=largestResponseSize,
        type=self.getTypeInfo(self._getBaseTypeForNumber(largestResponseSize)),
        description='The maximum size in bytes of a telecommand response payload.',
    )
def _loadTelemetry(self, telemetriesFilePath):
    """
    Load the telemetry types and register them as the ``TelemetryType`` enum type.

    Nothing is registered if the file does not exist.

    :param telemetriesFilePath: The path to the file containing information about the telemetry.
    :raises CommunicationDatabaseError: If the file cannot be read or has invalid content.
    """
    try:
        with open(telemetriesFilePath, newline='', encoding='utf-8') as telemetriesFile:
            enumValues = OrderedDict()
            for rowIndex, row in enumerate(csv.reader(telemetriesFile)):
                if rowIndex == 0:
                    headerIsValid = len(row) == 2 and row[0] == 'Name'
                    if not headerIsValid:
                        raise CommunicationDatabaseError(
                            f'Error parsing {telemetriesFilePath}: Invalid telemetry header')
                    continue
                telemetryName, documentation = row[0], row[1]
                # A repeated name keeps its first position, but uses the last documentation.
                enumValues[telemetryName] = {'__doc__': documentation}
            self._typeMapping['TelemetryType'] = self._loadType('TelemetryType', {
                '__doc__': 'All possible types of telemetry messages.',
                '__type__': TypeInfo.BaseType.UINT8,
                '__values__': enumValues,
            })
    except FileNotFoundError:
        # The telemetry file is optional.
        return
    except (IOError, csv.Error) as error:
        raise CommunicationDatabaseError(f'Error parsing {telemetriesFilePath}: {error}')
def _loadTelemetryArguments(self, telemetryArgumentsFolder):
    """
    Load the arguments for the telemetry types. The types must have been loaded before.

    For every value of the ``TelemetryType`` enum the file ``<telemetry name>.csv`` is read
    from the folder; a telemetry type is only registered if its file exists. If all
    telemetry payloads have a fixed size and the largest one is not empty,
    the ``MAX_TELEMETRY_DATA_SIZE`` constant is registered.

    :param telemetryArgumentsFolder: The path to the folder containing the files
                                     with the telemetry arguments information.
    :raises CommunicationDatabaseError: If a file cannot be read or has invalid content.
    """
    telemetryTypeInfo = self._typeMapping.get('TelemetryType')
    if telemetryTypeInfo is None:
        # The telemetry file is optional; without it no "TelemetryType" has been
        # registered and there are no telemetry arguments to load.
        return
    # None means that no payload has been measured yet,
    # a negative value marks that at least one payload is dynamically sized.
    maxDataSize = None
    for telemetryType in telemetryTypeInfo.type:
        dataTypeInfos = []
        argumentFilePath = os.path.join(telemetryArgumentsFolder, telemetryType.name + '.csv')
        try:
            with open(argumentFilePath, newline='', encoding='utf-8') as argumentFile:
                for i, row in enumerate(csv.reader(argumentFile)):
                    if len(row) != 3:
                        raise CommunicationDatabaseError(
                            f'Error parsing {argumentFilePath}: Unexpected number '
                            f'of rows in line {i}: Expected 3, got {len(row)}')
                    elif i == 0:
                        if row[0] != 'Name':
                            raise CommunicationDatabaseError(
                                f'Error parsing {argumentFilePath}: '
                                f'Invalid telemetry parameter header')
                        continue
                    name, typ, documentation = row
                    if not documentation:
                        documentation = None
                    dataTypeInfos.append(TelemetryDatapointType(
                        name=name,
                        type=self.parseKnownTypeInfo(typ, documentation=documentation),
                        description=documentation,
                    ))
            self._telemetryTypes.append(TelemetryType(
                id=telemetryType,
                data=dataTypeInfos,
            ))
            if maxDataSize is not None and maxDataSize < 0:
                # A dynamically sized payload was found before, no maximum can be given.
                continue
            try:
                dataSize = sum(dataType.type.getSize(self) for dataType in dataTypeInfos)
                if maxDataSize is None:
                    maxDataSize = dataSize
                else:
                    maxDataSize = max(maxDataSize, dataSize)
            except DynamicSizeError:
                maxDataSize = -1
        except FileNotFoundError:
            pass  # The telemetry type has no arguments file.
        except (IOError, csv.Error, UnknownTypeError) as error:
            raise CommunicationDatabaseError(f'Error parsing {argumentFilePath}: {error}')
    if maxDataSize is not None and maxDataSize > 0:
        self._constants['MAX_TELEMETRY_DATA_SIZE'] = Constant(
            name='MAX_TELEMETRY_DATA_SIZE',
            value=maxDataSize,
            type=self.getTypeInfo(self._getBaseTypeForNumber(maxDataSize)),
            description='The maximum size in bytes of any telemetry data payload.',
        )
@staticmethod
def _parseDatapointDependencyInfo(dependencyInfo: str, datapoints: List[MessageDatapointType]) \
        -> Union[Tuple[None, None], Tuple[MessageDatapointType, str]]:
    """
    Resolve a dependency description of the form ``<provider name>?<description>``.

    :param dependencyInfo: The dependency info description.
    :param datapoints: The datapoints that can be depended on.
    :return: The datapoint whose name matches the part before the first question mark and
             the remaining description, or None in both places if the dependency info
             does not contain a question mark.
    :raises UnknownDatapointError: If none of the datapoints has the requested name.
    """
    providerName, separator, description = dependencyInfo.partition('?')
    if not separator:
        # Without a question mark this is not a dependency description.
        return None, None
    provider = next((candidate for candidate in datapoints if candidate.name == providerName), None)
    if provider is None:
        raise UnknownDatapointError(providerName)
    return provider, description
def _loadType(self, name: str, description: Union[str, Dict[str, Any]]):
    """
    Load a datatype from the given description.

    The description is either the name of a base type or a dictionary. In a dictionary,
    keys that start and end with a double underscore are attributes of the type,
    all other keys describe children of the type:

    * ``__type__``: The name of the base type.
    * ``__doc__``: The documentation of the type.
    * ``__values__``: A list or a dictionary of enum values; turns the type into an enum.
    * ``__value__``: The default value, which may be the name of a constant.

    A description with children (and without ``__values__``) is loaded as a struct.

    :param name: The name of the data type.
    :param description: The description of the data type.
    :return: The type info for the loaded data type.
    :raises CommunicationDatabaseError: If the description is inconsistent, contains
                                        unknown attributes or an invalid default value.
    """
    if isinstance(description, str):
        baseType = description
        attributes = {}
        children = {}
        documentation = None
    else:
        attributes = {key: value for key, value in description.items()
                      if key.startswith('__') and key.endswith('__')}
        children = {key: value for key, value in description.items() if key not in attributes}
        baseType = attributes.pop('__type__', None)
        documentation = attributes.pop('__doc__', None)
    values = attributes.pop('__values__', None)
    if values is not None:
        if children:
            raise CommunicationDatabaseError(
                f'Type "{name}" has "__values__" attribute but also contains children')
        enumValueDocs = {}
        if isinstance(values, list):
            # Work on a copy: the documentation entry appended below
            # must not leak into the caller's description.
            enumValues = list(values)
        elif isinstance(values, dict):
            enumValues = []
            newValue = 0
            for enumValueName, enumValueProperties in values.items():
                if not isinstance(enumValueProperties, dict):
                    raise CommunicationDatabaseError(
                        f'Type "{name}" has enum value "{enumValueName}" '
                        f'whose properties are not a dict')
                # Values without an explicit number continue counting from the previous one.
                value = enumValueProperties.get('__value__', newValue)
                newValue = value + 1
                enumValues.append([enumValueName, value])
                enumValueDoc = enumValueProperties.get('__doc__')
                if enumValueDoc:
                    enumValueDocs[enumValueName] = enumValueDoc
        else:
            raise CommunicationDatabaseError(
                f'Type "{name}" has "__values__" attribute which is not a list or dict')
        if documentation:
            enumValues.append(['__doc__', documentation])
        typ = EnumType(name, enumValues, start=0)
        for enumValue in typ:
            enumValue.__doc__ = enumValueDocs.get(enumValue.name)
        if baseType is None:
            # Derive the base type from the largest enum value.
            baseType = self._getBaseTypeForNumber(max(typ, key=lambda item: item.value).value)
    elif children:
        if baseType is not None:
            raise CommunicationDatabaseError(
                f'Type "{name}" has "__type__" attribute and at least one child attribute')
        childrenTypes = OrderedDict()
        for childName, childDescription in children.items():
            typeName = childDescription.get('__type__', childName) \
                if isinstance(childDescription, dict) else childDescription
            childrenTypes[childName] = self._loadType(typeName, childDescription)
        typ = StructType(name, childrenTypes, documentation)
    else:
        typ = self.parseKnownTypeInfo(baseType, name, documentation).type
    defaultValue = attributes.pop('__value__', None)
    if defaultValue is not None:
        # Constants are referenced by name, so only a string can refer to one. The check also
        # keeps unhashable defaults (lists, dicts) from failing the membership test.
        constantName = defaultValue \
            if isinstance(defaultValue, str) and defaultValue in self._constants else None
        try:
            defaultValue = DefaultValueInfo(self._parseValue(defaultValue, typ), constantName)
        except ValueError as error:
            raise CommunicationDatabaseError(
                f'Type "{name}" has "__value__" attribute "{defaultValue}" '
                f'which is not valid for type "{typ.__name__}": {error}') from error
    if attributes:
        # Every known attribute was popped above, so whatever is left is not supported.
        raise CommunicationDatabaseError(
            f'Type "{name}" has unknown attribute(s): ' + ', '.join(attributes))
    return TypeInfo(typ, name, baseType, documentation, defaultValue)
def _parseValue(self, value, typ) -> Any:
    """
    Parse a value based on the data type.

    If the value is the name of a known constant, the value of that constant is returned.
    Otherwise, the value is loaded as a value of the given type.

    :param value: The value to parse or the name of a constant.
    :param typ: The data type of the value.
    :return: The parsed value.
    :raises UnknownConstantError: If the value is not a known constant
                                  and can not be loaded as a value of the type.
    """
    try:
        return self._constants[value].value
    except (KeyError, TypeError):
        # TypeError: an unhashable value (e.g. a list or dict) can never name a constant;
        # it must still be given the chance to be loaded as a typed value.
        pass
    try:
        return loadTypedValue(value, typ)
    except (ValueError, TypeError) as error:
        raise UnknownConstantError(value) from error
def main():
    """
    Verify the communication database and optionally list its packages.

    Parses the command line arguments and loads the database from the given data directory.
    If requested, the size of every telemetry and telecommand package
    as well as the size limit constants that are defined are printed.

    :return: The exit code of the program: 0 if the database was loaded successfully, 1 otherwise.
    """
    parser = ArgumentParser(description='Verify the shared communication database.')
    parser.add_argument('-d', '--dataDir', required=True,
                        help='path to directory where the data files are stored')
    parser.add_argument('-l', '--listPackages', action='store_true', default=False,
                        help='list all packages defined in the database')
    arguments = parser.parse_args()
    try:
        database = CommunicationDatabase(arguments.dataDir)
    except CommunicationDatabaseError as error:
        print(f'[Error] Failed to read database: {error}')
        return 1
    print('Database OK')
    if arguments.listPackages:
        from struct import calcsize

        def _getSize(messageType: MessageType, headerSize: int) -> str:
            """
            Calculate the maximum size of a message type and format it as a human-readable string.

            :param messageType: The message type.
            :param headerSize: The size of the header for the message type in bytes.
            :return: The maximum size of the message formatted as a string.
            """
            typeFormats = ['']
            for data in messageType.data:
                if isinstance(data, ConfigurationValueDatapoint):
                    # The type depends on the configuration: use the largest possible type.
                    maxSize = 0
                    dataType = None
                    for config in database.configurations:
                        configDataType = data.configureWith(config.id).type
                        configDataTypeSize = configDataType.getSize(database)
                        if configDataTypeSize > maxSize:
                            maxSize = configDataTypeSize
                            dataType = configDataType
                    if dataType is None:
                        continue
                else:
                    dataType = data.type
                formats = dataType.getFormats(database)
                # The first format continues the current part, every further format opens a new one.
                typeFormats[-1] += formats[0]
                typeFormats += formats[1:]
            # Drop a leading "{...}" prefix, which is not part of the struct format syntax.
            typeFormats = [formatString[formatString.index('}') + 1:]
                           if formatString.startswith('{') else formatString
                           for formatString in typeFormats]
            sizes = [calcsize('<' + formatString) for formatString in typeFormats]
            # The first part includes the header; further parts are listed with a letter factor.
            sizeParts = [f'{sizes[0] + headerSize}B']
            sizeParts.extend(f'{chr(ord("a") + i)} * {sizePart}B'
                             for i, sizePart in enumerate(sizes[1:]))
            return ' + '.join(sizeParts)

        if database.telemetryTypes:
            print(f'Telemetry ({len(database.telemetryTypes)}):')
            telemetryHeaderSize = calcsize(
                '<' + database.getTypeInfo('TelemetryMessageHeader').getFormats(database)[0])
            for telemetry in database.telemetryTypes:
                print(f'\t{telemetry.id.name}: {_getSize(telemetry, telemetryHeaderSize)}')
        if database.telecommandTypes:
            telecommandSize = calcsize(
                '<' + database.getTypeInfo('TelecommandMessageHeader').getFormats(database)[0])
            print(f'Telecommands ({len(database.telecommandTypes)}):')
            for telecommand in database.telecommandTypes:
                print(f'\t{telecommand.id.name}: {_getSize(telecommand, telecommandSize)}')
        # Look the size limit constants up defensively instead of indexing,
        # so that a database which does not define one of them does not crash the listing.
        for constantName in ('MAX_TELECOMMAND_DATA_SIZE', 'MAX_CONFIG_VALUE_SIZE'):
            constant = database.constants.get(constantName)
            if constant is not None:
                print(f'{constantName}: {constant.value}B')
    return 0
if __name__ == '__main__':
    # Run the verification and hand its result to the interpreter as the process exit status.
    exitCode = main()
    sys.exit(exitCode)