Skip to content

Commit

Permalink
Offload client registration to queue (#255)
Browse files Browse the repository at this point in the history
* Add retries to Sendgrid mailbox setup

* Make MX record setup idempotent

* Remove auth caching

* Offload client registration to queue
  • Loading branch information
c-w authored Nov 25, 2019
1 parent 0c7e79a commit 4be0c63
Show file tree
Hide file tree
Showing 20 changed files with 372 additions and 109 deletions.
2 changes: 1 addition & 1 deletion docker/app/run-celery.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env bash

if [[ "${CELERY_QUEUE_NAMES}" = "all" ]]; then
CELERY_QUEUE_NAMES="inbound,written,send,mailboxreceived,mailboxsent"
CELERY_QUEUE_NAMES="register,inbound,written,send,mailboxreceived,mailboxsent"
fi

"${PY_ENV}/bin/celery" \
Expand Down
21 changes: 17 additions & 4 deletions docker/integtest/1-register-client.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ curl -fs \
-H "Content-Type: application/json" \
-u "${REGISTRATION_CREDENTIALS}" \
-d '{"domain":"developer1.lokole.ca"}' \
"http://nginx:8888/api/email/register/" \
| tee "${out_dir}/register1.json"
"http://nginx:8888/api/email/register/"

while ! curl -fs -u "${REGISTRATION_CREDENTIALS}" "http://nginx:8888/api/email/register/developer1.lokole.ca" | tee "${out_dir}/register1.json"; do
log "Waiting for client 1 registration"
sleep 1s
done

# registering a client with bad credentials should fail
if curl -fs \
Expand All @@ -29,8 +33,12 @@ curl -fs \
-H "Content-Type: application/json" \
-u "${REGISTRATION_CREDENTIALS}" \
-d '{"domain":"developer2.lokole.ca"}' \
"http://nginx:8888/api/email/register/" \
| tee "${out_dir}/register2.json"
"http://nginx:8888/api/email/register/"

while ! curl -fs -u "${REGISTRATION_CREDENTIALS}" "http://nginx:8888/api/email/register/developer2.lokole.ca" | tee "${out_dir}/register2.json"; do
log "Waiting for client 2 registration"
sleep 1s
done

# after creating a client, creating the same one again should fail but we should be able to delete it
curl -fs \
Expand All @@ -39,6 +47,11 @@ curl -fs \
-d '{"domain":"developer3.lokole.ca"}' \
"http://nginx:8888/api/email/register/"

while ! curl -fs -u "${REGISTRATION_CREDENTIALS}" "http://nginx:8888/api/email/register/developer3.lokole.ca"; do
log "Waiting for client 3 registration"
sleep 1s
done

if curl -fs \
-H "Content-Type: application/json" \
-u "${REGISTRATION_CREDENTIALS}" \
Expand Down
47 changes: 40 additions & 7 deletions opwen_email_server/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,22 +313,55 @@ def __init__(self,
self._setup_mx_records = setup_mx_records
self._client_id_source = client_id_source or new_client_id

def _action(self, domain, owner): # type: ignore
client_id = self._client_id_source()

self._setup_mailbox(client_id, domain)
self._setup_mx_records(domain)
self._client_storage.ensure_exists()
self._auth.insert(client_id, domain, owner)

self.log_event(events.NEW_CLIENT_REGISTERED, {'domain': domain}) # noqa: E501 # yapf: disable
return 'OK', 200


class CreateClient(_Action):
def __init__(self, auth: AzureAuth, task: Callable[[str, str], None]):
self._auth = auth
self._task = task

def _action(self, client, **auth_args): # type: ignore
domain = client['domain']
if not is_lowercase(domain):
return 'domain must be lowercase', 400
if self._auth.client_id_for(domain) is not None:
return 'client already exists', 409

client_id = self._client_id_source()
access_info = self._client_storage.access_info()
self._task(domain, auth_args.get('user'))

self._setup_mailbox(client_id, domain)
self._setup_mx_records(domain)
self._client_storage.ensure_exists()
self._auth.insert(client_id, domain, auth_args.get('user'))
self.log_event(events.CLIENT_CREATED, {'domain': domain}) # noqa: E501 # yapf: disable
return 'accepted', 201

self.log_event(events.NEW_CLIENT_REGISTERED, {'domain': domain}) # noqa: E501 # yapf: disable

class GetClient(_Action):
def __init__(self, auth: AzureAuth, client_storage: AzureObjectsStorage):
self._auth = auth
self._client_storage = client_storage

def _action(self, domain, **auth_args): # type: ignore
if not is_lowercase(domain):
return 'domain must be lowercase', 400

client_id = self._auth.client_id_for(domain)
if client_id is None:
return 'client does not exist', 404

if not self._auth.is_owner(domain, auth_args.get('user')):
return 'client does not belong to the user', 403

access_info = self._client_storage.access_info()

self.log_event(events.CLIENT_FETCHED, {'domain': domain}) # noqa: E501 # yapf: disable
return {
'client_id': client_id,
'storage_account': access_info.account,
Expand Down
1 change: 0 additions & 1 deletion opwen_email_server/constants/cache.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from typing_extensions import Final # noqa: F401

AUTH_DOMAIN_CACHE_SIZE = 128 # type: Final
PENDING_STORAGE_CACHE_SIZE = 128 # type: Final
2 changes: 2 additions & 0 deletions opwen_email_server/constants/events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing_extensions import Final # noqa: F401

CLIENT_DELETED = 'client_deleted' # type: Final
CLIENT_FETCHED = 'client_fetched' # type: Final
CLIENT_CREATED = 'client_created' # type: Final
NEW_CLIENT_REGISTERED = 'new_client_registered' # type: Final
UNREGISTERED_CLIENT = 'unregistered_client' # type: Final
UNKNOWN_USER = 'unknown_user' # type: Final
Expand Down
1 change: 1 addition & 0 deletions opwen_email_server/constants/queues.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing_extensions import Final # noqa: F401

REGISTER_CLIENT_QUEUE = 'register' # type: Final
INBOUND_STORE_QUEUE = 'inbound' # type: Final
WRITTEN_STORE_QUEUE = 'written' # type: Final
SEND_QUEUE = 'send' # type: Final
Expand Down
2 changes: 1 addition & 1 deletion opwen_email_server/constants/sendgrid.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing_extensions import Final # noqa: F401

MAILBOX_CREATE_URL = 'https://api.sendgrid.com/v3/user/webhooks/parse/settings' # type: Final # noqa: E501 # yapf: disable
MAILBOX_DELETE_URL = 'https://api.sendgrid.com/v3/user/webhooks/parse/settings/{}' # type: Final # noqa: E501 # yapf: disable
MAILBOX_DETAIL_URL = 'https://api.sendgrid.com/v3/user/webhooks/parse/settings/{}' # type: Final # noqa: E501 # yapf: disable

INBOX_URL = 'https://mailserver.lokole.ca/api/email/sendgrid/{}' # type: Final

Expand Down
18 changes: 18 additions & 0 deletions opwen_email_server/integration/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,42 @@

from opwen_email_server.actions import IndexReceivedEmailForMailbox
from opwen_email_server.actions import IndexSentEmailForMailbox
from opwen_email_server.actions import RegisterClient
from opwen_email_server.actions import SendOutboundEmails
from opwen_email_server.actions import StoreInboundEmails
from opwen_email_server.actions import StoreWrittenClientEmails
from opwen_email_server.config import QUEUE_BROKER
from opwen_email_server.constants.queues import INBOUND_STORE_QUEUE
from opwen_email_server.constants.queues import MAILBOX_RECEIVED_QUEUE
from opwen_email_server.constants.queues import MAILBOX_SENT_QUEUE
from opwen_email_server.constants.queues import REGISTER_CLIENT_QUEUE
from opwen_email_server.constants.queues import SEND_QUEUE
from opwen_email_server.constants.queues import WRITTEN_STORE_QUEUE
from opwen_email_server.integration.azure import get_auth
from opwen_email_server.integration.azure import get_client_storage
from opwen_email_server.integration.azure import get_email_sender
from opwen_email_server.integration.azure import get_email_storage
from opwen_email_server.integration.azure import get_mailbox_setup
from opwen_email_server.integration.azure import get_mailbox_storage
from opwen_email_server.integration.azure import get_mx_setup
from opwen_email_server.integration.azure import get_pending_storage
from opwen_email_server.integration.azure import get_raw_email_storage

celery = Celery(broker=QUEUE_BROKER)


@celery.task(ignore_result=True)
def register_client(domain: str, owner: str) -> None:
action = RegisterClient(
auth=get_auth(),
client_storage=get_client_storage(),
setup_mailbox=get_mailbox_setup(),
setup_mx_records=get_mx_setup(),
)

action(domain, owner)


@celery.task(ignore_result=True)
def index_received_email_for_mailbox(resource_id: str) -> None:
action = IndexReceivedEmailForMailbox(
Expand Down Expand Up @@ -85,6 +102,7 @@ def _fqn(task):

celery.conf.update(
task_routes={
_fqn(register_client): {'queue': REGISTER_CLIENT_QUEUE},
_fqn(index_received_email_for_mailbox): {'queue': MAILBOX_RECEIVED_QUEUE},
_fqn(index_sent_email_for_mailbox): {'queue': MAILBOX_SENT_QUEUE},
_fqn(inbound_store): {'queue': INBOUND_STORE_QUEUE},
Expand Down
15 changes: 9 additions & 6 deletions opwen_email_server/integration/connexion.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
from opwen_email_server import config
from opwen_email_server.actions import CalculatePendingEmailsMetric
from opwen_email_server.actions import CreateClient
from opwen_email_server.actions import DeleteClient
from opwen_email_server.actions import DownloadClientEmails
from opwen_email_server.actions import GetClient
from opwen_email_server.actions import Ping
from opwen_email_server.actions import ReceiveInboundEmail
from opwen_email_server.actions import RegisterClient
from opwen_email_server.actions import UploadClientEmails
from opwen_email_server.integration.azure import get_auth
from opwen_email_server.integration.azure import get_client_storage
from opwen_email_server.integration.azure import get_email_storage
from opwen_email_server.integration.azure import get_mailbox_deletion
from opwen_email_server.integration.azure import get_mailbox_setup
from opwen_email_server.integration.azure import get_mx_deletion
from opwen_email_server.integration.azure import get_mx_setup
from opwen_email_server.integration.azure import get_pending_storage
from opwen_email_server.integration.azure import get_raw_email_storage
from opwen_email_server.integration.celery import inbound_store
from opwen_email_server.integration.celery import register_client
from opwen_email_server.integration.celery import written_store
from opwen_email_server.services.auth import AnyOfBasicAuth
from opwen_email_server.services.auth import BasicAuth
Expand All @@ -39,11 +39,14 @@
pending_factory=get_pending_storage,
)

client_register = RegisterClient(
client_create = CreateClient(
auth=get_auth(),
task=register_client.delay,
)

client_get = GetClient(
auth=get_auth(),
client_storage=get_client_storage(),
setup_mailbox=get_mailbox_setup(),
setup_mx_records=get_mx_setup(),
)

client_delete = DeleteClient(
Expand Down
9 changes: 1 addition & 8 deletions opwen_email_server/services/auth.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from functools import lru_cache
from json import JSONDecodeError
from typing import Callable
from typing import Dict
Expand All @@ -12,7 +11,6 @@

from opwen_email_server.constants import events
from opwen_email_server.constants import github
from opwen_email_server.constants.cache import AUTH_DOMAIN_CACHE_SIZE
from opwen_email_server.services.storage import AzureTextStorage
from opwen_email_server.utils.log import LogMixin
from opwen_email_server.utils.serialization import from_json
Expand Down Expand Up @@ -164,7 +162,6 @@ def is_owner(self, domain: str, username: str) -> bool:
def delete(self, client_id: str, domain: str) -> bool:
self._storage.delete(domain)
self._storage.delete(client_id)
self._domain_for_cached.cache_clear()
return True

def client_id_for(self, domain: str) -> Optional[str]:
Expand All @@ -184,14 +181,10 @@ def client_id_for(self, domain: str) -> Optional[str]:

def domain_for(self, client_id: str) -> Optional[str]:
try:
domain = self._domain_for_cached(client_id)
domain = self._storage.fetch_text(client_id)
except ObjectDoesNotExistError:
self.log_debug('Unrecognized client %s', client_id)
return None
else:
self.log_debug('Client %s has domain %s', client_id, domain)
return domain

@lru_cache(maxsize=AUTH_DOMAIN_CACHE_SIZE)
def _domain_for_cached(self, client_id: str) -> str:
return self._storage.fetch_text(client_id)
20 changes: 12 additions & 8 deletions opwen_email_server/services/dns.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from cached_property import cached_property
from libcloud.common.types import LibcloudError
from libcloud.dns.base import DNSDriver
from libcloud.dns.base import Zone
from libcloud.dns.providers import get_driver
Expand Down Expand Up @@ -48,11 +49,14 @@ def _run(self, client_name: str, zone: Zone) -> None:

class SetupMxRecords(_MxRecords):
def _run(self, client_name: str, zone: Zone) -> None:
self._driver.create_record(
zone=zone,
name=client_name,
type=RecordType.MX,
data=MX_RECORD,
)

self.log_debug('Set up MX records for client %s.%s', client_name, zone.domain)
try:
self._driver.create_record(
zone=zone,
name=client_name,
type=RecordType.MX,
data=MX_RECORD,
)
except LibcloudError:
self.log_debug('MX records for client %s.%s already exist', client_name, zone.domain)
else:
self.log_debug('Set up MX records for client %s.%s', client_name, zone.domain)
60 changes: 44 additions & 16 deletions opwen_email_server/services/sendgrid.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from itertools import count
from mimetypes import guess_type
from time import sleep
from typing import Callable

from cached_property import cached_property
from python_http_client import BadRequestsError
from requests import delete as http_delete
from requests import get as http_get
from requests import post as http_post
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Attachment
Expand All @@ -16,7 +19,7 @@

from opwen_email_server.constants.sendgrid import INBOX_URL
from opwen_email_server.constants.sendgrid import MAILBOX_CREATE_URL
from opwen_email_server.constants.sendgrid import MAILBOX_DELETE_URL
from opwen_email_server.constants.sendgrid import MAILBOX_DETAIL_URL
from opwen_email_server.utils.log import LogMixin
from opwen_email_server.utils.serialization import to_base64

Expand Down Expand Up @@ -139,7 +142,7 @@ def _run(self, client_id: str, domain: str) -> None:
class DeleteSendgridMailbox(_SendgridManagement):
def _run(self, client_id: str, domain: str) -> None:
http_delete(
url=MAILBOX_DELETE_URL.format(domain),
url=MAILBOX_DETAIL_URL.format(domain),
headers={
'Authorization': f'Bearer {self._key}',
},
Expand All @@ -149,18 +152,43 @@ def _run(self, client_id: str, domain: str) -> None:


class SetupSendgridMailbox(_SendgridManagement):
def _run(self, client_id: str, domain: str) -> None:
http_post(
url=MAILBOX_CREATE_URL,
json={
'hostname': domain,
'url': INBOX_URL.format(client_id),
'spam_check': True,
'send_raw': True,
},
headers={
'Authorization': f'Bearer {self._key}',
},
).raise_for_status()
def __init__(self, key: str, max_retries: int = 10, retry_interval_seconds: float = 1):
super().__init__(key)
self._max_retries = max_retries
self._retry_interval_seconds = retry_interval_seconds

self.log_debug('Set up mailbox for %s', domain)
def _run(self, client_id: str, domain: str) -> None:
for retry in count():
get_response = http_get(
url=MAILBOX_DETAIL_URL.format(domain),
headers={
'Authorization': f'Bearer {self._key}',
},
)

if get_response.ok:
self.log_debug('Mailbox %s already exists', domain)
break

create_response = http_post(
url=MAILBOX_CREATE_URL,
json={
'hostname': domain,
'url': INBOX_URL.format(client_id),
'spam_check': True,
'send_raw': True,
},
headers={
'Authorization': f'Bearer {self._key}',
},
)

if create_response.ok:
self.log_debug('Set up mailbox for %s', domain)
break

if retry > self._max_retries:
self.log_debug('Too many attempts to set up mailbox for %s', domain)
create_response.raise_for_status()

sleep(self._retry_interval_seconds)
Loading

0 comments on commit 4be0c63

Please sign in to comment.