diff --git a/.env b/.env index 95bd9209..9f40f852 100644 --- a/.env +++ b/.env @@ -4,4 +4,4 @@ DOCKER_REPO=ascoderu USE_DEVTOOLS=True SERVER_WORKERS=1 QUEUE_WORKERS=1 -LOKOLE_LOG_LEVEL=DEBUG +LOKOLE_LOG_LEVEL=INFO diff --git a/opwen_email_server/actions.py b/opwen_email_server/actions.py index 5e04a06d..38b719ca 100644 --- a/opwen_email_server/actions.py +++ b/opwen_email_server/actions.py @@ -1,3 +1,4 @@ +from abc import ABC from typing import Callable from typing import Iterable from typing import Tuple @@ -21,12 +22,26 @@ Response = Union[dict, Tuple[str, int]] -class Ping(object): - def __call__(self) -> Response: +class _Action(ABC, LogMixin): + def __call__(self, *args, **kwargs) -> Response: + try: + return self._action(*args, **kwargs) + except Exception as ex: + self.log_exception(ex, 'error in action %s', + self.__class__.__name__) + raise ex + + def _action(self, *args, **kwargs) -> Response: + raise NotImplementedError + + +class Ping(_Action): + # noinspection PyMethodMayBeStatic + def _action(self): # type: ignore return 'OK', 200 -class SendOutboundEmails(LogMixin): +class SendOutboundEmails(_Action): def __init__(self, email_storage: AzureObjectStorage, send_email: SendSendgridEmail): @@ -34,7 +49,7 @@ def __init__(self, self._email_storage = email_storage self._send_email = send_email - def __call__(self, resource_id: str) -> Response: + def _action(self, resource_id): # type: ignore email = self._email_storage.fetch_object(resource_id) success = self._send_email(email) @@ -45,7 +60,7 @@ def __call__(self, resource_id: str) -> Response: return 'OK', 200 -class StoreInboundEmails(LogMixin): +class StoreInboundEmails(_Action): def __init__(self, raw_email_storage: AzureTextStorage, email_storage: AzureObjectStorage, @@ -57,7 +72,7 @@ def __init__(self, self._pending_factory = pending_factory self._email_parser = email_parser or self._parse_mime_email - def __call__(self, resource_id: str) -> Response: + def _action(self, resource_id): # type: ignore mime_email = self._raw_email_storage.fetch_text(resource_id) email = self._email_parser(mime_email) @@ -85,7 +100,7 @@ def _parse_mime_email(cls, mime_email: str) -> dict: return email -class StoreWrittenClientEmails(LogMixin): +class StoreWrittenClientEmails(_Action): def __init__(self, client_storage: AzureObjectsStorage, email_storage: AzureObjectStorage, @@ -95,7 +110,7 @@ def __init__(self, self._email_storage = email_storage self._next_task = next_task - def __call__(self, resource_id: str) -> Response: + def _action(self, resource_id): # type: ignore emails = self._client_storage.fetch_objects( resource_id, sync.EMAILS_FILE) @@ -116,7 +131,7 @@ def __call__(self, resource_id: str) -> Response: return 'OK', 200 -class ReceiveInboundEmail(LogMixin): +class ReceiveInboundEmail(_Action): def __init__(self, auth: AzureAuth, raw_email_storage: AzureTextStorage, @@ -128,7 +143,7 @@ def __init__(self, self._next_task = next_task self._email_id_source = email_id_source or self._new_email_id - def __call__(self, client_id: str, email: str) -> Response: + def _action(self, client_id, email): # type: ignore domain = self._auth.domain_for(client_id) if not domain: self.log_event(events.UNREGISTERED_CLIENT, {'client_id': client_id}) # noqa: E501 @@ -148,7 +163,7 @@ def _new_email_id(cls) -> str: return str(uuid4()) -class DownloadClientEmails(LogMixin): +class DownloadClientEmails(_Action): def __init__(self, auth: AzureAuth, client_storage: AzureObjectsStorage, @@ -160,7 +175,7 @@ def __init__(self, self._email_storage = email_storage self._pending_factory = pending_factory - def __call__(self, client_id: str, compression: str) -> Response: + def _action(self, client_id, compression): # type: ignore domain = self._auth.domain_for(client_id) if not domain: self.log_event(events.UNREGISTERED_CLIENT, {'client_id': client_id}) # noqa: E501 @@ -202,7 +217,7 @@ def _mark_emails_as_delivered(cls, pending_storage: AzureTextStorage, pending_storage.delete(email_id) -class UploadClientEmails(LogMixin): +class UploadClientEmails(_Action): def __init__(self, auth: AzureAuth, next_task: Callable[[str], None]): @@ -210,7 +225,7 @@ def __init__(self, self._auth = auth self._next_task = next_task - def __call__(self, client_id: str, upload_info: dict) -> Response: + def _action(self, client_id, upload_info): # type: ignore domain = self._auth.domain_for(client_id) if not domain: self.log_event(events.UNREGISTERED_CLIENT, {'client_id': client_id}) # noqa: E501 @@ -224,7 +239,7 @@ def __call__(self, client_id: str, upload_info: dict) -> Response: return 'uploaded', 200 -class RegisterClient(LogMixin): +class RegisterClient(_Action): def __init__(self, auth: AzureAuth, client_storage: AzureObjectsStorage, @@ -238,7 +253,7 @@ def __init__(self, self._setup_mx_records = setup_mx_records self._client_id_source = client_id_source or self._new_client_id - def __call__(self, client: dict) -> Response: + def _action(self, client): # type: ignore domain = client['domain'] if self._auth.client_id_for(domain) is not None: return 'client already exists', 409 diff --git a/opwen_email_server/constants/logging.py b/opwen_email_server/constants/logging.py index bfdfbe4d..eb3a9505 100644 --- a/opwen_email_server/constants/logging.py +++ b/opwen_email_server/constants/logging.py @@ -2,6 +2,3 @@ STDERR = '%(asctime)s\t%(levelname)s\t%(message)s' # type: Final SEPARATOR = '|' # type: Final - -TELEMETRY_QUEUE_SECONDS = 30 # type: Final -TELEMETRY_QUEUE_ITEMS = 10 # type: Final diff --git a/opwen_email_server/services/sendgrid.py b/opwen_email_server/services/sendgrid.py index 1d735c11..3ced8e6b 100644 --- a/opwen_email_server/services/sendgrid.py +++ b/opwen_email_server/services/sendgrid.py @@ -47,14 +47,14 @@ def _send_email(self, email: Mail, email_id: str) -> bool: request = email.get() try: status = self._client(request) - except HTTPError as exception: - status = exception.code - self.log_exception('error sending email %s:%r:%r', - email_id, exception, request) - except URLError as exception: + except HTTPError as ex: + status = ex.code + self.log_exception(ex, 'error sending email %s:%r', + email_id, request) + except URLError as ex: status = -1 - self.log_exception('error sending email %s:%r:%r', - email_id, exception, request) + self.log_exception(ex, 'error sending email %s:%r', + email_id, request) else: self.log_debug('sent email %s', email_id) diff --git a/opwen_email_server/utils/collections.py b/opwen_email_server/utils/collections.py index 1a26bae1..24f1e417 100644 --- a/opwen_email_server/utils/collections.py +++ b/opwen_email_server/utils/collections.py @@ -24,3 +24,9 @@ def chunks(iterable: Iterable[T], chunk_size: int) -> Iterable[Iterable[T]]: def singleton(func: Callable) -> Callable: return lru_cache(maxsize=1)(func) + + +def append(iterable: Iterable[T], next_item: T) -> Iterable[T]: + for item in iterable: + yield item + yield next_item diff --git a/opwen_email_server/utils/log.py b/opwen_email_server/utils/log.py index 98c6bf74..2f3c4b1a 100644 --- a/opwen_email_server/utils/log.py +++ b/opwen_email_server/utils/log.py @@ -1,95 +1,111 @@ +from logging import CRITICAL +from logging import DEBUG from logging import Formatter from logging import Handler +from logging import INFO from logging import Logger from logging import StreamHandler +from logging import WARNING from logging import getLogger from typing import Any from typing import Iterable from typing import Optional from applicationinsights import TelemetryClient -from applicationinsights import exceptions +from applicationinsights.channel import AsynchronousQueue +from applicationinsights.channel import AsynchronousSender +from applicationinsights.channel import TelemetryChannel +from applicationinsights.channel import TelemetryContext +from applicationinsights.logging import LoggingHandler +from cached_property import cached_property from opwen_email_server.config import APPINSIGHTS_KEY from opwen_email_server.config import LOG_LEVEL from opwen_email_server.constants.logging import SEPARATOR from opwen_email_server.constants.logging import STDERR -from opwen_email_server.constants.logging import TELEMETRY_QUEUE_ITEMS -from opwen_email_server.constants.logging import TELEMETRY_QUEUE_SECONDS +from opwen_email_server.utils.collections import append from opwen_email_server.utils.collections import singleton @singleton -def _get_log_handlers() -> Iterable[Handler]: - stderr = StreamHandler() - stderr.setFormatter(Formatter(STDERR)) - return [stderr] +def _create_telemetry_channel() -> Optional[TelemetryChannel]: + if not APPINSIGHTS_KEY: + return None + sender = AsynchronousSender() + queue = AsynchronousQueue(sender) + context = TelemetryContext() + context.instrumentation_key = APPINSIGHTS_KEY + return TelemetryChannel(context, queue) -@singleton -def _get_logger() -> Logger: - log = getLogger() - for handler in _get_log_handlers(): - log.addHandler(handler) - log.setLevel(LOG_LEVEL) - return log +class LogMixin: + _telemetry_channel = _create_telemetry_channel() -@singleton -def _get_telemetry_client() -> Optional[TelemetryClient]: - if not APPINSIGHTS_KEY: - return None + @cached_property + def _default_log_handlers(self) -> Iterable[Handler]: + handlers = [] + + stderr = StreamHandler() + stderr.setFormatter(Formatter(STDERR)) + handlers.append(stderr) + + if APPINSIGHTS_KEY: + handlers.append(LoggingHandler( + APPINSIGHTS_KEY, + telemetry_channel=self._telemetry_channel)) - telemetry_client = TelemetryClient(APPINSIGHTS_KEY) - telemetry_client.channel.sender.send_interval_in_milliseconds = \ - TELEMETRY_QUEUE_SECONDS * 1000 - telemetry_client.channel.sender.max_queue_item_count = \ - TELEMETRY_QUEUE_ITEMS - exceptions.enable(APPINSIGHTS_KEY) + return handlers - return telemetry_client + @cached_property + def _logger(self) -> Logger: + log = getLogger() + for handler in self._default_log_handlers: + log.addHandler(handler) + log.setLevel(LOG_LEVEL) + return log + @cached_property + def _telemetry_client(self) -> Optional[TelemetryClient]: + if not APPINSIGHTS_KEY: + return None -class LogMixin(object): - _logger = _get_logger() - _telemetry_client = _get_telemetry_client() + return TelemetryClient(APPINSIGHTS_KEY, self._telemetry_channel) def log_debug(self, message: str, *args: Any): - self._log('debug', message, args) + self._log(DEBUG, message, args) def log_info(self, message: str, *args: Any): - self._log('info', message, args) + self._log(INFO, message, args) def log_warning(self, message: str, *args: Any): - self._log('warning', message, args) + self._log(WARNING, message, args) - def log_exception(self, message: str, *args: Any): - self._log('exception', message, args) + def log_exception(self, ex: Exception, message: str, *args: Any): + self._log(CRITICAL, message + ' (%r)', append(args, ex)) + + if self._telemetry_client: + # noinspection PyBroadException + try: + raise ex + except Exception: + self._telemetry_client.track_exception() + self._telemetry_channel.flush() + + def _log(self, level: int, log_message: str, log_args: Iterable[Any]): + if not self._logger.isEnabledFor(level): + return - def _log(self, level: str, log_message: str, log_args: Iterable[Any]): message_parts = ['%s'] args = [self.__class__.__name__] message_parts.append(log_message) args.extend(log_args) message = SEPARATOR.join(message_parts) - log = getattr(self._logger, level) - log(message, *args) + self._logger.log(level, message, *args) - if self._telemetry_client: - self._telemetry_client.track_trace( - message % tuple(args), {'level': level}) - - if self.should_send_message_immediately(level): - self._telemetry_client.flush() - - # noinspection PyMethodMayBeStatic def log_event(self, event_name: str, properties: Optional[dict] = None): - self._logger.info('%s%s%s', event_name, SEPARATOR, properties) + self.log_info('%s%s%s', event_name, SEPARATOR, properties) if self._telemetry_client: self._telemetry_client.track_event(event_name, properties) - self._telemetry_client.flush() - - # noinspection PyMethodMayBeStatic - def should_send_message_immediately(self, level: str) -> bool: - return level != 'debug' + self._telemetry_channel.flush() diff --git a/tests/opwen_email_server/test_actions.py b/tests/opwen_email_server/test_actions.py index 2f6214c0..f3f2e713 100644 --- a/tests/opwen_email_server/test_actions.py +++ b/tests/opwen_email_server/test_actions.py @@ -2,6 +2,7 @@ from unittest import TestCase from unittest.mock import MagicMock from unittest.mock import Mock +from unittest.mock import patch from uuid import uuid4 from opwen_email_server import actions @@ -9,6 +10,22 @@ from opwen_email_server.services.storage import AccessInfo +class ActionTests(TestCase): + @patch.object(actions._Action, '_telemetry_client') + @patch.object(actions._Action, '_telemetry_channel') + def test_logs_exception(self, mock_channel, mock_client): + class TestAction(actions._Action): + def _action(self): + int('not-a-number') + + with self.assertRaises(ValueError): + action = TestAction() + action() + + mock_client.track_exception.assert_called_once_with() + mock_channel.flush.assert_called_once_with() + + class PingTests(TestCase): def test_200(self): action = actions.Ping() diff --git a/tests/opwen_email_server/utils/test_collections.py b/tests/opwen_email_server/utils/test_collections.py index 33a839b7..31e463fa 100644 --- a/tests/opwen_email_server/utils/test_collections.py +++ b/tests/opwen_email_server/utils/test_collections.py @@ -57,3 +57,10 @@ def function1(self): def function2(self): self.call_counts['function2'] += 1 return 'some-other-value' + + +class AppendTests(TestCase): + def test_yields_item_after_items(self): + collection = collections.append([1, 2, 3], 4) + + self.assertSequenceEqual(list(collection), [1, 2, 3, 4]) diff --git a/tests/opwen_email_server/utils/test_log.py b/tests/opwen_email_server/utils/test_log.py deleted file mode 100644 index 90908de5..00000000 --- a/tests/opwen_email_server/utils/test_log.py +++ /dev/null @@ -1,59 +0,0 @@ -from unittest import TestCase -from unittest.mock import patch - -from opwen_email_server.utils.log import LogMixin - - -class LogMixinTests(TestCase): - def test_adds_class_name_to_output(self): - class Foo(LogMixin): - def foo(self): - self.log_info('message %d', 123) - Foo().foo() - - self.assertDidLog('info', '%s|message %d', 'Foo', 123) - - def test_important_messages_get_quickly_sent_by_appinsights(self): - class Foo(LogMixin): - def foo(self): - self.log_info('message %d', 123) - Foo().foo() - self.assertAppInsightsIsSent() - - def test_not_important_messages_get_delayed_by_appinsights(self): - class Foo(LogMixin): - def foo(self): - self.log_debug('message %d', 123) - Foo().foo() - self.assertAppInsightsIsSent(False) - - def assertDidLog(self, log_level, *log_args): - self.assertIsLogMessage(*log_args) - self.assertDidCallLogger(log_level, *log_args) - self.assertDidCallApplicationInsights(log_level, *log_args) - - def assertAppInsightsIsSent(self, is_sent=True): - mock = self.appinsights_mock.flush - self.assertEqual(bool(mock.call_count), is_sent) - - def assertDidCallApplicationInsights(self, log_level, fmt, *args): - mock = self.appinsights_mock.track_trace - mock.assert_called_once_with(fmt % args, {'level': log_level}) - - def assertDidCallLogger(self, log_level, *log_args): - mock = getattr(self.logger_mock, log_level) - mock.assert_called_once_with(*log_args) - - def assertIsLogMessage(self, fmt, *args): - try: - fmt % args - except TypeError: - self.fail('unable to format string: %s %% %r' % (fmt, args)) - - def setUp(self): - log_patcher = patch.object(LogMixin, '_logger') - appinsights_patcher = patch.object(LogMixin, '_telemetry_client') - self.logger_mock = log_patcher.start() - self.appinsights_mock = appinsights_patcher.start() - self.addCleanup(log_patcher.stop) - self.addCleanup(appinsights_patcher.stop)