Skip to content

Commit

Permalink
Simplify pending email storage (#305)
Browse files Browse the repository at this point in the history
* Simplify pending storage implementation

* Don't store pending emails for non-Lokole domains

* Remove unused constants
  • Loading branch information
c-w authored Jan 30, 2020
1 parent 301c10d commit 0b08c22
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 58 deletions.
34 changes: 15 additions & 19 deletions opwen_email_server/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ class StoreInboundEmails(_Action):
def __init__(self,
raw_email_storage: AzureTextStorage,
email_storage: AzureObjectStorage,
pending_factory: Callable[[str], AzureTextStorage],
pending_storage: AzureTextStorage,
next_task: Callable[[str], None],
email_parser: Callable[[str], dict] = None):

self._raw_email_storage = raw_email_storage
self._email_storage = email_storage
self._pending_factory = pending_factory
self._pending_storage = pending_storage
self._next_task = next_task
self._email_parser = email_parser or MimeEmailParser()

Expand All @@ -103,8 +103,8 @@ def _store_inbound_email(self, email: dict) -> str:
self._email_storage.store_object(email_id, email)

for domain in get_domains(email):
pending_storage = self._pending_factory(domain)
pending_storage.store_text(email_id, 'pending')
if domain.endswith(mailbox.MAILBOX_DOMAIN):
self._pending_storage.store_text(f'{domain}/{email_id}', 'pending')

return email_id

Expand Down Expand Up @@ -239,12 +239,12 @@ def _new_email_id(cls, email: str) -> str:

class DownloadClientEmails(_Action):
def __init__(self, auth: AzureAuth, client_storage: AzureObjectsStorage, email_storage: AzureObjectStorage,
pending_factory: Callable[[str], AzureTextStorage]):
pending_storage: AzureTextStorage):

self._auth = auth
self._client_storage = client_storage
self._email_storage = email_storage
self._pending_factory = pending_factory
self._pending_storage = pending_storage

def _action(self, client_id, compression): # type: ignore
domain = self._auth.domain_for(client_id)
Expand All @@ -262,23 +262,21 @@ def mark_delivered(email: dict) -> dict:
delivered.add(email['_uid'])
return email

pending_storage = self._pending_factory(domain)

pending = self._fetch_pending_emails(pending_storage)
pending = self._fetch_pending_emails(domain)
pending = (mark_delivered(email) for email in pending)
pending = (self._encode_attachments(email) for email in pending)

resource_id = self._client_storage.store_objects((sync.EMAILS_FILE, pending, to_jsonl_bytes), compression)

self._mark_emails_as_delivered(pending_storage, delivered)
self._mark_emails_as_delivered(domain, delivered)

self.log_event(events.EMAILS_DELIVERED_TO_CLIENT, {'domain': domain, 'num_emails': len(delivered)}) # noqa: E501 # yapf: disable
return {
'resource_id': resource_id,
}

def _fetch_pending_emails(self, pending_storage: AzureTextStorage):
for email_id in pending_storage.iter():
def _fetch_pending_emails(self, domain: str) -> Iterable[dict]:
for email_id in self._pending_storage.iter(f'{domain}/'):
yield self._email_storage.fetch_object(email_id)

@classmethod
Expand All @@ -292,10 +290,9 @@ def _encode_attachments(cls, email: dict) -> dict:

return email

@classmethod
def _mark_emails_as_delivered(cls, pending_storage: AzureTextStorage, email_ids: Iterable[str]):
def _mark_emails_as_delivered(self, domain: str, email_ids: Iterable[str]) -> None:
for email_id in email_ids:
pending_storage.delete(email_id)
self._pending_storage.delete(f'{domain}/{email_id}')


class UploadClientEmails(_Action):
Expand Down Expand Up @@ -446,16 +443,15 @@ def _action(self, domain, **auth_args): # type: ignore


class CalculatePendingEmailsMetric(_Action):
def __init__(self, auth: AzureAuth, pending_factory: Callable[[str], AzureTextStorage]):
def __init__(self, auth: AzureAuth, pending_storage: AzureTextStorage):
self._auth = auth
self._pending_factory = pending_factory
self._pending_storage = pending_storage

def _action(self, domain, **auth_args): # type: ignore
if not self._auth.is_owner(domain, auth_args.get('user')):
return 'client does not belong to the user', 403

pending_storage = self._pending_factory(domain)
pending_emails = sum(1 for _ in pending_storage.iter())
pending_emails = sum(1 for _ in self._pending_storage.iter(f'{domain}/'))

