Skip to content

Commit

Permalink
Improve AppInsights logging (#115)
Browse files Browse the repository at this point in the history
* Switch to official Flask AppInsights middleware

* Add action exception handler

* Reduce default log level

* Short-circuit formatting when logger is disabled

* Remove unnecessary list

* Switch to async AppInsights channel

* Ensure all telemetry is always flushed

* Remove unnecessary supertype
  • Loading branch information
c-w authored Jan 2, 2019
1 parent 5706e01 commit 855f9dd
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 136 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ DOCKER_REPO=ascoderu
USE_DEVTOOLS=True
SERVER_WORKERS=1
QUEUE_WORKERS=1
LOKOLE_LOG_LEVEL=DEBUG
LOKOLE_LOG_LEVEL=INFO
47 changes: 31 additions & 16 deletions opwen_email_server/actions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from abc import ABC
from typing import Callable
from typing import Iterable
from typing import Tuple
Expand All @@ -21,20 +22,34 @@
Response = Union[dict, Tuple[str, int]]


class Ping(object):
def __call__(self) -> Response:
class _Action(ABC, LogMixin):
def __call__(self, *args, **kwargs) -> Response:
try:
return self._action(*args, **kwargs)
except Exception as ex:
self.log_exception(ex, 'error in action %s',
self.__class__.__name__)
raise ex

def _action(self, *args, **kwargs) -> Response:
raise NotImplementedError


class Ping(_Action):
# noinspection PyMethodMayBeStatic
def _action(self): # type: ignore
return 'OK', 200


class SendOutboundEmails(LogMixin):
class SendOutboundEmails(_Action):
def __init__(self,
email_storage: AzureObjectStorage,
send_email: SendSendgridEmail):

self._email_storage = email_storage
self._send_email = send_email

def __call__(self, resource_id: str) -> Response:
def _action(self, resource_id): # type: ignore
email = self._email_storage.fetch_object(resource_id)

success = self._send_email(email)
Expand All @@ -45,7 +60,7 @@ def __call__(self, resource_id: str) -> Response:
return 'OK', 200


class StoreInboundEmails(LogMixin):
class StoreInboundEmails(_Action):
def __init__(self,
raw_email_storage: AzureTextStorage,
email_storage: AzureObjectStorage,
Expand All @@ -57,7 +72,7 @@ def __init__(self,
self._pending_factory = pending_factory
self._email_parser = email_parser or self._parse_mime_email

def __call__(self, resource_id: str) -> Response:
def _action(self, resource_id): # type: ignore
mime_email = self._raw_email_storage.fetch_text(resource_id)

email = self._email_parser(mime_email)
Expand Down Expand Up @@ -85,7 +100,7 @@ def _parse_mime_email(cls, mime_email: str) -> dict:
return email


class StoreWrittenClientEmails(LogMixin):
class StoreWrittenClientEmails(_Action):
def __init__(self,
client_storage: AzureObjectsStorage,
email_storage: AzureObjectStorage,
Expand All @@ -95,7 +110,7 @@ def __init__(self,
self._email_storage = email_storage
self._next_task = next_task

def __call__(self, resource_id: str) -> Response:
def _action(self, resource_id): # type: ignore
emails = self._client_storage.fetch_objects(
resource_id, sync.EMAILS_FILE)

Expand All @@ -116,7 +131,7 @@ def __call__(self, resource_id: str) -> Response:
return 'OK', 200


class ReceiveInboundEmail(LogMixin):
class ReceiveInboundEmail(_Action):
def __init__(self,
auth: AzureAuth,
raw_email_storage: AzureTextStorage,
Expand All @@ -128,7 +143,7 @@ def __init__(self,
self._next_task = next_task
self._email_id_source = email_id_source or self._new_email_id

def __call__(self, client_id: str, email: str) -> Response:
def _action(self, client_id, email): # type: ignore
domain = self._auth.domain_for(client_id)
if not domain:
self.log_event(events.UNREGISTERED_CLIENT, {'client_id': client_id}) # noqa: E501
Expand All @@ -148,7 +163,7 @@ def _new_email_id(cls) -> str:
return str(uuid4())


class DownloadClientEmails(LogMixin):
class DownloadClientEmails(_Action):
def __init__(self,
auth: AzureAuth,
client_storage: AzureObjectsStorage,
Expand All @@ -160,7 +175,7 @@ def __init__(self,
self._email_storage = email_storage
self._pending_factory = pending_factory

def __call__(self, client_id: str, compression: str) -> Response:
def _action(self, client_id, compression): # type: ignore
domain = self._auth.domain_for(client_id)
if not domain:
self.log_event(events.UNREGISTERED_CLIENT, {'client_id': client_id}) # noqa: E501
Expand Down Expand Up @@ -202,15 +217,15 @@ def _mark_emails_as_delivered(cls, pending_storage: AzureTextStorage,
pending_storage.delete(email_id)


class UploadClientEmails(LogMixin):
class UploadClientEmails(_Action):
def __init__(self,
auth: AzureAuth,
next_task: Callable[[str], None]):

self._auth = auth
self._next_task = next_task

def __call__(self, client_id: str, upload_info: dict) -> Response:
def _action(self, client_id, upload_info): # type: ignore
domain = self._auth.domain_for(client_id)
if not domain:
self.log_event(events.UNREGISTERED_CLIENT, {'client_id': client_id}) # noqa: E501
Expand All @@ -224,7 +239,7 @@ def __call__(self, client_id: str, upload_info: dict) -> Response:
return 'uploaded', 200


class RegisterClient(LogMixin):
class RegisterClient(_Action):
def __init__(self,
auth: AzureAuth,
client_storage: AzureObjectsStorage,
Expand All @@ -238,7 +253,7 @@ def __init__(self,
self._setup_mx_records = setup_mx_records
self._client_id_source = client_id_source or self._new_client_id

def __call__(self, client: dict) -> Response:
def _action(self, client): # type: ignore
domain = client['domain']
if self._auth.client_id_for(domain) is not None:
return 'client already exists', 409
Expand Down
3 changes: 0 additions & 3 deletions opwen_email_server/constants/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,3 @@

STDERR = '%(asctime)s\t%(levelname)s\t%(message)s' # type: Final
SEPARATOR = '|' # type: Final

TELEMETRY_QUEUE_SECONDS = 30 # type: Final
TELEMETRY_QUEUE_ITEMS = 10 # type: Final
14 changes: 7 additions & 7 deletions opwen_email_server/services/sendgrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ def _send_email(self, email: Mail, email_id: str) -> bool:
request = email.get()
try:
status = self._client(request)
except HTTPError as exception:
status = exception.code
self.log_exception('error sending email %s:%r:%r',
email_id, exception, request)
except URLError as exception:
except HTTPError as ex:
status = ex.code
self.log_exception(ex, 'error sending email %s:%r',
email_id, request)
except URLError as ex:
status = -1
self.log_exception('error sending email %s:%r:%r',
email_id, exception, request)
self.log_exception(ex, 'error sending email %s:%r',
email_id, request)
else:
self.log_debug('sent email %s', email_id)

Expand Down
6 changes: 6 additions & 0 deletions opwen_email_server/utils/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,9 @@ def chunks(iterable: Iterable[T], chunk_size: int) -> Iterable[Iterable[T]]:

def singleton(func: Callable) -> Callable:
return lru_cache(maxsize=1)(func)


def append(iterable: Iterable[T], next_item: T) -> Iterable[T]:
for item in iterable:
yield item
yield next_item
116 changes: 66 additions & 50 deletions opwen_email_server/utils/log.py
Original file line number Diff line number Diff line change
@@ -1,95 +1,111 @@
from logging import CRITICAL
from logging import DEBUG
from logging import Formatter
from logging import Handler
from logging import INFO
from logging import Logger
from logging import StreamHandler
from logging import WARNING
from logging import getLogger
from typing import Any
from typing import Iterable
from typing import Optional

from applicationinsights import TelemetryClient
from applicationinsights import exceptions
from applicationinsights.channel import AsynchronousQueue
from applicationinsights.channel import AsynchronousSender
from applicationinsights.channel import TelemetryChannel
from applicationinsights.channel import TelemetryContext
from applicationinsights.logging import LoggingHandler
from cached_property import cached_property

from opwen_email_server.config import APPINSIGHTS_KEY
from opwen_email_server.config import LOG_LEVEL
from opwen_email_server.constants.logging import SEPARATOR
from opwen_email_server.constants.logging import STDERR
from opwen_email_server.constants.logging import TELEMETRY_QUEUE_ITEMS
from opwen_email_server.constants.logging import TELEMETRY_QUEUE_SECONDS
from opwen_email_server.utils.collections import append
from opwen_email_server.utils.collections import singleton


@singleton
def _get_log_handlers() -> Iterable[Handler]:
stderr = StreamHandler()
stderr.setFormatter(Formatter(STDERR))
return [stderr]
def _create_telemetry_channel() -> Optional[TelemetryChannel]:
if not APPINSIGHTS_KEY:
return None

sender = AsynchronousSender()
queue = AsynchronousQueue(sender)
context = TelemetryContext()
context.instrumentation_key = APPINSIGHTS_KEY
return TelemetryChannel(context, queue)

@singleton
def _get_logger() -> Logger:
log = getLogger()
for handler in _get_log_handlers():
log.addHandler(handler)
log.setLevel(LOG_LEVEL)
return log

class LogMixin:
_telemetry_channel = _create_telemetry_channel()

@singleton
def _get_telemetry_client() -> Optional[TelemetryClient]:
if not APPINSIGHTS_KEY:
return None
@cached_property
def _default_log_handlers(self) -> Iterable[Handler]:
handlers = []

stderr = StreamHandler()
stderr.setFormatter(Formatter(STDERR))
handlers.append(stderr)

if APPINSIGHTS_KEY:
handlers.append(LoggingHandler(
APPINSIGHTS_KEY,
telemetry_channel=self._telemetry_channel))

telemetry_client = TelemetryClient(APPINSIGHTS_KEY)
telemetry_client.channel.sender.send_interval_in_milliseconds = \
TELEMETRY_QUEUE_SECONDS * 1000
telemetry_client.channel.sender.max_queue_item_count = \
TELEMETRY_QUEUE_ITEMS
exceptions.enable(APPINSIGHTS_KEY)
return handlers

return telemetry_client
@cached_property
def _logger(self) -> Logger:
log = getLogger()
for handler in self._default_log_handlers:
log.addHandler(handler)
log.setLevel(LOG_LEVEL)
return log

@cached_property
def _telemetry_client(self) -> Optional[TelemetryClient]:
if not APPINSIGHTS_KEY:
return None

class LogMixin(object):
_logger = _get_logger()
_telemetry_client = _get_telemetry_client()
return TelemetryClient(APPINSIGHTS_KEY, self._telemetry_channel)

def log_debug(self, message: str, *args: Any):
self._log('debug', message, args)
self._log(DEBUG, message, args)

def log_info(self, message: str, *args: Any):
self._log('info', message, args)
self._log(INFO, message, args)

def log_warning(self, message: str, *args: Any):
self._log('warning', message, args)
self._log(WARNING, message, args)

def log_exception(self, message: str, *args: Any):
self._log('exception', message, args)
def log_exception(self, ex: Exception, message: str, *args: Any):
self._log(CRITICAL, message + ' (%r)', append(args, ex))

if self._telemetry_client:
# noinspection PyBroadException
try:
raise ex
except Exception:
self._telemetry_client.track_exception()
self._telemetry_channel.flush()

def _log(self, level: int, log_message: str, log_args: Iterable[Any]):
if not self._logger.isEnabledFor(level):
return

def _log(self, level: str, log_message: str, log_args: Iterable[Any]):
message_parts = ['%s']
args = [self.__class__.__name__]
message_parts.append(log_message)
args.extend(log_args)
message = SEPARATOR.join(message_parts)
log = getattr(self._logger, level)
log(message, *args)
self._logger.log(level, message, *args)

if self._telemetry_client:
self._telemetry_client.track_trace(
message % tuple(args), {'level': level})

if self.should_send_message_immediately(level):
self._telemetry_client.flush()

# noinspection PyMethodMayBeStatic
def log_event(self, event_name: str, properties: Optional[dict] = None):
self._logger.info('%s%s%s', event_name, SEPARATOR, properties)
self.log_info('%s%s%s', event_name, SEPARATOR, properties)

if self._telemetry_client:
self._telemetry_client.track_event(event_name, properties)
self._telemetry_client.flush()

# noinspection PyMethodMayBeStatic
def should_send_message_immediately(self, level: str) -> bool:
return level != 'debug'
self._telemetry_channel.flush()
17 changes: 17 additions & 0 deletions tests/opwen_email_server/test_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,30 @@
from unittest import TestCase
from unittest.mock import MagicMock
from unittest.mock import Mock
from unittest.mock import patch
from uuid import uuid4

from opwen_email_server import actions
from opwen_email_server.constants import sync
from opwen_email_server.services.storage import AccessInfo


class ActionTests(TestCase):
@patch.object(actions._Action, '_telemetry_client')
@patch.object(actions._Action, '_telemetry_channel')
def test_logs_exception(self, mock_channel, mock_client):
class TestAction(actions._Action):
def _action(self):
int('not-a-number')

with self.assertRaises(ValueError):
action = TestAction()
action()

mock_client.track_exception.assert_called_once_with()
mock_channel.flush.assert_called_once_with()


class PingTests(TestCase):
def test_200(self):
action = actions.Ping()
Expand Down
Loading

0 comments on commit 855f9dd

Please sign in to comment.