Source code for ecom.datatypes

import json
import struct

from enum import Enum, EnumMeta
from types import GeneratorType
from typing import Dict, Type, Union, Optional, TypeVar, Tuple, Iterator, Generic, Iterable, \
    TYPE_CHECKING, ClassVar, Any, Callable, List
from dataclasses import dataclass, replace as replace_dataclass, fields, field, Field, MISSING

try:
    from enum import StrEnum
except ImportError:
    class StrEnum(str, Enum):
        """
        Fallback for :py:class:`enum.StrEnum` on Python versions that do not provide it.
        """
        # Mirror the standard library implementation: str() and format() of a member
        # yield the plain string value instead of 'ClassName.MEMBER'.
        __str__ = str.__str__
        __format__ = str.__format__

try:
    from typing import dataclass_transform
except ImportError:
    def dataclass_transform(**_kwargs):
        """
        Fallback for :py:func:`typing.dataclass_transform` on Python versions that do not provide it.

        The keyword arguments of the real decorator factory are accepted and ignored,
        because they only carry information for static type checkers.

        :return: A decorator that returns the decorated object unchanged.
        """
        return lambda value: value
if TYPE_CHECKING: from database import CommunicationDatabase
class CommunicationDatabaseAccessor:
    """
    Base class for objects that need to keep a reference to the communication database.
    """

    def __init__(self, database: 'CommunicationDatabase'):
        """
        Store the database for later use by subclasses.

        :param database: The communication database.
        """
        # Cooperative call, so that this class can be combined with other base classes.
        super().__init__()
        self._database = database
def loadTypedValue(value: Any, typ: Type) -> Any:
    """
    Convert a raw value into an instance of the requested type.

    Strings are accepted for most types: booleans are given as ``'true'`` / ``'false'``,
    bytes are encoded as UTF-8 and structs and arrays are parsed from JSON.

    :param value: A value.
    :param typ: The type of the parsed value.
    :return: The value parsed with the given type.
    :raises TypeError: If the value is not valid for the given type.
    :raises ValueError: If a value for an array of bytes is longer than the array.
    """
    # A value that already has the requested type is returned untouched.
    if isinstance(value, typ):
        return value

    if issubclass(typ, bool):
        if value == 'true':
            return True
        if value == 'false':
            return False
        raise TypeError(f'Invalid value for boolean type: {value!r}')

    if issubclass(typ, bytes):
        return typ(value, encoding='utf-8')

    if issubclass(typ, StructType):
        if not isinstance(value, dict):
            value = json.loads(value)
            if not isinstance(value, dict):
                raise TypeError(f'Invalid value for StructType: {value!r}')
        members = {}
        for name, childType in typ:
            members[name] = loadTypedValue(value[name], childType.type)
        return typ(members)

    if issubclass(typ, ArrayType):
        elementType = typ.getElementTypeInfo().type
        if not isinstance(value, list):
            if issubclass(elementType, bytes):
                # An array of bytes is represented by a single bytes object.
                if isinstance(value, elementType):
                    rawBytes = value
                else:
                    rawBytes = elementType(value, encoding='utf-8')
                try:
                    if len(rawBytes) > len(typ):
                        raise ValueError(f'Value for bytes list is too large: {len(rawBytes)} (max is {len(typ)})')
                except DynamicSizeError:
                    # The maximum size is only known at runtime, so it can't be checked here.
                    pass
                return rawBytes
            value = json.loads(value)
            if not isinstance(value, list):
                raise TypeError(f'Invalid value for ArrayType: {value!r}')
        elements = [loadTypedValue(child, elementType) for child in value]
        return typ(elements)

    try:
        return typ(value)
    except ValueError:
        if issubclass(typ, Enum):
            # Fall back to a lookup by member name, either from an equal
            # enum class or from the raw value itself.
            try:
                if typ == value.__class__:
                    return typ[value.name]
                return typ[value]
            except KeyError:
                pass
        raise TypeError(f'Invalid value for type {typ}: {value!r}')