return {
'pending_emails': pending_emails,
Expand Down
7 changes: 2 additions & 5 deletions opwen_email_server/constants/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,5 @@
CONTAINER_MAILBOX = 'mailbox' # type: Final
CONTAINER_USERS = 'users' # type: Final
CONTAINER_SENDGRID_MIME = 'sendgridinboundemails' # type: Final
TABLE_DOMAIN_X_DELIVERED = 'emaildomainxdelivered' # type: Final
TABLE_AUTH = 'clientsauth' # type: Final
QUEUE_CLIENT_PACKAGE = 'lokoleinboundemails' # type: Final
QUEUE_EMAIL_SEND = 'sengridoutboundemails' # type: Final
QUEUE_SENDGRID_MIME = 'sengridinboundemails' # type: Final
CONTAINER_PENDING = 'pendingemails' # type: Final
CONTAINER_AUTH = 'clientsauth' # type: Final
3 changes: 0 additions & 3 deletions opwen_email_server/constants/cache.py

This file was deleted.

12 changes: 4 additions & 8 deletions opwen_email_server/integration/azure.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from functools import lru_cache

from opwen_email_server import config
from opwen_email_server.constants import azure as constants
from opwen_email_server.constants.cache import PENDING_STORAGE_CACHE_SIZE
from opwen_email_server.services.auth import AzureAuth
from opwen_email_server.services.storage import AzureFileStorage
from opwen_email_server.services.storage import AzureObjectsStorage
Expand All @@ -18,7 +15,7 @@ def get_auth() -> AzureAuth:
key=config.TABLES_KEY,
host=config.TABLES_HOST,
secure=config.TABLES_SECURE,
container=constants.TABLE_AUTH,
container=constants.CONTAINER_AUTH,
provider=config.STORAGE_PROVIDER,
))

Expand Down Expand Up @@ -83,14 +80,13 @@ def get_mailbox_storage() -> AzureObjectStorage:
)


@lru_cache(maxsize=PENDING_STORAGE_CACHE_SIZE)
def get_pending_storage(domain: str) -> AzureTextStorage:
container = domain.replace('.', '-')
@singleton
def get_pending_storage() -> AzureTextStorage:
return AzureTextStorage(
account=config.TABLES_ACCOUNT,
key=config.TABLES_KEY,
host=config.TABLES_HOST,
secure=config.TABLES_SECURE,
container=container,
container=constants.CONTAINER_PENDING,
provider=config.STORAGE_PROVIDER,
)
2 changes: 1 addition & 1 deletion opwen_email_server/integration/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def inbound_store(resource_id: str) -> None:
action = StoreInboundEmails(
raw_email_storage=get_raw_email_storage(),
email_storage=get_email_storage(),
pending_factory=get_pending_storage,
pending_storage=get_pending_storage(),
next_task=index_received_email_for_mailbox.delay,
)

Expand Down
4 changes: 2 additions & 2 deletions opwen_email_server/integration/connexion.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
auth=get_auth(),
client_storage=get_client_storage(),
email_storage=get_email_storage(),
pending_factory=get_pending_storage,
pending_storage=get_pending_storage(),
)

