Skip to content

Commit

Permalink
Make inbound email processing idempotent (#200)
Browse files Browse the repository at this point in the history
  • Loading branch information
c-w authored May 27, 2019
1 parent 205d1f3 commit af5249c
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 36 deletions.
19 changes: 12 additions & 7 deletions opwen_email_server/actions.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from abc import ABC
from hashlib import sha256
from typing import Callable
from typing import Iterable
from typing import Tuple
from typing import Union
from uuid import uuid4

from libcloud.storage.types import ObjectDoesNotExistError

from opwen_email_server.constants import events
from opwen_email_server.constants import sync
from opwen_email_server.services.auth import AzureAuth
Expand Down Expand Up @@ -78,7 +81,11 @@ def __init__(self,
self._email_parser = email_parser or self._parse_mime_email

def _action(self, resource_id): # type: ignore
mime_email = self._raw_email_storage.fetch_text(resource_id)
try:
mime_email = self._raw_email_storage.fetch_text(resource_id)
except ObjectDoesNotExistError:
self.log_warning('Inbound email %s does not exist', resource_id)
return 'skipped', 202

email = self._email_parser(mime_email)
self._store_inbound_email(resource_id, email)
Expand Down Expand Up @@ -150,21 +157,19 @@ class ReceiveInboundEmail(_Action):
def __init__(self,
auth: AzureAuth,
raw_email_storage: AzureTextStorage,
next_task: Callable[[str], None],
email_id_source: Callable[[], str] = None):
next_task: Callable[[str], None]):

self._auth = auth
self._raw_email_storage = raw_email_storage
self._next_task = next_task
self._email_id_source = email_id_source or self._new_email_id

def _action(self, client_id, email): # type: ignore
domain = self._auth.domain_for(client_id)
if not domain:
self.log_event(events.UNREGISTERED_CLIENT, {'client_id': client_id}) # noqa: E501
return 'client is not registered', 403

email_id = self._email_id_source()
email_id = self._new_email_id(email)

self._raw_email_storage.store_text(email_id, email)

Expand All @@ -174,8 +179,8 @@ def _action(self, client_id, email): # type: ignore
return 'received', 200

@classmethod
def _new_email_id(cls) -> str:
return str(uuid4())
def _new_email_id(cls, email: str) -> str:
return sha256(email.encode('utf-8')).hexdigest()


class DownloadClientEmails(_Action):
Expand Down
10 changes: 7 additions & 3 deletions opwen_email_server/services/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,13 @@ def ensure_exists(self):
self._client

def delete(self, resource_id: str):
resource = self._client.get_object(resource_id)
resource.delete()
self.log_debug('deleted %s', resource_id)
try:
resource = self._client.get_object(resource_id)
except ObjectDoesNotExistError:
self.log_warning('deleted missing %s', resource_id)
else:
resource.delete()
self.log_debug('deleted %s', resource_id)

def iter(self) -> Iterator[str]:
for resource in self._client.list_objects():
Expand Down
Binary file modified tests/files/end_to_end/client-emails.tar.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion tests/files/end_to_end/inbound-email.mime
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ From: Clemens Wolff <clemens.wolff@gmail.com>
Date: Sun, 12 Feb 2017 22:25:01 -0800
Message-ID: <CAL79TcnjhV5PinZVY8Y3QEoNNcSa9uuNU5N3EP-gqcYPFfuHLA@mail.gmail.com>
Subject: Test email sent to Lokole client
To: clemens@developer.lokole.ca, laura@developer.lokole.ca
To: clemens@developer1.lokole.ca, laura@developer1.lokole.ca, nzola@developer2.lokole.ca
Content-Type: multipart/alternative; boundary=001a1146392641b94705486384bf

--001a1146392641b94705486384bf
Expand Down
12 changes: 10 additions & 2 deletions tests/integration/0-register-client.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ mkdir -p "${out_dir}"
curl -fs \
-H "Content-Type: application/json" \
-u "admin:password" \
-d '{"domain":"developer.lokole.ca"}' \
-d '{"domain":"developer1.lokole.ca"}' \
"http://localhost:8080/api/email/register/" \
| tee "${out_dir}/register.json"
| tee "${out_dir}/register1.json"

# registering a client with bad credentials should fail
if curl -fs \
Expand All @@ -20,3 +20,11 @@ if curl -fs \
-d '{"domain":"hacker.lokole.ca"}' \
"http://localhost:8080/api/email/register/" \
; then echo "Was able to register a client with bad credentials" >&2; exit 4; fi

# also register another client to simulate multi-client emails
curl -fs \
-H "Content-Type: application/json" \
-u "admin:password" \
-d '{"domain":"developer2.lokole.ca"}' \
"http://localhost:8080/api/email/register/" \
| tee "${out_dir}/register2.json"
4 changes: 2 additions & 2 deletions tests/integration/1-client-uploads-emails.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ out_dir="$(dirname "$0")/../files/end_to_end/test.out"
mkdir -p "${out_dir}"

emails_to_send="${in_dir}/client-emails.tar.gz"
client_id="$(jq -r '.client_id' < "${out_dir}/register.json")"
resource_container="$(jq -r '.resource_container' < "${out_dir}/register.json")"
client_id="$(jq -r '.client_id' < "${out_dir}/register1.json")"
resource_container="$(jq -r '.resource_container' < "${out_dir}/register1.json")"
resource_id="$(python3 -c 'import uuid;print(str(uuid.uuid4()))').tar.gz"

# workflow 1: send emails written on the client to the world
Expand Down
8 changes: 7 additions & 1 deletion tests/integration/2-receive-email-for-client.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@ mkdir -p "${out_dir}"

email_to_receive="${in_dir}/inbound-email.mime"

client_id="$(jq -r '.client_id' < "${out_dir}/register.json")"
client_id="$(jq -r '.client_id' < "${out_dir}/register1.json")"

# workflow 2a: receive an email directed at one of the clients
# this simulates sendgrid delivering an email to the service
curl -fs \
-H "Content-Type: multipart/form-data" \
-F "email=$(cat "${email_to_receive}")" \
"http://localhost:8080/api/email/sendgrid/${client_id}"

# simulate sendgrid delivering the same email a second time (e.g. once per
# recipient mailbox); the repeated request reuses the same client_id
curl -fs \
-H "Content-Type: multipart/form-data" \
-F "email=$(cat "${email_to_receive}")" \
"http://localhost:8080/api/email/sendgrid/${client_id}"
20 changes: 13 additions & 7 deletions tests/integration/3-client-downloads-emails.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ set -euo pipefail
out_dir="$(dirname "$0")/../files/end_to_end/test.out"
mkdir -p "${out_dir}"

client_id="$(jq -r '.client_id' < "${out_dir}/register.json")"
resource_container="$(jq -r '.resource_container' < "${out_dir}/register.json")"
for i in 1 2; do

client_id="$(jq -r '.client_id' < "${out_dir}/register${i}.json")"
resource_container="$(jq -r '.resource_container' < "${out_dir}/register${i}.json")"

# workflow 2b: deliver emails written by the world to a lokole client
# first the client makes a request to ask the server to package up all the
Expand All @@ -14,21 +16,25 @@ resource_container="$(jq -r '.resource_container' < "${out_dir}/register.json")"
curl -fs \
-H "Accept: application/json" \
"http://localhost:8080/api/email/download/${client_id}" \
| tee "${out_dir}/download.json"
| tee "${out_dir}/download${i}.json"

resource_id="$(jq -r '.resource_id' < "${out_dir}/download.json")"
resource_id="$(jq -r '.resource_id' < "${out_dir}/download${i}.json")"

# now we simulate the client downloading the package from the shared blob storage
curl -fs \
"http://localhost:10000/devstoreaccount1/${resource_container}/${resource_id}" \
> "${out_dir}/downloaded.tar.gz"
> "${out_dir}/downloaded${i}.tar.gz"

tar xzf "${out_dir}/downloaded.tar.gz" -C "${out_dir}"
tar xzf "${out_dir}/downloaded${i}.tar.gz" -C "${out_dir}"

num_emails_actual="$(wc -l "${out_dir}/emails.jsonl" | cut -d' ' -f1)"
num_emails_expected=1

if [[ "${num_emails_actual}" -ne "${num_emails_expected}" ]]; then
echo "Got ${num_emails_actual} but expected ${num_emails_expected}" >&2
echo "Got ${num_emails_actual} emails but expected ${num_emails_expected}" >&2
exit 1
fi

rm "${out_dir}/emails.jsonl"

done
8 changes: 0 additions & 8 deletions tests/integration/assert.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@ set -euo pipefail
out_dir="$(dirname "$0")/../files/end_to_end/test.out"
mkdir -p "${out_dir}"

num_emails_actual="$(wc -l "${out_dir}/emails.jsonl" | cut -d' ' -f1)"
num_emails_expected=1

if [[ "${num_emails_actual}" -ne "${num_emails_expected}" ]]; then
echo "Got ${num_emails_actual} emails but expected ${num_emails_expected}" >&2
exit 1
fi

sql_query() {
docker-compose exec postgres psql -Aqt -c "$1" -U ascoderu -d telemetry \
| tr -d -C '0-9'
Expand Down
2 changes: 2 additions & 0 deletions tests/opwen_email_server/services/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ def test_stores_fetches_and_deletes_file(self):
with self.assertRaises(ObjectDoesNotExistError):
self._storage.fetch_file(resource_id)

self._storage.delete(resource_id)

def assertFileContains(self, path: str, content: str):
with open(path, encoding='utf-8') as fobj:
self.assertEqual(fobj.read(), content)
Expand Down
48 changes: 43 additions & 5 deletions tests/opwen_email_server/test_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from unittest.mock import patch
from uuid import uuid4

from libcloud.storage.types import ObjectDoesNotExistError

from opwen_email_server import actions
from opwen_email_server.constants import sync
from opwen_email_server.services.storage import AccessInfo
Expand Down Expand Up @@ -85,6 +87,25 @@ def setUp(self):
self.pending_factory = MagicMock()
self.email_parser = MagicMock()

def test_202(self):
# Checks the "raw email is missing" path: when fetch_text raises
# ObjectDoesNotExistError the action must answer with status 202 and
# must not touch any of the downstream collaborators.
# NOTE(review): the action class under test is built by _execute_action,
# which is outside this hunk - confirm which action this covers.
# noinspection PyUnusedLocal
def throw(*args, **kwargs):
raise ObjectDoesNotExistError(None, None, None)

resource_id = str(uuid4())

# every fetch_text call on the storage double now raises via throw()
self.raw_email_storage.fetch_text.side_effect = throw

_, status = self._execute_action(resource_id)

self.assertEqual(status, 202)
self.raw_email_storage.fetch_text.assert_called_once_with(resource_id)
# after the failed fetch nothing may be deleted, stored, or parsed
self.assertFalse(self.raw_email_storage.delete.called)
self.assertFalse(self.email_storage.store_object.called)
self.assertFalse(self.pending_factory.called)
self.assertFalse(self.pending_storage.store_text.called)
self.assertFalse(self.email_parser.called)

def test_200(self):
resource_id = str(uuid4())
domain = 'test.com'
Expand Down Expand Up @@ -175,27 +196,44 @@ def test_403(self):

def test_200(self):
client_id = str(uuid4())
email_id = str(uuid4())
email_id = 'dfbab492b9adcf20ca8424b993b0f7ec26731d069be4d451ebbf7910937a999c'
domain = 'test.com'
email = 'dummy-mime'

self.auth.domain_for.return_value = domain
self.email_id_source.return_value = email_id

_, status = self._execute_action(client_id, email)

self.assertEqual(status, 200)
self.auth.domain_for.assert_called_once_with(client_id)
self.email_id_source.assert_called_once_with()
self.raw_email_storage.store_text.assert_called_once_with(email_id, email)
self.next_task.assert_called_once_with(email_id)

def test_is_idempotent(self):
# Submits the same email for the same client num_repeated_emails times and
# checks that each submission returns 200 and that store_text and next_task
# were each invoked with identical arguments on every call.
# NOTE(review): indentation is lost in this view; the two assertHasSameCalls
# checks presumably run once after the loop - confirm against the real file.
client_id = str(uuid4())
domain = 'test.com'
email = 'dummy-mime'
num_repeated_emails = 3

# a non-empty domain makes the client count as registered
# (the action returns 403 when domain_for yields nothing)
self.auth.domain_for.return_value = domain

for _ in range(num_repeated_emails):
_, status = self._execute_action(client_id, email)
self.assertEqual(status, 200)

self.assertHasSameCalls(self.raw_email_storage.store_text, num_repeated_emails)
self.assertHasSameCalls(self.next_task, num_repeated_emails)

def assertHasSameCalls(self, mocked_function, num_calls):
# Assert that mocked_function recorded exactly num_calls calls and that every
# recorded call equals the first one (compared via call_args_list entries).
self.assertEqual(len(mocked_function.call_args_list), num_calls)
# with zero recorded calls the loop body never runs, so index 0 is not read
for call in mocked_function.call_args_list:
self.assertEqual(call, mocked_function.call_args_list[0])

def _execute_action(self, *args, **kwargs):
action = actions.ReceiveInboundEmail(
auth=self.auth,
raw_email_storage=self.raw_email_storage,
next_task=self.next_task,
email_id_source=self.email_id_source)
next_task=self.next_task)

return action(*args, **kwargs)

Expand Down

0 comments on commit af5249c

Please sign in to comment.