From e9d718ced54ef18dc8a23f7e6118c69687cd6c2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Diemer?= Date: Wed, 10 May 2023 16:06:04 +0200 Subject: [PATCH] Allow to mark commands as in progress in long polling | refs #37780 --- README.md | 21 +++++++- examples/fake_mediacoder.py | 81 ---------------------------- examples/recorder_controller.py | 93 +++++++++++++++++++++++++-------- examples/screen_controller.py | 19 +++---- examples/wol_relay.py | 28 ++++++---- mm_client/client.py | 30 +++++++---- mm_client/lib/long_polling.py | 29 ++++++---- mm_client/lib/signing.py | 16 +++--- tests/test_client.py | 68 +++++++++++++++++++----- tests/test_signature.py | 17 +++--- 10 files changed, 221 insertions(+), 181 deletions(-) delete mode 100644 examples/fake_mediacoder.py diff --git a/README.md b/README.md index 620db78..558aafd 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ If you are using the first version of this client (commit `33b554991303b573254d5 * Replace all occurences of `CHECK_SSL` by `VERIFY_SSL` in all configuration. -## Example +## Examples ### Ping the server @@ -58,7 +58,24 @@ response = mmc.api_request('PING') print(response) ``` -There are more examples in the `examples` directory. + +### Recorder system + +This example is the use case of a recorder system that can be controlled through the long polling. + +[Link to the file](/examples/recorder_controller.py) + + +### Wake on LAN requests + +This example is the use case of a client that forwards wake on LAN requests received through the long polling to its network. + +[Link to the file](/examples/wol_relay.py) + + +### Other examples + +There are more examples in the [examples](/examples) directory. ## Actions diff --git a/examples/fake_mediacoder.py b/examples/fake_mediacoder.py deleted file mode 100644 index 1c348dd..0000000 --- a/examples/fake_mediacoder.py +++ /dev/null @@ -1,81 +0,0 @@ -#!/usr/bin/env python3 -''' -Fake MediaCoder client for tests. -''' -import json -import logging -import os -import sys -import time -from mm_client.client import MirisManagerClient - -logger = logging.getLogger('fake_mediacoder') - - -class FakeMediaCoder(MirisManagerClient): - DEFAULT_CONF = { - 'CAPABILITIES': ['record', 'network_record', 'web_control', 'screenshot'], - } - PROFILES = { - 'main': { - 'has_password': False, - 'can_live': False, - 'name': 'main', - 'label': 'Main', - 'type': 'recorder' - } - } - - def handle_action(self, action, params): - if action == 'START_RECORDING': - logger.info('Starting recording with params %s', params) - self.set_status( - status='initializing', - status_message='', - remaining_space='auto' - ) - time.sleep(3) - self.set_status( - status='running', - status_message='', - status_info='{"playlist": "/videos/BigBuckBunny_320x180.m3u8"}', - remaining_space='auto' - ) - - elif action == 'STOP_RECORDING': - logger.info('Stopping recording.') - self.set_status( - status='ready', - status_message='', - remaining_space='auto' - ) - - elif action == 'LIST_PROFILES': - logger.info('Returning list of profiles.') - return json.dumps(self.PROFILES) - - elif action == 'GET_SCREENSHOT': - self.set_status(remaining_space='auto') # Send remaining space to Miris Manager - self.set_screenshot( - path='/var/lib/AccountsService/icons/%s' % (os.environ.get('USER') or 'root'), - file_name='screen.png' - ) - logger.info('Screenshot sent.') - - else: - raise NotImplementedError('Unsupported action: %s.' % action) - - -if __name__ == '__main__': - local_conf = sys.argv[1] if len(sys.argv) > 1 else None - client = FakeMediaCoder(local_conf) - client.update_capabilities() - client.set_status( - status='ready', - status_message='Ready to record', - remaining_space='auto' - ) - try: - client.long_polling_loop() - except KeyboardInterrupt: - logger.info('KeyboardInterrupt received, stopping application.') diff --git a/examples/recorder_controller.py b/examples/recorder_controller.py index 668f641..9fef6a2 100644 --- a/examples/recorder_controller.py +++ b/examples/recorder_controller.py @@ -1,11 +1,12 @@ #!/usr/bin/env python3 ''' -An example of Miris Manager client usage. -This script is intended to control a recorder. +Fake MediaCoder client for tests. ''' import json import logging +import os import sys +import time from mm_client.client import MirisManagerClient logger = logging.getLogger('recorder_controller') @@ -13,41 +14,87 @@ class RecorderController(MirisManagerClient): DEFAULT_CONF = { - 'CAPABILITIES': ['record', 'shutdown'], + 'CAPABILITIES': ['record', 'network_record', 'web_control', 'screenshot'], + } + PROFILES = { + 'main': { + 'has_password': False, + 'can_live': False, + 'name': 'main', + 'label': 'Main', + 'type': 'recorder' + } } - PROFILES = {'main': {'has_password': False, 'can_live': False, 'name': 'main', 'label': 'Main', 'type': 'recorder'}} - def handle_action(self, action, params): - if action == 'SHUTDOWN': - logger.info('Shutdown requested.') - # TODO + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) - elif action == 'REBOOT': - logger.info('Reboot requested.') - # TODO + self.update_capabilities() + self.set_status( + status='ready', + status_message='Ready to record', + remaining_space='auto' + ) + try: + self.long_polling_loop() + except KeyboardInterrupt: + logger.info('KeyboardInterrupt received, stopping application.') - elif action == 'START_RECORDING': + def handle_action(self, uid, action, params): + # See help on the handle action function: + # https://github.com/UbiCastTeam/miris-manager-client/blob/main/mm_client/client.py#L184 + # Possible actions: + # https://mirismanager.ubicast.eu/static/skyreach/docs/api/values.html#system-command-actions + if action == 'START_RECORDING': logger.info('Starting recording with params %s', params) - self.set_status(status='recording', remaining_space='auto') - # TODO + self.set_status( + status='initializing', + status_message='', + remaining_space='auto' + ) + time.sleep(3) + self.set_status( + status='running', + status_message='', + status_info='{"playlist": "/videos/BigBuckBunny_320x180.m3u8"}', + remaining_space='auto' + ) + return 'DONE', '' elif action == 'STOP_RECORDING': logger.info('Stopping recording.') - self.set_status(status='recorder_idle', remaining_space='auto') - # TODO + self.set_status( + status='ready', + status_message='', + remaining_space='auto' + ) + return 'DONE', '' elif action == 'LIST_PROFILES': logger.info('Returning list of profiles.') - return json.dumps(self.PROFILES) + return 'DONE', json.dumps(self.PROFILES) + + elif action == 'GET_SCREENSHOT': + self.set_status(remaining_space='auto') # Send remaining space to Miris Manager + self.set_screenshot( + path='/var/lib/AccountsService/icons/%s' % (os.environ.get('USER') or 'root'), + file_name='screen.png' + ) + logger.info('Screenshot sent.') + return 'DONE', '' + + elif action == 'UPGRADE': + logger.info('Starting upgrade.') + + # Start your asynchronous upgrade process here then call: + # self.set_command_status(uid, 'DONE', '') + + return 'IN_PROGRESS', '' else: - raise Exception('Unsupported action: %s.' % action) + raise NotImplementedError('Unsupported action: %s.' % action) if __name__ == '__main__': local_conf = sys.argv[1] if len(sys.argv) > 1 else None - client = RecorderController(local_conf) - try: - client.long_polling_loop() - except KeyboardInterrupt: - logger.info('KeyboardInterrupt received, stopping application.') + RecorderController(local_conf) diff --git a/examples/screen_controller.py b/examples/screen_controller.py index ee58923..6a7f29e 100644 --- a/examples/screen_controller.py +++ b/examples/screen_controller.py @@ -16,33 +16,26 @@ class ScreenController(MirisManagerClient): 'CAPABILITIES': ['screen_control', 'screenshot'], } - def handle_action(self, action, params): - if action == 'SHUTDOWN': - logger.info('Shutdown requested.') - # TODO - - elif action == 'REBOOT': - logger.info('Reboot requested.') - # TODO - - elif action == 'GET_SCREENSHOT': + def handle_action(self, uid, action, params): + if action == 'GET_SCREENSHOT': self.set_status(remaining_space='auto') # Send remaining space to Miris Manager self.set_screenshot( path='/var/lib/AccountsService/icons/%s' % (os.environ.get('USER') or 'root'), file_name='screen.png' ) logger.info('Screenshot sent.') + return 'DONE', '' elif action == 'SIMULATE_CLICK': logger.info('Click requested: %s.', params) - # TODO + return 'DONE', '' elif action == 'SEND_TEXT': logger.info('Text received: %s.', params) - # TODO + return 'DONE', '' else: - raise Exception('Unsupported action: %s.' % action) + raise NotImplementedError('Unsupported action: %s.' % action) if __name__ == '__main__': diff --git a/examples/wol_relay.py b/examples/wol_relay.py index d843161..d2e5126 100644 --- a/examples/wol_relay.py +++ b/examples/wol_relay.py @@ -18,16 +18,29 @@ class WOLRelay(MirisManagerClient): 'WOL_PATH': 'wakeonlan', # Path to the wake on lan binary, installed with `apt install wakeonlan` } - def handle_action(self, action, params): - # This method must be implemented in your client + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.update_capabilities() + try: + self.long_polling_loop() + except KeyboardInterrupt: + logger.info('KeyboardInterrupt received, stopping application.') + + def handle_action(self, uid, action, params): + # See help on the handle action function: + # https://github.com/UbiCastTeam/miris-manager-client/blob/main/mm_client/client.py#L184 + # Possible actions: + # https://mirismanager.ubicast.eu/static/skyreach/docs/api/values.html#system-command-actions if action == 'WAKE_ON_LAN': # wol_relay capability # Send wake on lan success, message = self.send_wake_on_lan(params) logger.info('Running wake on lan: success: %s, message: %s', success, message) if not success: - raise Exception('Failed to send wake on lan: %s' % message) + raise RuntimeError('Failed to send wake on lan: %s' % message) + return 'DONE', '' else: - raise Exception('Unsupported action: %s.' % action) + raise NotImplementedError('Unsupported action: %s.' % action) def send_wake_on_lan(self, params): # Check that arguments are valid @@ -56,9 +69,4 @@ def send_wake_on_lan(self, params): if __name__ == '__main__': local_conf = sys.argv[1] if len(sys.argv) > 1 else None - client = WOLRelay(local_conf) - client.update_capabilities() - try: - client.long_polling_loop() - except KeyboardInterrupt: - logger.info('KeyboardInterrupt received, stopping application.') + WOLRelay(local_conf) diff --git a/mm_client/client.py b/mm_client/client.py index b7ee6e4..8840512 100644 --- a/mm_client/client.py +++ b/mm_client/client.py @@ -159,7 +159,7 @@ def api_request(self, url_or_action, method='get', headers=None, params=None, # headers with "_" are ignored by Django _headers = {'api-key': self.conf['API_KEY']} if not anonymous: - signature = signing_lib.get_signature(self) + signature = signing_lib.get_signature(self.conf) if signature: _headers.update(signature) if headers: @@ -176,17 +176,27 @@ def api_request(self, url_or_action, method='get', headers=None, params=None, ) return response - def long_polling_loop(self): + def long_polling_loop(self, single_loop=False): if not self._long_polling_manager: self._long_polling_manager = long_polling_lib.LongPollingManager(self) - self._long_polling_manager.loop() - - def handle_action(self, action, params): - # Function that should be implemented in your client to process the - # long polling responses. - # IMPORTANT: Any code written here should not be blocking more than 5s - # because of the delay after which the system is considered as offline - # in Miris Manager. + self._long_polling_manager.loop(single_loop) + + def handle_action(self, uid, action, params): + ''' + Function that should be implemented in your client to process the long polling responses. + IMPORTANT: Any code written here should not be blocking more than 5s because of the + delay after which the system is considered as offline in Miris Manager. + Arguments: + - uid: The system command unique identifier. + - action: The action to run. + - params: The action parameters. + Must return a tuple: (status, data) + - status: The system command status (string). Possible values: + - "DONE": The command has been executed successfully. + - "IN_PROGRESS": The command has been started but is not yet completed. + - "FAILED": The command execution has failed. + - data: The command result data (string). It can be a json dump or a message. Empty strings are allowed. + ''' raise NotImplementedError('Your class should override the "handle_action" method.') def set_command_status(self, command_uid, status='DONE', data=None): diff --git a/mm_client/lib/long_polling.py b/mm_client/lib/long_polling.py index fc3e4e3..150d230 100644 --- a/mm_client/lib/long_polling.py +++ b/mm_client/lib/long_polling.py @@ -23,7 +23,7 @@ def __init__(self, client): self.last_error = None self.loop_running = False - def loop(self): + def loop(self, single_loop=False): # Check if systemd-notify should be called self.run_systemd_notify = self.client.conf.get('WATCHDOG') and os.system('which systemd-notify') == 0 # Start connection loop @@ -41,6 +41,8 @@ def exit_handler(*args, **kwargs): while self.loop_running: start = datetime.datetime.utcnow() success = self.call_long_polling() + if single_loop: + break if not success: # Avoid starting too often new connections duration = (datetime.datetime.utcnow() - start).seconds @@ -67,16 +69,16 @@ def call_long_polling(self): success = True uid = response.get('uid') try: - result = self.process_long_polling(response) + status, data = self.process_long_polling(response) except Exception as e: success = False logger.error('Failed to process response: %s\n%s', e, traceback.format_exc()) self.client.set_command_status(uid, 'FAILED', str(e)) if os.environ.get('CI_PIPELINE_ID'): - # propagate exception so that it can be detected in CI + # Propagate exception so that it can be detected in CI raise else: - self.client.set_command_status(uid, 'DONE', result) + self.client.set_command_status(uid, status, data) finally: if self.run_systemd_notify: logger.debug('Notifying systemd watchdog.') @@ -86,15 +88,22 @@ def call_long_polling(self): def process_long_polling(self, response): logger.debug('Processing response.') if self.client.conf.get('API_KEY'): - invalid = check_signature(self.client, response) + invalid = check_signature(self.client.conf, response) if invalid: raise ValueError('Invalid signature: %s' % invalid) + uid = response.get('uid') action = response.get('action') if not action: raise ValueError('No action received.') - params = response.get('params', dict()) - logger.debug('Received command "%s": %s.', response.get('uid'), action) + params = response.get('params', {}) + logger.debug('Received command "%s": %s.', uid, action) if action == 'PING': - pass - else: - return self.client.handle_action(action, params) + return 'DONE', '' + status, data = self.client.handle_action(uid=uid, action=action, params=params) + if status not in ('DONE', 'IN_PROGRESS', 'FAILED'): + logger.error('Your client has returned an invalid status in "handle_action".') + raise ValueError('An error occurred during the processing of the action by the client.') + if data is not None and not isinstance(data, str): + logger.error('Your client has returned an invalid type for data in "handle_action".') + raise ValueError('An error occurred during the processing of the action by the client.') + return status, data diff --git a/mm_client/lib/signing.py b/mm_client/lib/signing.py index d9f2ed7..8a62b27 100644 --- a/mm_client/lib/signing.py +++ b/mm_client/lib/signing.py @@ -11,13 +11,13 @@ logger = logging.getLogger('mm_client.lib.signing') -def get_signature(client): - if not client.conf.get('SECRET_KEY') or not client.conf.get('API_KEY'): +def get_signature(conf): + if not conf.get('SECRET_KEY') or not conf.get('API_KEY'): return {} utime = datetime.datetime.utcnow().strftime('%Y-%m-%d_%H-%M-%S_%f') - to_sign = 'time=%s|api_key=%s' % (utime, client.conf['API_KEY']) + to_sign = 'time=%s|api_key=%s' % (utime, conf['API_KEY']) hm = hmac.new( - client.conf['SECRET_KEY'].encode('utf-8'), + conf['SECRET_KEY'].encode('utf-8'), msg=to_sign.encode('utf-8'), digestmod=hashlib.sha256 ).digest() @@ -25,8 +25,8 @@ def get_signature(client): return {'time': utime, 'hmac': hm} -def check_signature(client, rdata): - if not client.conf.get('SECRET_KEY') or not client.conf.get('API_KEY'): +def check_signature(conf, rdata): + if not conf.get('SECRET_KEY') or not conf.get('API_KEY'): return None remote_time = rdata.get('time') remote_hmac = rdata.get('hmac') @@ -44,9 +44,9 @@ def check_signature(client, rdata): diff = utcnow - rdate if utcnow > rdate else rdate - utcnow if diff.seconds > 300: return 'the difference between the request time and the current time is too large.' - to_sign = 'time=%s|api_key=%s' % (remote_time, client.conf['API_KEY']) + to_sign = 'time=%s|api_key=%s' % (remote_time, conf['API_KEY']) hm = hmac.new( - client.conf['SECRET_KEY'].encode('utf-8'), + conf['SECRET_KEY'].encode('utf-8'), msg=to_sign.encode('utf-8'), digestmod=hashlib.sha256 ).digest() diff --git a/tests/test_client.py b/tests/test_client.py index da0f135..66bd3ca 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -2,28 +2,44 @@ from unittest.mock import patch CONFIG = { - 'SERVER_URL': 'https://mmctest' + 'SERVER_URL': 'https://mmctest', + 'SECRET_KEY': 'the secret key', + 'API_KEY': 'test API key', } -def mocked_requests_get(*args, **kwargs): - class MockResponse: - def __init__(self, json_data, status_code): - self.text = json.dumps(json_data) - self.json_data = json_data - self.status_code = status_code +class MockResponse: + def __init__(self, json_data, status_code): + self.text = json.dumps(json_data) + self.json_data = json_data + self.status_code = status_code - def json(self): - return self.json_data + def json(self): + return self.json_data - if kwargs['url'] == CONFIG['SERVER_URL'] + '/api/': - return MockResponse({'version': '8.0.0'}, 200) +def mocked_request(*args, **kwargs): + url = kwargs['url'] + if url == CONFIG['SERVER_URL'] + '/api/': + return MockResponse({'version': '8.0.0'}, 200) + if url == CONFIG['SERVER_URL'] + '/api/v3/fleet/control/set-command-status/': + return MockResponse({}, 200) + if url == CONFIG['SERVER_URL'] + '/remote-event/v3': + from mm_client.lib.signing import get_signature + data = get_signature(CONFIG) + data.update({ + 'uid': 'test_uid', + 'action': 'START_RECORDING', + 'params': {'channel': 'Chan'}, + }) + return MockResponse(data, 200) + print(f'Non mocked URL: {url}') return MockResponse(None, 404) -@patch('requests.get', side_effect=mocked_requests_get) -def test_client(mock_get): +@patch('requests.post', side_effect=mocked_request) +@patch('requests.get', side_effect=mocked_request) +def test_client(mock_get, mock_post): from mm_client.client import MirisManagerClient mmc = MirisManagerClient(local_conf=CONFIG) response = mmc.api_request('PING') @@ -31,3 +47,29 @@ def test_client(mock_get): assert response['version'] == '8.0.0' assert len(mock_get.call_args_list) == 1 + assert len(mock_post.call_args_list) == 0 + + +@patch('requests.post', side_effect=mocked_request) +@patch('requests.get', side_effect=mocked_request) +def test_long_polling(mock_get, mock_post): + from mm_client.client import MirisManagerClient + + commands = [] + + class LongPollingClient(MirisManagerClient): + DEFAULT_CONF = CONFIG + + def handle_action(self, uid, action, params): + commands.append((uid, action, params)) + return 'DONE', '' + + mmc = LongPollingClient(local_conf=CONFIG) + mmc.long_polling_loop(single_loop=True) + + assert len(mock_get.call_args_list) == 1 + assert len(mock_post.call_args_list) == 1 + call = mock_post.call_args_list[0] + assert call.kwargs['data'] == {'uid': 'test_uid', 'status': 'DONE', 'data': ''} + + assert commands == [('test_uid', 'START_RECORDING', {'channel': 'Chan'})] diff --git a/tests/test_signature.py b/tests/test_signature.py index 8d1610f..1a52ee8 100644 --- a/tests/test_signature.py +++ b/tests/test_signature.py @@ -4,31 +4,28 @@ def test_signature__default(): - from mm_client.client import MirisManagerClient from mm_client.lib.signing import get_signature, check_signature - client = MirisManagerClient() + conf = {} - signature = get_signature(client) + signature = get_signature(conf) assert signature == {} - assert check_signature(client, {}) is None + assert check_signature(conf, {}) is None def test_signature__configured(): - from mm_client.client import MirisManagerClient from mm_client.lib.signing import get_signature, check_signature conf = { 'SECRET_KEY': 'the secret key', 'API_KEY': 'the API key', } - client = MirisManagerClient(conf) - signature = get_signature(client) + signature = get_signature(conf) assert sorted(signature.keys()) == ['hmac', 'time'] - assert check_signature(client, signature) is None + assert check_signature(conf, signature) is None @pytest.mark.parametrize('signature, expected', [ @@ -50,13 +47,11 @@ def test_signature__configured(): id='hmac'), ]) def test_signature__invalid(signature, expected): - from mm_client.client import MirisManagerClient from mm_client.lib.signing import check_signature conf = { 'SECRET_KEY': 'the secret key', 'API_KEY': 'the API key', } - client = MirisManagerClient(conf) - assert check_signature(client, signature) == expected + assert check_signature(conf, signature) == expected