class StructTypeMeta(type):
    """
    Metaclass for the StructType.

    The children of a struct type are stored on the class in ``__children__``,
    a mapping of child names to their type infos.
    """

    def __new__(mcs, cls, bases, classDict):
        # The children are not inherited: every class gets its own mapping.
        children = classDict.pop('__children__', {})
        cls = super().__new__(mcs, cls, bases, classDict)
        cls.__children__ = children
        return cls

    def __iter__(cls) -> Iterator[Tuple[str, 'TypeInfo']]:
        """
        Iterate over all children of the struct.

        :return: An iterator over pairs of child names and child type infos.
        """
        return iter(cls.__children__.items())

    def __getitem__(self, item):
        """
        Get the type info for a child of the struct.

        :param item: The name of the child.
        :return: The type info for that child.
        :raises AttributeError: If the struct does not have a child with that name.
        """
        try:
            return self.__children__[item]
        except KeyError:
            raise AttributeError(item) from None

    def __contains__(self, childName: str) -> bool:
        """
        Whether this struct contains a child element with the given name.

        :param childName: The name of a child element.
        :return: Whether the struct contains a child element with that name.
        """
        return childName in self.__children__

    def __call__(cls, value: Union[str, Dict, Iterable[Iterable], None] = None,
                 childrenTypes: Optional[Dict[str, 'TypeInfo']] = None,
                 documentation: Optional[str] = None, **kwargs):
        """
        Create a new struct type, or an instance of an existing struct type.

        When called on ``StructType`` itself, a new struct type is created. When called
        on a concrete struct type, an instance of that type is created from ``value``.

        :param value: The name of the new struct type, or the values of the new instance.
        :param childrenTypes: A mapping of children names and their types for the struct type.
        :param documentation: Documentation for the new type.
        :param kwargs: Additional arguments that are passed to the constructor of an instance.
        :return: The new struct type or the new instance.
        :raises ValueError: If the values of an instance do not match the children of the type.
        :raises TypeError: If a new type is created without ``childrenTypes``.
        """
        if cls is not StructType:
            if value is None:
                return super().__call__(**kwargs)
            # Materialize the values once: a one-shot iterable of pairs would otherwise
            # be exhausted by the validation and result in an empty instance.
            children = dict(value)
            wrongChildren = set(children) ^ set(cls.__children__)
            if wrongChildren:
                raise ValueError('Invalid or missing children: '
                                 + ', '.join(sorted(map(str, wrongChildren))))
            return super().__call__(value if isinstance(value, dict) else children, **kwargs)
        if childrenTypes is None:
            raise TypeError(
                f'Missing argument "childrenTypes" for StructType creation of "{value}"')
        classDict = {'__children__': childrenTypes}
        if documentation:
            classDict['__doc__'] = documentation
        metaClass = cls.__class__
        return metaClass.__new__(metaClass, value, (cls,), classDict)

    def __eq__(self, o: object) -> bool:
        """
        Two struct types are equal if they have the same child names with equal type infos.
        The order of the children is not taken into account.
        """
        if not isinstance(o, type) or not issubclass(o, StructType):
            return False
        if self.__children__.keys() != o.__children__.keys():
            return False
        return all(child == o.__children__[name] for name, child in self.__children__.items())

    def __hash__(self) -> int:
        # Sorted by name, so that the hash does not depend on the order of the children,
        # just like the equality check.
        return hash(tuple(sorted(self)))

    def offsetOf(cls, database: 'CommunicationDatabase', name: str) -> int:
        """
        Calculate the offset of the child element with the given name.

        :param database: A communication database.
        :param name: The name of the child element whose offset should be calculated.
        :return: The offset of the child element in bytes.
        :raises ValueError: If the struct has no child with the given name.
        """
        if name not in cls:
            raise ValueError(f'No child with the given name {name!r} in {cls}')
        offset = 0
        # Sum up the sizes of all children that are stored before the requested one.
        for childName, child in cls:
            if name == childName:
                break
            offset += child.getSize(database)
        return offset


class StructType(dict, metaclass=StructTypeMeta):
    """
    Represents a C struct.

    The class can be iterated over to get the names and type infos of the struct members.
    Instances are dictionaries that map the member names to their values.
    """
    pass