client_create = CreateClient(
Expand Down Expand Up @@ -71,7 +71,7 @@

metrics_pending = CalculatePendingEmailsMetric(
auth=get_auth(),
pending_factory=get_pending_storage,
pending_storage=get_pending_storage(),
)

basic_auth = AnyOfBasicAuth(auths=[
Expand Down
30 changes: 10 additions & 20 deletions tests/opwen_email_server/test_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ def setUp(self):
self.raw_email_storage = Mock()
self.email_storage = Mock()
self.pending_storage = Mock()
self.pending_factory = MagicMock()
self.email_parser = MagicMock()
self.next_task = MagicMock()

Expand All @@ -98,21 +97,19 @@ def test_202(self):
self.raw_email_storage.fetch_text.assert_called_once_with(resource_id)
self.assertFalse(self.raw_email_storage.delete.called)
self.assertFalse(self.email_storage.store_object.called)
self.assertFalse(self.pending_factory.called)
self.assertFalse(self.pending_storage.store_text.called)
self.assertFalse(self.email_parser.called)

def test_200(self):
resource_id = 'b8dcaf40-fd14-4a89-8898-c9514b0ad724'
domain = 'test.com'
domain = 'test.lokole.ca'
raw_email = 'dummy-mime'
parsed_email = {'to': [f'foo@{domain}']}
email_id = 'c1763288b50107e4e4df4f2d7144f1085729ed112500995ab8103dd532276c18'
parsed_email = {'to': [f'foo@{domain}', 'bar@test.com']}
email_id = '9241404a42d74e1b6eba626711cae643fccf79af9c2c2cc385a89142b89fed1a'
stored_email = dict(parsed_email)
stored_email['_uid'] = email_id

self.raw_email_storage.fetch_text.return_value = raw_email
self.pending_factory.return_value = self.pending_storage
self.email_parser.return_value = parsed_email

_, status = self._execute_action(resource_id)
Expand All @@ -121,16 +118,15 @@ def test_200(self):
self.raw_email_storage.fetch_text.assert_called_once_with(resource_id)
self.raw_email_storage.delete.assert_called_once_with(resource_id)
self.email_storage.store_object.assert_called_once_with(email_id, stored_email)
self.pending_factory.assert_called_once_with(domain)
self.pending_storage.store_text.assert_called_once_with(email_id, 'pending')
self.pending_storage.store_text.assert_called_once_with(f'{domain}/{email_id}', 'pending')
self.email_parser.assert_called_once_with(raw_email)
self.next_task.assert_called_once_with(email_id)

def _execute_action(self, *args, **kwargs):
action = actions.StoreInboundEmails(
raw_email_storage=self.raw_email_storage,
email_storage=self.email_storage,
pending_factory=self.pending_factory,
pending_storage=self.pending_storage,
email_parser=self.email_parser,
next_task=self.next_task,
)
Expand Down Expand Up @@ -324,7 +320,6 @@ def setUp(self):
self.auth = Mock()
self.client_storage = Mock()
self.email_storage = Mock()
self.pending_factory = MagicMock()
self.pending_storage = Mock()

def test_400(self):
Expand Down Expand Up @@ -386,7 +381,6 @@ def store_objects_mock(upload, compression):
return resource_id

self.auth.domain_for.return_value = domain
self.pending_factory.return_value = self.pending_storage
self.pending_storage.iter.return_value = [email_id]
self.email_storage.fetch_object.return_value = server_email
self.client_storage.store_objects.side_effect = store_objects_mock
Expand All @@ -396,9 +390,8 @@ def store_objects_mock(upload, compression):

self.assertEqual(response.get('resource_id'), resource_id)
self.auth.domain_for.assert_called_once_with(client_id)
self.pending_factory.assert_called_once_with(domain)
self.pending_storage.iter.assert_called_once_with()
self.pending_storage.delete.assert_called_once_with(email_id)
self.pending_storage.iter.assert_called_once_with(f'{domain}/')
self.pending_storage.delete.assert_called_once_with(f'{domain}/{email_id}')
self.email_storage.fetch_object.assert_called_once_with(email_id)
self.assertEqual(_stored[sync.EMAILS_FILE], [client_email])
self.assertEqual(_compression[sync.EMAILS_FILE], ['gz'])
Expand All @@ -409,7 +402,7 @@ def _execute_action(self, *args, **kwargs):
auth=self.auth,
client_storage=self.client_storage,
email_storage=self.email_storage,
pending_factory=self.pending_factory,
pending_storage=self.pending_storage,
)

return action(*args, **kwargs)
Expand Down Expand Up @@ -747,7 +740,6 @@ class CalculatePendingEmailsMetricTests(TestCase):
def setUp(self):
self.auth = Mock()
self.pending_storage = Mock()
self.pending_factory = MagicMock()

def test_403(self):
domain = 'test.com'
Expand All @@ -769,20 +761,18 @@ def test_200(self):
]

self.auth.is_owner.return_value = True
self.pending_factory.return_value = self.pending_storage
self.pending_storage.iter.return_value = pending_email_ids

response = self._execute_action(domain, user=user)

self.assertEqual(response['pending_emails'], len(pending_email_ids))
self.auth.is_owner.assert_called_once_with(domain, user)
self.pending_factory.assert_called_once_with(domain)
self.pending_storage.iter.assert_called_once_with()
self.pending_storage.iter.assert_called_once_with(f'{domain}/')

def _execute_action(self, *args, **kwargs):
action = actions.CalculatePendingEmailsMetric(
auth=self.auth,
pending_factory=self.pending_factory,
pending_storage=self.pending_storage,
)

return action(*args, **kwargs)

0 comments on commit 0b08c22

Please sign in to comment.