Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/support misb 0903 #37

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added data/realflight.bin
Binary file not shown.
3 changes: 2 additions & 1 deletion klvdata/__init__.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .streamparser import StreamParser
from . import misb0601
from . import misb0102
from . import misb0102
from . import misb0903
46 changes: 39 additions & 7 deletions klvdata/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from datetime import datetime
from datetime import timezone
from binascii import hexlify, unhexlify
from math import log, ceil, floor

def datetime_to_bytes(value):
"""Return bytes representing UTC time in microseconds."""
Expand Down Expand Up @@ -77,13 +78,16 @@ def ber_encode(value):
return int_to_bytes(byte_length + 128) + int_to_bytes(value, length=byte_length)


def bytes_to_str(value):
"""Return UTF-8 formatted string from bytes object."""
return bytes(value).decode('UTF-8')
def bytes_to_str(value, encoding='UTF-8'):
"""Return string from bytes object."""
if isinstance(value, str):
return value
else:
return bytes(value).decode(encoding)


def str_to_bytes(value):
"""Return bytes object from UTF-8 formatted string."""
def str_to_bytes(value, encoding='UTF-8'):
"""Return bytes object from string."""
return bytes(str(value), 'UTF-8')


Expand Down Expand Up @@ -124,13 +128,11 @@ def linear_map(src_value, src_domain, dst_range):

return dst_value


def bytes_to_float(value, _domain, _range):
"""Convert the fixed point value self.value to a floating point value."""
src_value = int().from_bytes(value, byteorder='big', signed=(min(_domain) < 0))
return linear_map(src_value, _domain, _range)


def float_to_bytes(value, _domain, _range):
"""Convert the fixed point value self.value to a floating point value."""
# Some classes like MappedElement are calling float_to_bytes with arguments _domain
Expand All @@ -142,6 +144,36 @@ def float_to_bytes(value, _domain, _range):
dst_value = linear_map(value, src_domain=src_domain, dst_range=dst_range)
return round(dst_value).to_bytes(length, byteorder='big', signed=(dst_min < 0))

def float_to_imapb(value, _length, _range):
_min, _max = _range
if value < _min or value > _max:
return b''

bPow = ceil(log(_max - _min, 2))
dPow = 8 * _length - 1
sF = 2**(dPow - bPow)
zOffset = 0.0
if _min < 0 and _max > 0:
zOffset = sF * _min - floor(sF * _min)

y = int(sF * (value - _min) + zOffset)

return int_to_bytes(y, _length, signed=True)

def imapb_to_float(value, _range):
_min, _max = _range
length = len(value)

bPow = ceil(log(_max - _min, 2))
dPow = 8 * length - 1
sF = 2**(dPow - bPow)
sR = 2**(bPow - dPow)
zOffset = 0.0
if _min < 0 and _max > 0:
zOffset = sF * _min - floor(sF * _min)

y = bytes_to_int(value, signed=True)
return sR * (y - zOffset) + _min

def packet_checksum(data):
"""Return two byte checksum from a SMPTE ST 336 KLV structured bytes object."""
Expand Down
4 changes: 3 additions & 1 deletion klvdata/element.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from abc import ABCMeta
from abc import abstractmethod
from klvdata.common import ber_encode

from klvdata.common import int_to_bytes

# Proposed alternate names, "BaseElement" of modules "bases".
class Element(metaclass=ABCMeta):
Expand Down Expand Up @@ -65,6 +65,8 @@ def __bytes__(self):

def __len__(self):
"""Return the byte length of self.value."""
if isinstance(self.value, int):
return len(int_to_bytes(self.value))
return len(bytes(self.value))

@abstractmethod
Expand Down
149 changes: 142 additions & 7 deletions klvdata/elementparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
from klvdata.common import bytes_to_float
from klvdata.common import bytes_to_hexstr
from klvdata.common import bytes_to_str
from klvdata.common import imapb_to_float
from klvdata.common import datetime_to_bytes
from klvdata.common import float_to_bytes
from klvdata.common import str_to_bytes
from klvdata.common import int_to_bytes
from klvdata.common import float_to_imapb


class ElementParser(Element, metaclass=ABCMeta):
Expand Down Expand Up @@ -80,13 +83,24 @@ class BytesElementParser(ElementParser, metaclass=ABCMeta):
def __init__(self, value):
super().__init__(BytesValue(value))

def __bytes__(self):
"""Return the MISB encoded representation of a Key, Length, Value element."""
if isinstance(self.value, int):
bytesvalue = int_to_bytes(self.value)
else:
bytesvalue = bytes(self.value)
return bytes(self.key) + bytes(self.length) + bytesvalue


class BytesValue(BaseValue):
def __init__(self, value):
self.value = value

def __bytes__(self):
return bytes(self.value)
if isinstance(self.value, int):
return int_to_bytes(self.value, length=1 if self.value <= 255 else 2)
else:
return bytes(self.value)

def __str__(self):
return bytes_to_hexstr(self.value, start='0x', sep='')
Expand All @@ -110,26 +124,79 @@ def __str__(self):

class StringElementParser(ElementParser, metaclass=ABCMeta):
def __init__(self, value):
super().__init__(StringValue(value))
super().__init__(StringValue(value, self._encoding))