# Type variable for classes that are derived from StructType.
T = TypeVar('T', bound=StructType)
def structField(typ: Union[str, 'TypeInfo', Callable[['CommunicationDatabase'], 'TypeInfo'], None] = None,
                name: Optional[str] = None, **kwargs) -> Field:
    """
    Declare a new struct dataclass field.
    This is a wrapper around the :py:meth:`dataclasses.field` function.

    :see: :py:meth:`dataclasses.field`
    :see: :py:meth:`ecom.datatypes.structDataclass`

    :param typ: The type of this field or a callable that given a communication database
                resolves the type of this field. This is required unless the field replaces a field
                of another structure, in which case the type is inferred from the other field.
    :param name: The name of this field. If omitted, the name of dataclass field is used.
    :param kwargs: Additional arguments to the :py:meth:`dataclasses.field` function.
                   A ``metadata`` mapping is merged with the struct metadata.
    :return: A struct dataclass field.
    """
    # Merge user supplied metadata instead of failing with a duplicate keyword argument.
    # The struct keys are written last, so they can not be overridden by accident.
    metadata = dict(kwargs.pop('metadata', None) or {})
    metadata['structName'] = name
    metadata['structType'] = typ
    return field(metadata=metadata, **kwargs)
def isStructField(childField: Field) -> bool:
    """
    Tell whether a dataclass field was declared with :py:meth:`ecom.datatypes.structField`.

    Struct fields are recognized by the ``structName`` key in their metadata.

    :param childField: The field to check.
    :return: Whether the field is a struct field.
    """
    metadata = childField.metadata
    return 'structName' in metadata
def getStructFieldName(childField: Field) -> str:
    """
    Get the name of the element in the struct of a struct field.
    This can be different from the dataclass field name.

    :raises KeyError: if the field is not a struct field.
    :param childField: The struct field to get the name of.
    :return: The field name of the struct field.
    """
    structName = childField.metadata['structName']
    # Without an explicit struct name the dataclass field name is used.
    if structName is None:
        structName = childField.name
    return structName
def getStructFieldType(childField: Field, database: 'CommunicationDatabase') -> Optional['TypeInfo']:
    """
    Get the type of the struct field or None, if the struct field does not define its type.

    :raises KeyError: if the field is not a struct field.
    :param childField: The struct field to get the type of.
    :param database: A communication database to lookup type info.
    :return: The type information about the struct field type or None.
    """
    structType = childField.metadata['structType']
    if structType is None:
        return None
    if isinstance(structType, TypeInfo):
        return structType
    # A callable resolves the type lazily with the help of the database.
    if callable(structType):
        return structType(database)
    # Anything else is treated as the name of a type in the database.
    return database.getTypeInfo(structType)
