diff --git a/opwen_email_server/actions.py b/opwen_email_server/actions.py index ed27ab66..50a1702c 100644 --- a/opwen_email_server/actions.py +++ b/opwen_email_server/actions.py @@ -70,13 +70,13 @@ class StoreInboundEmails(_Action): def __init__(self, raw_email_storage: AzureTextStorage, email_storage: AzureObjectStorage, - pending_factory: Callable[[str], AzureTextStorage], + pending_storage: AzureTextStorage, next_task: Callable[[str], None], email_parser: Callable[[str], dict] = None): self._raw_email_storage = raw_email_storage self._email_storage = email_storage - self._pending_factory = pending_factory + self._pending_storage = pending_storage self._next_task = next_task self._email_parser = email_parser or MimeEmailParser() @@ -103,8 +103,8 @@ def _store_inbound_email(self, email: dict) -> str: self._email_storage.store_object(email_id, email) for domain in get_domains(email): - pending_storage = self._pending_factory(domain) - pending_storage.store_text(email_id, 'pending') + if domain.endswith(mailbox.MAILBOX_DOMAIN): + self._pending_storage.store_text(f'{domain}/{email_id}', 'pending') return email_id @@ -239,12 +239,12 @@ def _new_email_id(cls, email: str) -> str: class DownloadClientEmails(_Action): def __init__(self, auth: AzureAuth, client_storage: AzureObjectsStorage, email_storage: AzureObjectStorage, - pending_factory: Callable[[str], AzureTextStorage]): + pending_storage: AzureTextStorage): self._auth = auth self._client_storage = client_storage self._email_storage = email_storage - self._pending_factory = pending_factory + self._pending_storage = pending_storage def _action(self, client_id, compression): # type: ignore domain = self._auth.domain_for(client_id) @@ -262,23 +262,21 @@ def mark_delivered(email: dict) -> dict: delivered.add(email['_uid']) return email - pending_storage = self._pending_factory(domain) - - pending = self._fetch_pending_emails(pending_storage) + pending = self._fetch_pending_emails(domain) pending = (mark_delivered(email) for email in pending) pending = (self._encode_attachments(email) for email in pending) resource_id = self._client_storage.store_objects((sync.EMAILS_FILE, pending, to_jsonl_bytes), compression) - self._mark_emails_as_delivered(pending_storage, delivered) + self._mark_emails_as_delivered(domain, delivered) self.log_event(events.EMAILS_DELIVERED_TO_CLIENT, {'domain': domain, 'num_emails': len(delivered)}) # noqa: E501 # yapf: disable return { 'resource_id': resource_id, } - def _fetch_pending_emails(self, pending_storage: AzureTextStorage): - for email_id in pending_storage.iter(): + def _fetch_pending_emails(self, domain: str) -> Iterable[dict]: + for email_id in self._pending_storage.iter(f'{domain}/'): yield self._email_storage.fetch_object(email_id) @classmethod @@ -292,10 +290,9 @@ def _encode_attachments(cls, email: dict) -> dict: return email - @classmethod - def _mark_emails_as_delivered(cls, pending_storage: AzureTextStorage, email_ids: Iterable[str]): + def _mark_emails_as_delivered(self, domain: str, email_ids: Iterable[str]) -> None: for email_id in email_ids: - pending_storage.delete(email_id) + self._pending_storage.delete(f'{domain}/{email_id}') class UploadClientEmails(_Action): @@ -446,16 +443,15 @@ def _action(self, domain, **auth_args): # type: ignore class CalculatePendingEmailsMetric(_Action): - def __init__(self, auth: AzureAuth, pending_factory: Callable[[str], AzureTextStorage]): + def __init__(self, auth: AzureAuth, pending_storage: AzureTextStorage): self._auth = auth - self._pending_factory = pending_factory + self._pending_storage = pending_storage def _action(self, domain, **auth_args): # type: ignore if not self._auth.is_owner(domain, auth_args.get('user')): return 'client does not belong to the user', 403 - pending_storage = self._pending_factory(domain) - pending_emails = sum(1 for _ in pending_storage.iter()) + pending_emails = sum(1 for _ in self._pending_storage.iter(f'{domain}/')) return { 'pending_emails': pending_emails, diff --git a/opwen_email_server/constants/azure.py b/opwen_email_server/constants/azure.py index 35e2b251..293a3928 100644 --- a/opwen_email_server/constants/azure.py +++ b/opwen_email_server/constants/azure.py @@ -5,8 +5,5 @@ CONTAINER_MAILBOX = 'mailbox' # type: Final CONTAINER_USERS = 'users' # type: Final CONTAINER_SENDGRID_MIME = 'sendgridinboundemails' # type: Final -TABLE_DOMAIN_X_DELIVERED = 'emaildomainxdelivered' # type: Final -TABLE_AUTH = 'clientsauth' # type: Final -QUEUE_CLIENT_PACKAGE = 'lokoleinboundemails' # type: Final -QUEUE_EMAIL_SEND = 'sengridoutboundemails' # type: Final -QUEUE_SENDGRID_MIME = 'sengridinboundemails' # type: Final +CONTAINER_PENDING = 'pendingemails' # type: Final +CONTAINER_AUTH = 'clientsauth' # type: Final diff --git a/opwen_email_server/constants/cache.py b/opwen_email_server/constants/cache.py deleted file mode 100644 index 4365b901..00000000 --- a/opwen_email_server/constants/cache.py +++ /dev/null @@ -1,3 +0,0 @@ -from typing_extensions import Final # noqa: F401 - -PENDING_STORAGE_CACHE_SIZE = 128 # type: Final diff --git a/opwen_email_server/integration/azure.py b/opwen_email_server/integration/azure.py index 625ae1b6..7e8dde87 100644 --- a/opwen_email_server/integration/azure.py +++ b/opwen_email_server/integration/azure.py @@ -1,8 +1,5 @@ -from functools import lru_cache - from opwen_email_server import config from opwen_email_server.constants import azure as constants -from opwen_email_server.constants.cache import PENDING_STORAGE_CACHE_SIZE from opwen_email_server.services.auth import AzureAuth from opwen_email_server.services.storage import AzureFileStorage from opwen_email_server.services.storage import AzureObjectsStorage @@ -18,7 +15,7 @@ def get_auth() -> AzureAuth: key=config.TABLES_KEY, host=config.TABLES_HOST, secure=config.TABLES_SECURE, - container=constants.TABLE_AUTH, + container=constants.CONTAINER_AUTH, provider=config.STORAGE_PROVIDER, )) @@ -83,14 +80,13 @@ def get_mailbox_storage() -> AzureObjectStorage: ) -@lru_cache(maxsize=PENDING_STORAGE_CACHE_SIZE) -def get_pending_storage(domain: str) -> AzureTextStorage: - container = domain.replace('.', '-') +@singleton +def get_pending_storage() -> AzureTextStorage: return AzureTextStorage( account=config.TABLES_ACCOUNT, key=config.TABLES_KEY, host=config.TABLES_HOST, secure=config.TABLES_SECURE, - container=container, + container=constants.CONTAINER_PENDING, provider=config.STORAGE_PROVIDER, ) diff --git a/opwen_email_server/integration/celery.py b/opwen_email_server/integration/celery.py index 9fa57fa8..998d37f7 100644 --- a/opwen_email_server/integration/celery.py +++ b/opwen_email_server/integration/celery.py @@ -73,7 +73,7 @@ def inbound_store(resource_id: str) -> None: action = StoreInboundEmails( raw_email_storage=get_raw_email_storage(), email_storage=get_email_storage(), - pending_factory=get_pending_storage, + pending_storage=get_pending_storage(), next_task=index_received_email_for_mailbox.delay, ) diff --git a/opwen_email_server/integration/connexion.py b/opwen_email_server/integration/connexion.py index dc525643..5e5bcec1 100644 --- a/opwen_email_server/integration/connexion.py +++ b/opwen_email_server/integration/connexion.py @@ -39,7 +39,7 @@ auth=get_auth(), client_storage=get_client_storage(), email_storage=get_email_storage(), - pending_factory=get_pending_storage, + pending_storage=get_pending_storage(), ) client_create = CreateClient( @@ -71,7 +71,7 @@ metrics_pending = CalculatePendingEmailsMetric( auth=get_auth(), - pending_factory=get_pending_storage, + pending_storage=get_pending_storage(), ) basic_auth = AnyOfBasicAuth(auths=[ diff --git a/tests/opwen_email_server/test_actions.py b/tests/opwen_email_server/test_actions.py index 624aaf30..95e5ef53 100644 --- a/tests/opwen_email_server/test_actions.py +++ b/tests/opwen_email_server/test_actions.py @@ -83,7 +83,6 @@ def setUp(self): self.raw_email_storage = Mock() self.email_storage = Mock() self.pending_storage = Mock() - self.pending_factory = MagicMock() self.email_parser = MagicMock() self.next_task = MagicMock() @@ -98,21 +97,19 @@ def test_202(self): self.raw_email_storage.fetch_text.assert_called_once_with(resource_id) self.assertFalse(self.raw_email_storage.delete.called) self.assertFalse(self.email_storage.store_object.called) - self.assertFalse(self.pending_factory.called) self.assertFalse(self.pending_storage.store_text.called) self.assertFalse(self.email_parser.called) def test_200(self): resource_id = 'b8dcaf40-fd14-4a89-8898-c9514b0ad724' - domain = 'test.com' + domain = 'test.lokole.ca' raw_email = 'dummy-mime' - parsed_email = {'to': [f'foo@{domain}']} - email_id = 'c1763288b50107e4e4df4f2d7144f1085729ed112500995ab8103dd532276c18' + parsed_email = {'to': [f'foo@{domain}', 'bar@test.com']} + email_id = '9241404a42d74e1b6eba626711cae643fccf79af9c2c2cc385a89142b89fed1a' stored_email = dict(parsed_email) stored_email['_uid'] = email_id self.raw_email_storage.fetch_text.return_value = raw_email - self.pending_factory.return_value = self.pending_storage self.email_parser.return_value = parsed_email _, status = self._execute_action(resource_id) @@ -121,8 +118,7 @@ def test_200(self): self.raw_email_storage.fetch_text.assert_called_once_with(resource_id) self.raw_email_storage.delete.assert_called_once_with(resource_id) self.email_storage.store_object.assert_called_once_with(email_id, stored_email) - self.pending_factory.assert_called_once_with(domain) - self.pending_storage.store_text.assert_called_once_with(email_id, 'pending') + self.pending_storage.store_text.assert_called_once_with(f'{domain}/{email_id}', 'pending') self.email_parser.assert_called_once_with(raw_email) self.next_task.assert_called_once_with(email_id) @@ -130,7 +126,7 @@ def _execute_action(self, *args, **kwargs): action = actions.StoreInboundEmails( raw_email_storage=self.raw_email_storage, email_storage=self.email_storage, - pending_factory=self.pending_factory, + pending_storage=self.pending_storage, email_parser=self.email_parser, next_task=self.next_task, ) @@ -324,7 +320,6 @@ def setUp(self): self.auth = Mock() self.client_storage = Mock() self.email_storage = Mock() - self.pending_factory = MagicMock() self.pending_storage = Mock() def test_400(self): @@ -386,7 +381,6 @@ def store_objects_mock(upload, compression): return resource_id self.auth.domain_for.return_value = domain - self.pending_factory.return_value = self.pending_storage self.pending_storage.iter.return_value = [email_id] self.email_storage.fetch_object.return_value = server_email self.client_storage.store_objects.side_effect = store_objects_mock @@ -396,9 +390,8 @@ def store_objects_mock(upload, compression): self.assertEqual(response.get('resource_id'), resource_id) self.auth.domain_for.assert_called_once_with(client_id) - self.pending_factory.assert_called_once_with(domain) - self.pending_storage.iter.assert_called_once_with() - self.pending_storage.delete.assert_called_once_with(email_id) + self.pending_storage.iter.assert_called_once_with(f'{domain}/') + self.pending_storage.delete.assert_called_once_with(f'{domain}/{email_id}') self.email_storage.fetch_object.assert_called_once_with(email_id) self.assertEqual(_stored[sync.EMAILS_FILE], [client_email]) self.assertEqual(_compression[sync.EMAILS_FILE], ['gz']) @@ -409,7 +402,7 @@ def _execute_action(self, *args, **kwargs): auth=self.auth, client_storage=self.client_storage, email_storage=self.email_storage, - pending_factory=self.pending_factory, + pending_storage=self.pending_storage, ) return action(*args, **kwargs) @@ -747,7 +740,6 @@ class CalculatePendingEmailsMetricTests(TestCase): def setUp(self): self.auth = Mock() self.pending_storage = Mock() - self.pending_factory = MagicMock() def test_403(self): domain = 'test.com' @@ -769,20 +761,18 @@ def test_200(self): ] self.auth.is_owner.return_value = True - self.pending_factory.return_value = self.pending_storage self.pending_storage.iter.return_value = pending_email_ids response = self._execute_action(domain, user=user) self.assertEqual(response['pending_emails'], len(pending_email_ids)) self.auth.is_owner.assert_called_once_with(domain, user) - self.pending_factory.assert_called_once_with(domain) - self.pending_storage.iter.assert_called_once_with() + self.pending_storage.iter.assert_called_once_with(f'{domain}/') def _execute_action(self, *args, **kwargs): action = actions.CalculatePendingEmailsMetric( auth=self.auth, - pending_factory=self.pending_factory, + pending_storage=self.pending_storage, ) return action(*args, **kwargs)