@property
def _encoding(cls):
return 'UTF-8'


class StringValue(BaseValue):
def __init__(self, value):
def __init__(self, value, _encoding = 'UTF-8'):
self._encoding = _encoding
try:
self.value = bytes_to_str(value)
except TypeError:
self.value = bytes_to_str(value, _encoding)
except UnicodeDecodeError:
self.value = value

def __bytes__(self):
return str_to_bytes(self.value)
return str_to_bytes(self.value, self._encoding)

def __str__(self):
return str(self.value)


class IntegerElementParser(ElementParser, metaclass=ABCMeta):
def __init__(self, value):
super().__init__(IntegerValue(value, self._size, self._signed))

@property
@classmethod
@abstractmethod
def _signed(cls):
pass

@property
@classmethod
@abstractmethod
def _size(cls):
pass

class IntegerValue(BaseValue):
def __init__(self, value, _size, _signed):
self._size = _size
self._signed = _signed
if isinstance(value, int):
self.value = value
else:
try:
self.value = bytes_to_int(value, _signed)
except TypeError:
self.value = value

def __bytes__(self):
return int_to_bytes(self.value, self._size, self._signed)

def __str__(self):
return str(self.value)

class MappedElementParser(ElementParser, metaclass=ABCMeta):
def __init__(self, value):
super().__init__(MappedValue(value, self._domain, self._range))
if isinstance(value, float):
localvalue = float_to_bytes(value, self._domain, self._range)
elif isinstance(value, int):
localvalue = float_to_bytes(float(value), self._domain, self._range)
else:
localvalue = value
if int.from_bytes(localvalue, byteorder='big', signed=True) == self._error:
super().__init__(StringValue(('%s (%s)'%(bytes_to_hexstr(localvalue, start='0x', sep=''),'Standard error indicator')).encode('UTF-8')))
else:
super().__init__(MappedValue(localvalue, self._domain, self._range))

def __bytes__(self):
"""Return the MISB encoded representation of a Key, Length, Value element."""
return bytes(self.key) + bytes(self.length) + bytes(self.value)


@property
@classmethod
Expand All @@ -143,6 +210,10 @@ def _domain(cls):
def _range(cls):
pass

@property
def _error(cls):
pass

class MappedValue(BaseValue):
def __init__(self, value, _domain, _range):
self._domain = _domain
Expand All @@ -162,6 +233,70 @@ def __str__(self):
def __float__(self):
return self.value

class EnumElementParser(ElementParser, metaclass=ABCMeta):
def __init__(self, value):
super().__init__(EnumValue(value, self._enum))

@property
@classmethod
@abstractmethod
def _enum(cls):
pass

class EnumValue(BaseValue):
def __init__(self, value, _enum):
if isinstance(value, bytes):
self.value = bytes_to_int(value)
else:
self.value = value
self._enum = _enum

def __bytes__(self):
return int_to_bytes(self.value)

def __str__(self):
try:
return self._enum[self.value]
except KeyError:
return str(self.value)

class IMAPBElementParser(ElementParser, metaclass=ABCMeta):
def __init__(self, value):
super().__init__(IMAPBValue(value, self._range))

@property
@classmethod
@abstractmethod
def _range(cls):
pass

class IMAPBValue(BaseValue):
def __init__(self, value, _range):
self._range = _range
self._length = len(value)
self.value = imapb_to_float(value, self._range)

def __bytes__(self):
return float_to_imapb(self.value, self._length, self._range)

def __str__(self):
return str(self.value)

class LocationElementParser(ElementParser, metaclass=ABCMeta):
def __init__(self, value):
super().__init__(LocationValue(value))

class LocationValue(BaseValue):
def __init__(self, value):
self.value = (imapb_to_float(value[0:4], (-90, 90)),
imapb_to_float(value[4:8], (-180, 180)),
imapb_to_float(value[8:10], (-900, 19000)))

def __bytes__(self):
lat, long, alt = self.value
return (float_to_imapb(lat, 4, (-90, 90)) +
float_to_imapb(long, 4, (-180, 180)) +
float_to_imapb(alt, 2, (-900, 19000)))

def __str__(self):
return str(self.value)
17 changes: 11 additions & 6 deletions klvdata/klvparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from sys import maxsize
from io import BytesIO
from io import IOBase
from klvdata.common import bytes_to_int

from klvdata.common import bytes_to_hexstr

class KLVParser(object):
"""Return key, value pairs parsed from an SMPTE ST 336 source."""
Expand All @@ -43,7 +44,7 @@ def __iter__(self):

def __next__(self):
key = self.__read(self.key_length)

byte_length = bytes_to_int(self.__read(1))

if byte_length < 128:
Expand All @@ -53,16 +54,20 @@ def __next__(self):
# BER Long Form
length = bytes_to_int(self.__read(byte_length - 128))

value = self.__read(length)

try:
value = self.__read(length)
except OverflowError:
return key, None

return key, value

def __read(self, size):
if size < 0 or size > maxsize:
raise OverflowError

if size == 0:
return b''

assert size > 0

data = self.source.read(size)

if data:
Expand Down
Loading