def structDataclass(database: 'CommunicationDatabase', replaceType: Union[str, bool] = False):
    """
    A decorator for a class that inherits from StructType that allows it to act as a dataclass.
    Members can be defined as structFields, to declare their type.
    The resulting class is a dataclass.

    .. highlight:: python

    The field values can be given as a dict, just like a regular StructType::

        floatTypeInfo = TypeInfo.lookupBaseType(TypeInfo.BaseType.FLOAT)

        @structDataclass(database)
        class Quaternion(StructType):
            x: float = structField(typ=floatTypeInfo)
            y: float = structField(typ=floatTypeInfo)
            z: float = structField(typ=floatTypeInfo)
            w: float = structField(typ=floatTypeInfo)

        value = Quaternion({'x': 1, 'y': 2, 'z': 3, 'w': 4})
        print(value.x)  # -> 1

    The field values can also be given as keyword arguments::

        Quaternion(x=1, y=2, z=3, w=4)

    Alternatively, the field values can be given as a mixture of both methods::

        @structDataclass(database)
        class Quaternion(StructType):
            x: float = structField(typ=floatTypeInfo)
            y: float = structField(typ=floatTypeInfo)
            z: float = structField(typ=floatTypeInfo)
            w: float = structField(typ=floatTypeInfo)
            other = 5.0

        Quaternion({'x': 1, 'y': 2, 'z': 3, 'w': 4}, other=6.0)

    A structDataclass can also replace an existing struct type in the database.
    If `replaceType` is set to ``True``, the class will replace a struct with the same name as the class.
    If `replaceType` is a string, the class will replace a struct by that name.
    In both cases, the struct must have the same fields as the replacing struct.
    The fields don't need to be explicitly typed, they will inherit their type
    from their respective field in the replacing struct::

        @structDataclass(database, replaceType=True)
        class Quaternion(StructType):
            x: float
            y: float
            z: float
            w: float

    :param database: A communication database.
    :param replaceType: If ``True``, replace a type in the database with the same class name.
                        If a string, replace a type with that name.
    :return: The struct dataclass.
    """
    @dataclass_transform()
    def wrapper(cls: Type[T]) -> Type[T]:
        if not issubclass(cls, StructType):
            raise TypeError('The class must inherit from StructType')
        # noinspection PyArgumentList
        dataCls = dataclass(cls, init=False)
        # All dataclass fields, keyed by their struct name (struct fields) or their field name.
        ownFields = {getStructFieldName(child) if isStructField(child) else child.name: child
                     for child in fields(dataCls)}

        # TODO: Once we drop Python 3.7 support, change this to
        #  def constructor(self, data=None, /, **kwargs):
        def constructor(self, data: Optional[Dict[str, Any]] = None, **kwargs):
            if data is None:
                data = {}
            elif not isinstance(data, dict):
                raise TypeError('The data must be given as a dict to a StructType')
            dataInput = data.copy()
            data = {}
            dataclassValues = {}
            for fieldName, childField in ownFields.items():
                # Lookup order: the data dict, the keyword arguments, the field default.
                try:
                    rawValue = dataInput.pop(fieldName)
                except KeyError:
                    try:
                        rawValue = kwargs.pop(fieldName)
                    except KeyError:
                        if childField.default is not MISSING:
                            rawValue = childField.default
                        elif childField.default_factory is not MISSING:
                            rawValue = childField.default_factory()
                        else:
                            raise TypeError(f'Missing argument {fieldName} for {self.__class__.__name__}')
                fieldType = getStructFieldType(childField, database) if isStructField(childField) else None
                if fieldType is None:
                    fieldType = childField.type
                else:
                    fieldType = fieldType.type
                value = loadTypedValue(rawValue, fieldType)
                # Struct children are stored in the dict, all other fields as attributes.
                if fieldName in dataCls:
                    data[fieldName] = value
                else:
                    dataclassValues[fieldName] = value
            if dataInput:
                raise TypeError(f'Invalid arguments for {self.__class__.__name__}: {dataInput}')
            # NOTE(review): keyword arguments that do not match a field are ignored here,
            #  only unknown keys of the data dict are reported - confirm that this is intended.
            super(dataCls, self).__init__(data)
            for fieldName, value in dataclassValues.items():
                setattr(self, fieldName, value)
        dataCls.__init__ = constructor

        if replaceType:
            # Resolve the name per decorated class. The argument of the decorator factory is not
            # modified, so the same decorator can be applied to more than one class.
            replaceName = replaceType if isinstance(replaceType, str) else cls.__name__
            replaceTypeInfo = database.getTypeInfo(replaceName)
            if not issubclass(replaceTypeInfo.type, StructType):
                raise TypeError('A struct dataclass can only replace a StructType')
            for name, oldTypeInfo in replaceTypeInfo.type:
                ownField = ownFields.get(name)
                if ownField is None:
                    raise TypeError(f'Missing child {name!r}')
                ownFieldType = ownField.type
                if isStructField(ownField):
                    ownFieldTypeInfo = getStructFieldType(ownField, database)
                    if ownFieldTypeInfo is not None:
                        ownFieldType = ownFieldTypeInfo.type
                if ownFieldType != oldTypeInfo.type:
                    raise TypeError(f'Child {name!r} has an incompatible type: {ownFieldType} != {oldTypeInfo.type}')
                dataCls.__children__[name] = oldTypeInfo.copyWithType(ownFieldType)
            database.replaceType(dataCls, name=replaceName)
        else:
            for child in fields(dataCls):
                if isStructField(child):
                    dataCls.__children__[getStructFieldName(child)] = getStructFieldType(child, database)

        # Expose every field as a property. Struct children are backed by the dict,
        # all other fields by a private attribute of the instance.
        for child in fields(dataCls):
            childName = getStructFieldName(child) if isStructField(child) else child.name
            if childName not in dataCls:
                def getter(self, _child=child):
                    return getattr(self, '__' + _child.name, _child.default)

                def setter(self, value, _child=child):
                    setattr(self, '__' + _child.name, value)
            else:
                def getter(self, _name=childName):
                    return self[_name]

                def setter(self, value, _name=childName):
                    self[_name] = value
            setattr(dataCls, child.name, property(fget=getter, fset=setter))
        return dataCls
    return wrapper
