Skip to content

Commit

Permalink
Store objects internally in msgpack (#118)
Browse files Browse the repository at this point in the history
* Store objects internally in msgpack

This ensures that we store the emails in a sane format internally (i.e.
with attachment content bytes stored as bytes) and only serialize them
to the data transfer format (base64 encoded bytes) at the edges when
communicating with the clients.

* Fix responses typings

* Fix set equality typings

* Extract base64 serialization methods

* Make AzureBytesStorage private

* Move to/from serialization methods together
  • Loading branch information
c-w authored Jan 3, 2019
1 parent 9dc97f0 commit 0e5cd9c
Show file tree
Hide file tree
Showing 16 changed files with 214 additions and 114 deletions.
25 changes: 25 additions & 0 deletions opwen_email_server/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from opwen_email_server.utils.email_parser import get_domains
from opwen_email_server.utils.email_parser import parse_mime_email
from opwen_email_server.utils.log import LogMixin
from opwen_email_server.utils.serialization import from_base64
from opwen_email_server.utils.serialization import from_jsonl_bytes
from opwen_email_server.utils.serialization import to_base64
from opwen_email_server.utils.serialization import to_jsonl_bytes

Response = Union[dict, Tuple[str, int]]
Expand Down Expand Up @@ -120,6 +122,7 @@ def _action(self, resource_id): # type: ignore
num_stored = 0
for email in emails:
email_id = email['_uid']
email = self._decode_attachments(email)
self._email_storage.store_object(email_id, email)

self._next_task(email_id)
Expand All @@ -132,6 +135,16 @@ def _action(self, resource_id): # type: ignore
self.log_event(events.EMAIL_STORED_FROM_CLIENT, {'domain': domain, 'num_emails': num_stored}) # noqa: E501
return 'OK', 200

@classmethod
def _decode_attachments(cls, email: dict) -> dict:
if not email.get('attachments'):
return email

for attachment in email['attachments']:
attachment['content'] = from_base64(attachment['content'])

return email


class ReceiveInboundEmail(_Action):
def __init__(self,
Expand Down Expand Up @@ -197,6 +210,7 @@ def mark_delivered(email: dict) -> dict:

pending = self._fetch_pending_emails(pending_storage)
pending = (mark_delivered(email) for email in pending)
pending = (self._encode_attachments(email) for email in pending)

resource_id = self._client_storage.store_objects(
(sync.EMAILS_FILE, pending, to_jsonl_bytes),
Expand All @@ -213,6 +227,17 @@ def _fetch_pending_emails(self, pending_storage: AzureTextStorage):
for email_id in pending_storage.iter():
yield self._email_storage.fetch_object(email_id)

@classmethod
def _encode_attachments(cls, email: dict) -> dict:
if not email.get('attachments'):
return email

for attachment in email['attachments']:
content_bytes = attachment['content']
attachment['content'] = to_base64(content_bytes)

return email

@classmethod
def _mark_emails_as_delivered(cls, pending_storage: AzureTextStorage,
email_ids: Iterable[str]):
Expand Down
9 changes: 4 additions & 5 deletions opwen_email_server/integration/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,10 @@ def get_mx_setup() -> SetupCloudflareMxRecords:
@singleton
def get_email_storage() -> AzureObjectStorage:
return AzureObjectStorage(
text_storage=AzureTextStorage(
account=config.BLOBS_ACCOUNT,
key=config.BLOBS_KEY,
container=constants.CONTAINER_EMAILS,
provider=config.STORAGE_PROVIDER))
account=config.BLOBS_ACCOUNT,
key=config.BLOBS_KEY,
container=constants.CONTAINER_EMAILS,
provider=config.STORAGE_PROVIDER)


@lru_cache(maxsize=PENDING_STORAGE_CACHE_SIZE)
Expand Down
60 changes: 39 additions & 21 deletions opwen_email_server/services/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
from xtarfile.xtarfile import SUPPORTED_FORMATS

from opwen_email_server.utils.log import LogMixin
from opwen_email_server.utils.serialization import from_json
from opwen_email_server.utils.serialization import gunzip_string
from opwen_email_server.utils.serialization import gzip_string
from opwen_email_server.utils.serialization import to_json
from opwen_email_server.utils.path import get_extension
from opwen_email_server.utils.serialization import from_msgpack_bytes
from opwen_email_server.utils.serialization import gunzip_bytes
from opwen_email_server.utils.serialization import gzip_bytes
from opwen_email_server.utils.serialization import to_msgpack_bytes
from opwen_email_server.utils.temporary import create_tempfilename
from opwen_email_server.utils.temporary import removing

Expand Down Expand Up @@ -72,7 +73,8 @@ def delete(self, resource_id: str):

def iter(self) -> Iterator[str]:
for resource in self._client.list_objects():
resource_id = resource.name
extension = get_extension(resource.name)
resource_id = resource.name.replace(extension, '')
yield resource_id
self.log_debug('listed %s', resource_id)

Expand All @@ -90,38 +92,55 @@ def fetch_file(self, resource_id: str) -> str:
return path


class AzureTextStorage(_BaseAzureStorage):
class _AzureBytesStorage(_BaseAzureStorage):
_compression = 'gz'

def store_text(self, resource_id: str, text: str):
def store_bytes(self, resource_id: str, content: bytes):
filename = self._to_filename(resource_id)
self.log_debug('storing %d characters at %s', len(text), filename)
self.log_debug('storing %d bytes at %s', len(content), filename)
upload = BytesIO()
upload.write(gzip_string(text))
upload.write(gzip_bytes(content))
upload.seek(0)
self._client.upload_object_via_stream(upload, filename)

def fetch_text(self, resource_id: str) -> str:
def fetch_bytes(self, resource_id: str) -> bytes:
filename = self._to_filename(resource_id)
download = BytesIO()
resource = self._client.get_object(filename)
for chunk in resource.as_stream():
download.write(chunk)
download.seek(0)
text = gunzip_string(download.read())
self.log_debug('fetched %d characters from %s', len(text), filename)
return text
content = gunzip_bytes(download.read())
self.log_debug('fetched %d bytes from %s', len(content), filename)
return content

def delete(self, resource_id: str):
filename = self._to_filename(resource_id)
super().delete(filename)

def _to_filename(self, resource_id: str) -> str:
extension = '.txt.{}'.format(self._compression)
extension = '.{}.{}'.format(self._extension, self._compression)
if resource_id.endswith(extension):
return resource_id
return '{}{}'.format(resource_id, extension)

@property
def _extension(self) -> str:
raise NotImplementedError


class AzureTextStorage(_AzureBytesStorage):
_encoding = 'utf-8'
_extension = 'txt'

def store_text(self, resource_id: str, text: str):
content = text.encode(self._encoding)
self.store_bytes(resource_id, content)

def fetch_text(self, resource_id: str) -> str:
content = self.fetch_bytes(resource_id)
return content.decode(self._encoding)


class AzureObjectsStorage(LogMixin):
_compression = 'zstd'
Expand Down Expand Up @@ -230,14 +249,13 @@ def _new_resource_id(cls) -> str:
return str(uuid4())


class AzureObjectStorage(LogMixin):
def __init__(self, text_storage: AzureTextStorage):
self._text_storage = text_storage
class AzureObjectStorage(_AzureBytesStorage):
_extension = 'msgpack'

def fetch_object(self, resource_id: str) -> dict:
serialized = self._text_storage.fetch_text(resource_id)
return from_json(serialized)
serialized = self.fetch_bytes(resource_id)
return from_msgpack_bytes(serialized)

def store_object(self, resource_id: str, obj: dict) -> None:
serialized = to_json(obj)
self._text_storage.store_text(resource_id, serialized)
serialized = to_msgpack_bytes(obj)
self.store_bytes(resource_id, serialized)
24 changes: 10 additions & 14 deletions opwen_email_server/utils/email_parser.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from base64 import b64decode
from base64 import b64encode
from copy import deepcopy
from datetime import datetime
from datetime import timezone
Expand All @@ -22,6 +20,7 @@

from opwen_email_server.config import MAX_HEIGHT_IMAGES
from opwen_email_server.config import MAX_WIDTH_IMAGES
from opwen_email_server.utils.serialization import to_base64


def _parse_body(message: PyzMessage, default_charset: str = 'ascii') -> str:
Expand All @@ -43,8 +42,7 @@ def _parse_attachments(mailparts: Iterable[MailPart]) -> Iterable[dict]:
filename = part.sanitized_filename
payload = part.get_payload()
if filename and payload:
content = b64encode(payload).decode('ascii')
yield {'filename': filename, 'content': content}
yield {'filename': filename, 'content': payload}


def _parse_addresses(message: PyzMessage, address_type: str) -> List[str]:
Expand Down Expand Up @@ -94,7 +92,7 @@ def format_attachments(email: dict) -> dict:

for i, attachment in enumerate(attachments):
filename = attachment.get('filename', '')
content = attachment.get('content', '')
content = attachment.get('content', b'')
formatted_content = _format_attachment(filename, content)

if content != formatted_content:
Expand All @@ -109,7 +107,7 @@ def format_attachments(email: dict) -> dict:
return new_email


def _format_attachment(filename: str, content: str) -> str:
def _format_attachment(filename: str, content: bytes) -> bytes:
attachment_type = guess_type(filename)[0]

if not attachment_type:
Expand Down Expand Up @@ -148,23 +146,21 @@ def _is_already_small(size: Tuple[int, int]) -> bool:
return width <= MAX_WIDTH_IMAGES and height <= MAX_HEIGHT_IMAGES


def _change_image_size(image_content_b64: str) -> str:
image_content_bytes = b64decode(image_content_b64)
def _change_image_size(image_content_bytes: bytes) -> bytes:
image_bytes = BytesIO(image_content_bytes)
image_bytes.seek(0)
image = Image.open(image_bytes)

if _is_already_small(image.size):
return image_content_b64
return image_content_bytes

new_size = (MAX_WIDTH_IMAGES, MAX_HEIGHT_IMAGES)
image.thumbnail(new_size, Image.ANTIALIAS)
new_image = BytesIO()
image.save(new_image, image.format)
new_image.seek(0)
new_image_bytes = new_image.read()
new_b64 = b64encode(new_image_bytes).decode('ascii')
return new_b64
return new_image_bytes


def _fetch_image_to_base64(image_url: str) -> Optional[str]:
Expand All @@ -179,9 +175,9 @@ def _fetch_image_to_base64(image_url: str) -> Optional[str]:
if not response.content:
return None

image_content = b64encode(response.content).decode('ascii')
image_content = _change_image_size(image_content)
return 'data:{};base64,{}'.format(image_type, image_content)
small_image_bytes = _change_image_size(response.content)
small_image_base64 = to_base64(small_image_bytes)
return 'data:{};base64,{}'.format(image_type, small_image_base64)


def _is_valid_url(url: Optional[str]) -> bool:
Expand Down
6 changes: 6 additions & 0 deletions opwen_email_server/utils/path.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from pathlib import Path
from typing import Optional


def get_extension(path: Optional[str]) -> str:
return ''.join(Path(path).suffixes) if path else ''
39 changes: 30 additions & 9 deletions opwen_email_server/utils/serialization.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
from base64 import b64decode
from base64 import b64encode
from gzip import GzipFile
from io import BytesIO
from json import JSONDecodeError
from json import dumps
from json import loads
from typing import Optional

from msgpack import packb as msgpack_dump
from msgpack import unpackb as msgpack_load


def to_json(obj: object) -> str:
return dumps(obj, separators=(',', ':'))


def from_json(obj: str) -> dict:
return loads(obj)


def to_jsonl_bytes(obj) -> bytes:
return to_json(obj).encode('utf-8') + b'\n'


def from_jsonl_bytes(obj: bytes) -> Optional[dict]:
try:
decoded = obj.decode('utf-8')
Expand All @@ -22,16 +35,25 @@ def from_jsonl_bytes(obj: bytes) -> Optional[dict]:
return None


def to_json(obj: object) -> str:
return dumps(obj, separators=(',', ':'))
def to_msgpack_bytes(obj) -> bytes:
encoded = msgpack_dump(obj, use_bin_type=True)
return encoded + b'\n'


def to_jsonl_bytes(obj) -> bytes:
return to_json(obj).encode('utf-8') + b'\n'
def from_msgpack_bytes(serialized: bytes) -> dict:
encoded = serialized.rstrip(b'\n')
return msgpack_load(encoded, raw=False)


def to_base64(content: bytes) -> str:
return b64encode(content).decode('ascii')


def from_base64(encoded: str) -> bytes:
return b64decode(encoded)


def gzip_string(uncompressed: str) -> bytes:
uncompressed_bytes = uncompressed.encode()
def gzip_bytes(uncompressed_bytes: bytes) -> bytes:
stream = BytesIO()
with GzipFile(fileobj=stream, mode='wb') as fobj:
fobj.write(uncompressed_bytes) # type: ignore
Expand All @@ -40,11 +62,10 @@ def gzip_string(uncompressed: str) -> bytes:
return compressed


def gunzip_string(compressed: bytes) -> str:
def gunzip_bytes(compressed: bytes) -> bytes:
stream = BytesIO()
stream.write(compressed)
stream.seek(0)
with GzipFile(fileobj=stream, mode='rb') as fobj:
uncompressed_bytes = fobj.read() # type: ignore
uncompressed = uncompressed_bytes.decode()
return uncompressed
return uncompressed_bytes
5 changes: 3 additions & 2 deletions opwen_email_server/utils/temporary.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
from contextlib import suppress
from os import remove
from os.path import join
from pathlib import Path
from tempfile import gettempdir
from typing import Generator
from typing import Optional
from uuid import uuid4

from opwen_email_server.utils.path import get_extension


def create_tempfilename(suffix: Optional[str] = None) -> str:
extension = ''.join(Path(suffix).suffixes) if suffix else ''
extension = get_extension(suffix)
return join(gettempdir(), '{}{}'.format(uuid4(), extension))


Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ applicationinsights==0.11.7
beautifulsoup4==4.6.3
cached-property==1.5.1
connexion[swagger-ui]==2.2.0
msgpack==0.6.0
pyzmail36==1.0.4
requests==2.21.0
sendgrid==5.6.0
Expand Down

This file was deleted.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 0e5cd9c

Please sign in to comment.