class DynamicSizeError(RuntimeError):
    """
    Raised when the size of an array is needed, but is not fixed:
    it has to be read at runtime from a member of the parent struct.
    """

    def __init__(self, sizeMember: str):
        """
        Create a new error.

        :param sizeMember: The name of the member that must be read to get the size.
        """
        message = f'ArrayType has dynamic size, size is stored in member "{sizeMember}"'
        super().__init__(message)
        self._sizeMember = sizeMember

    @property
    def sizeMember(self) -> str:
        """
        :return: The name of the member that must be read to get the size.
        """
        return self._sizeMember
class ArrayTypeMeta(type):
    """
    Metaclass for the ArrayType.

    The element type info and the size of an array type are stored on the class
    in ``__type__`` and ``__size__``.
    """

    def __new__(mcs, cls, bases, classDict):
        typ = classDict.pop('__type__', None)
        size = classDict.pop('__size__', 0)
        cls = super().__new__(mcs, cls, bases, classDict)
        cls.__type__ = typ
        cls.__size__ = size
        return cls

    def __len__(cls) -> int:
        """
        :return: The length of the array.
        :raises DynamicSizeError: If the length must be dynamically read from a member of the parent struct.
        """
        # A string size is the name of the member that holds the actual size.
        if isinstance(cls.__size__, str):
            raise DynamicSizeError(cls.__size__)
        return cls.__size__

    def __eq__(self, o: object) -> bool:
        """ Two array types are equal if their element types and their sizes are equal. """
        if not isinstance(o, type) or not issubclass(o, ArrayType):
            return False
        return self.__type__ == o.__type__ and self.__size__ == o.__size__

    def __hash__(self) -> int:
        return hash((self.__type__, self.__size__))

    def getElementTypeInfo(cls) -> 'TypeInfo':
        """
        :return: The type info for all elements.
        """
        return cls.__type__

    def __call__(cls, value: Union[str, Iterable], typ: Optional['TypeInfo'] = None,
                 size: Optional[Union[int, str]] = None, documentation: Optional[str] = None):
        """
        Create a new array type, or an instance of an existing array type.

        When called on ``ArrayType`` itself, a new array type is created. When called
        on a concrete array type, an instance of that type is created from ``value``.

        :param value: The name of the new array type, or the elements of the new instance.
        :param typ: The type info for the elements of the array.
        :param size: The size of the array or the name of the member
                     in the parent struct from which the size can be read.
        :param documentation: Documentation of the new type.
        :return: The new array type or the new instance.
        :raises ValueError: If the elements of an instance do not match the size or element type.
        :raises TypeError: If a new type is created without ``typ`` or ``size``.
        """
        if cls is not ArrayType:
            # The elements are inspected more than once, so materialize any one-shot iterable
            # (not only generators). Otherwise it would be exhausted before the list is built.
            if not isinstance(value, (list, tuple)):
                value = list(value)
            if isinstance(cls.__size__, int) and len(value) != cls.__size__:
                raise ValueError('Invalid size')
            for child in value:
                if not isinstance(child, cls.__type__.type):
                    raise ValueError(f'Invalid child type: {type(child)}')
            return super().__call__(value)
        if typ is None:
            raise TypeError(f'Missing argument "typ" for ArrayType creation of "{value}"')
        if size is None:
            raise TypeError(f'Missing argument "size" for ArrayType creation of "{value}"')
        classDict = {'__type__': typ, '__size__': size}
        if documentation:
            classDict['__doc__'] = documentation
        metaClass = cls.__class__
        return metaClass.__new__(metaClass, value, (cls,), classDict)


class ArrayType(list, metaclass=ArrayTypeMeta):
    """
    Represents a C array.

    The class provides the element type and the size of the array,
    instances are lists of the elements.
    """
    pass
class EnumTypeMeta(EnumMeta):
    """
    Metaclass for Enums that are compatible with C style enums and can be compared.
    """

    def __eq__(self, o: object) -> bool:
        """ Two enum classes are equal if they have members with the same names and values. """
        if not isinstance(o, type) or not issubclass(o, Enum):
            return False
        myChildren = sorted(self, key=lambda element: element.name)
        theirChildren = sorted(o, key=lambda element: element.name)
        if len(myChildren) != len(theirChildren):
            return False
        return all(myChild.value == theirChild.value and myChild.name == theirChild.name
                   for myChild, theirChild in zip(myChildren, theirChildren))

    def __hash__(self) -> int:
        # NOTE(review): this is the default hash of the class, so two distinct enum classes
        #  that compare equal have different hashes - confirm that they are not used as keys.
        return super().__hash__()


class EnumType(int, Enum, metaclass=EnumTypeMeta):
    """
    Represents a C style enum.
    """

    def __eq__(self, o: object) -> bool:
        """
        Members of the same class are compared by identity, members of different
        EnumType classes are equal if their classes, names and values are equal.
        Everything else is compared like an integer.
        """
        if o.__class__ is self.__class__:
            return o is self
        if isinstance(o, EnumType):
            return o.__class__ == self.__class__ and o.name == self.name and o.value == self.value
        return super().__eq__(o)

    def __ne__(self, o: object) -> bool:
        # Without this method the inequality check of int would be used,
        # which compares only the numeric values and contradicts __eq__.
        result = self.__eq__(o)
        return result if result is NotImplemented else not result

    def __hash__(self) -> Any:
        return super().__hash__()
# Type variable for the Python types that can represent a value of a data type.
V = TypeVar('V', bound=Union[Enum, StructType, ArrayType, bytes, float, int, bool])
@dataclass(frozen=True)
class DefaultValueInfo(Generic[V]):
    """
    Describes the default value of a type.
    """

    value: V
    """ The actual default value. """

    constantName: Optional[str] = None
    """ The name of the shared constant that holds the default value, if there is one. """
@dataclass(frozen=True)
class TypeInfo(Generic[V]):
    """
    Describes a data type.

    A type info connects the name of a type with the Python type that represents its values
    and with the name of the type it is based on.
    """
    type: Type[V]
    """ The Python type representing the type. """
    name: str
    """ The name of the type. """
    baseTypeName: Optional[str]
    """ The name of the base type. """
    description: Optional[str] = None
    """ A description of the type. """
    default: Optional[DefaultValueInfo[V]] = None
    """ The default value of the type. """

    class BaseType(StrEnum):
        """ A base type that can be used in the communication database. """
        INT8 = 'int8'
        """ This is the mapping of the C type for small numbers. """
        UINT8 = 'uint8'
        """ This is the mapping of the C type for small unsigned numbers. """
        BOOL = 'bool'
        """ This is a type, that has only two values: `True` and `False`. """
        INT16 = 'int16'
        """ This is the mapping of the C type for small numbers. """
        UINT16 = 'uint16'
        """ This is the mapping of the C type for small unsigned numbers. """
        INT32 = 'int32'
        """ This is the mapping of the C type for numbers. """
        UINT32 = 'uint32'
        """ This is the mapping of the C type for unsigned numbers. """
        INT64 = 'int64'
        """ This is the mapping of the C type for large numbers. """
        UINT64 = 'uint64'
        """ This is the mapping of the C type for large unsigned numbers. """
        FLOAT = 'float'
        """ A type for small floating point values. **WARNING**: This type does not have a standard size in C. If the size is not `4` bytes for a platform, the generated code will not compile. """
        DOUBLE = 'double'
        """ A type for larger floating point values with higher precision. **WARNING**: This type does not have a standard size in C. If the size is not `8` bytes for a platform, the generated code will not compile. """
        CHAR = 'char'
        """ This type represents a character. It is usually used as an array to represent a string. """
        BYTES = 'bytes'
        """ This type is similar to the `char` type, but it is used to represent a byte. """

    # A mapping of the known base type names to their Python type equivalent, their struct
    # format character (See https://docs.python.org/3/library/struct.html#format-characters)
    # and their minimum and maximum numeric value (None for non-numeric types).
    # When updating please also update the table in examples/database/documentation/baseTypes.md
    _BASE_TYPE_MAPPING: ClassVar[Dict[str, Tuple[Type, str, Union[int, float, None], Union[int, float, None]]]] = {
        BaseType.INT8: (int, 'b', -128, 127),
        BaseType.UINT8: (int, 'B', 0, 255),
        BaseType.BOOL: (bool, '?', None, None),
        BaseType.INT16: (int, 'h', -32768, 32767),
        BaseType.UINT16: (int, 'H', 0, 65535),
        BaseType.INT32: (int, 'i', -2147483648, 2147483647),
        BaseType.UINT32: (int, 'I', 0, 4294967295),
        BaseType.INT64: (int, 'q', -9223372036854775808, 9223372036854775807),
        BaseType.UINT64: (int, 'Q', 0, 18446744073709551615),
        BaseType.FLOAT: (float, 'f', -3.4e+38, 3.4e+38),
        BaseType.DOUBLE: (float, 'd', -1.7e+308, 1.7e+308),
        BaseType.CHAR: (bytes, 's', None, None),
        BaseType.BYTES: (bytes, 's', None, None),
    }

    def getBaseType(self, database: 'CommunicationDatabase') -> BaseType:
        """
        Resolve the base type of this type by following the chain of base type names.

        :param database: A database of all known types.
        :return: The base type name of this type.
        :raises ValueError: If the chain ends at a type without a base type.
        """
        typeInfo = self
        # Follow the base type names until one of the known base types is reached.
        while typeInfo.baseTypeName not in self._BASE_TYPE_MAPPING:
            if typeInfo.baseTypeName is None:
                raise ValueError(f'Type {typeInfo.name} does not have a base type')
            typeInfo = database.getTypeInfo(typeInfo.baseTypeName)
        return TypeInfo.BaseType(typeInfo.baseTypeName)

    def getSize(self, database: 'CommunicationDatabase') -> int:
        """
        Calculate the size of this type from its format string.

        :param database: A database of all known types.
        :return: The size of this type in bytes.
        """
        # The '<' prefix selects little-endian byte order with standard sizes and no alignment.
        return struct.calcsize('<' + self.getFormat(database))

    def getFormat(self, database: 'CommunicationDatabase', values: Optional[Dict[str, Any]] = None) -> str:
        """
        Get the format string of this type for the :py:mod:`struct` module.

        :param database: A database of all known types.
        :param values: An optional dictionary of other values on which the format might depend on.
                       For example, this could include a size member for a dynamically sized list.
        :return: The format string of this type.
        :raises DynamicSizeError: If the type is a dynamically sized array
                                  and the size is not part of the given values.
        :raises ValueError: If the type has no base type and therefore no format.
        """
        size = None
        if issubclass(self.type, StructType):
            # The format of a struct is the concatenation of the formats of its children.
            return ''.join(childType.getFormat(database) for _, childType in self.type)
        if issubclass(self.type, ArrayType):
            try:
                size = len(self.type)
            except DynamicSizeError as error:
                # A dynamic size can only be resolved from the given values.
                if values is None or error.sizeMember not in values:
                    raise
                size = values[error.sizeMember]
        try:
            # The base type is a known base type: prefix its format character with the array size.
            return ('' if size is None else str(size)) + \
                self._BASE_TYPE_MAPPING[self.baseTypeName][1]
        except KeyError:
            if self.baseTypeName is None:
                raise ValueError(f'Type "{self.name}" does not have an assigned format')
            # The base type is another type from the database, use its format.
            baseType = database.getTypeInfo(self.baseTypeName)
            baseFormat = baseType.getFormat(database)
            if size is not None:
                if issubclass(baseType.type, bytes):
                    # For bytes the size is a count in front of the format.
                    baseFormat = f'{size}{baseFormat}'
                else:
                    # For all other types the format is repeated for every element.
                    baseFormat *= size
            return baseFormat

    def getFormats(self, database: 'CommunicationDatabase') -> List[str]:
        """
        Get the format strings for this type.
        A type that consists of multiple subtypes will yield a concatenated format string.
        If a type is dynamically sized, a new format string is started that can be formatted with
        the parsed values from the previous format string to fill the required size information.

        :param database: The communication database.
        :return: The format strings.
        """
        formats = ['']
        if issubclass(self.type, StructType):
            typeFormats = ['']
            for _, childTypeInfo in self.type:
                childFormats = childTypeInfo.getFormats(database)
                # The first format of the child continues the current format,
                # any further formats of the child are added as new formats.
                typeFormats[-1] += childFormats[0]
                typeFormats.extend(childFormats[1:])
        elif issubclass(self.type, ArrayType):
            elementTypeInfo = self.type.getElementTypeInfo()
            size = None
            try:
                size = len(self.type)
            except DynamicSizeError as error:
                # Start a new format. For bytes it begins with a placeholder
                # that is named after the member that contains the size.
                formats.append('{' + error.sizeMember + '}' if issubclass(elementTypeInfo.type, bytes) else '')
            else:
                if issubclass(elementTypeInfo.type, bytes):
                    # For bytes the size is a count in front of the element format,
                    # the element format itself is not repeated.
                    formats[-1] += str(size)
                    size = None
            typeFormats = elementTypeInfo.getFormats(database)
            if size is not None:
                # Repeat the element format(s) for every element of the array.
                if len(typeFormats) == 1:
                    typeFormats = [typeFormats[0] * size]
                else:
                    typeFormats *= size
        else:
            typeFormats = [self.getFormat(database)]
        formats[-1] += typeFormats[0]
        formats.extend(typeFormats[1:])
        return formats

    def getMinNumericValue(self, database: 'CommunicationDatabase') -> Union[int, float]:
        """
        Get the minimum number that this type can represent.

        :raises TypeError: if the type is not numeric.
        :param database: A communication database.
        :return: The minimum value of this type.
        """
        baseType = self.getBaseType(database)
        minNumericValue = self._BASE_TYPE_MAPPING[baseType][2]
        if minNumericValue is None:
            raise TypeError(f'The type {self.type} is not numeric')
        return minNumericValue

    def getMaxNumericValue(self, database: 'CommunicationDatabase') -> Union[int, float]:
        """
        Get the maximum number that this type can represent.

        :raises TypeError: if the type is not numeric.
        :param database: A communication database.
        :return: The maximum value of this type.
        """
        baseType = self.getBaseType(database)
        maxNumericValue = self._BASE_TYPE_MAPPING[baseType][3]
        if maxNumericValue is None:
            raise TypeError(f'The type {self.type} is not numeric')
        return maxNumericValue

    @classmethod
    def lookupBaseType(cls, name: BaseType) -> 'TypeInfo':
        """
        Find a base type by its name.

        :param name: The name of the base type.
        :return: The TypeInfo of the base type.
        :raises KeyError: If the name is not the name of a known base type.
        """
        # A base type uses its own name as the name of its base type.
        # noinspection PyTypeChecker
        return TypeInfo(cls._BASE_TYPE_MAPPING[name][0], name, baseTypeName=name)

    def copyWithType(self, typ: Type[V]) -> 'TypeInfo[V]':
        """
        Copy this TypeInfo but replace the type with the given new type.

        :param typ: The new type that this type info should contain.
        :return: The copied type info with the new type.
        """
        defaultInfo = self.default
        if defaultInfo is not None:
            # The default value is converted as well, so that it matches the new type.
            defaultInfo = replace_dataclass(defaultInfo, value=loadTypedValue(defaultInfo.value, typ))
        return replace_dataclass(self, type=typ, default=defaultInfo)