diff --git a/web/api/maestro_api/__init__.py b/web/api/maestro_api/__init__.py index 0053d702..b0a7276b 100644 --- a/web/api/maestro_api/__init__.py +++ b/web/api/maestro_api/__init__.py @@ -36,3 +36,13 @@ def create_frontend_app(settings): init_auth_routes(flask_app) return flask_app + + +def create_console_app(settings): + flask_app = Flask(__name__) + + flask_app.config.from_object(settings) + + init_db(flask_app) + + return flask_app diff --git a/web/api/maestro_api/controllers/run.py b/web/api/maestro_api/controllers/run.py index 2a89ae7f..439a26ac 100644 --- a/web/api/maestro_api/controllers/run.py +++ b/web/api/maestro_api/controllers/run.py @@ -1,7 +1,7 @@ from mongoengine import Q from maestro_api.db.models.run import Run -from maestro_api.db.models.agent import Agent +from maestro_api.db.repo.run import RunRepository from maestro_api.db.models.run_agent import RunAgent from maestro_api.db.models.run_metric import RunMetric from maestro_api.db.models.run_configuration import RunConfiguration @@ -16,8 +16,9 @@ class RunController: - def __init__(self, flask_app): + def __init__(self, flask_app=None): self.flask_app = flask_app + self.run_repo = RunRepository() def delete_one(self, run_id, user): "Delete Run and related metrics by ID" @@ -69,41 +70,8 @@ def create_one(self, data, user): run_configuration_id = data.get("run_configuration_id") - configuration = get_obj_or_404(RunConfiguration, id=run_configuration_id) - hosts = [host.to_mongo() for host in configuration.hosts] - load_profile = [ - load_profile_step.to_mongo() - for load_profile_step in configuration.load_profile - ] - - custom_properties = [ - custom_property.to_mongo() - for custom_property in configuration.custom_properties - ] - - new_run = Run( - title=configuration.title, - run_configuration_id=configuration.id, - agent_ids=configuration.agent_ids, - run_plan_id=configuration.run_plan_id, - custom_data_ids=configuration.custom_data_ids, - labels=configuration.labels, - load_profile=load_profile, - hosts=hosts, - custom_properties=custom_properties, - ).save() - - agent_ids = configuration.agent_ids - - agents = Agent.objects(id__in=agent_ids) - run_agents = [ - RunAgent( - run_id=new_run.id, agent_id=agent.id, agent_hostname=agent.hostname - ) - for agent in agents - ] - - RunAgent.objects.insert(run_agents) + run_configuration = get_obj_or_404(RunConfiguration, id=run_configuration_id) + new_run = self.run_repo.create_run(run_configuration) return make_json_response(new_run.to_json()) diff --git a/web/api/maestro_api/db/models/run_configuration.py b/web/api/maestro_api/db/models/run_configuration.py index dca82728..5a1f3c60 100644 --- a/web/api/maestro_api/db/models/run_configuration.py +++ b/web/api/maestro_api/db/models/run_configuration.py @@ -1,6 +1,7 @@ import mongoengine_goodjson as gj from mongoengine import ( BooleanField, + DateTimeField, IntField, StringField, ObjectIdField, @@ -52,3 +53,5 @@ class RunConfiguration(CreatedUpdatedDocumentMixin, gj.Document): is_schedule_enabled = BooleanField(default=False) schedule = EmbeddedDocumentField(RunConfigurationSchedule) + + last_scheduled_at = DateTimeField() diff --git a/web/api/maestro_api/db/repo/run.py b/web/api/maestro_api/db/repo/run.py new file mode 100644 index 00000000..92a0ecbb --- /dev/null +++ b/web/api/maestro_api/db/repo/run.py @@ -0,0 +1,48 @@ +from maestro_api.db.models.run import Run +from maestro_api.db.models.run_configuration import RunConfiguration +from maestro_api.db.models.agent import Agent +from maestro_api.db.models.run_agent import RunAgent + + +class RunRepository: + """ + Repository to manage common opperation over run and related agents + """ + + def create_run(self, run_configuration: RunConfiguration): + hosts = [host.to_mongo() for host in run_configuration.hosts] + load_profile = [ + load_profile_step.to_mongo() + for load_profile_step in run_configuration.load_profile + ] + + custom_properties = [ + custom_property.to_mongo() + for custom_property in run_configuration.custom_properties + ] + + new_run = Run( + title=run_configuration.title, + run_configuration_id=run_configuration.id, + agent_ids=run_configuration.agent_ids, + run_plan_id=run_configuration.run_plan_id, + custom_data_ids=run_configuration.custom_data_ids, + labels=run_configuration.labels, + load_profile=load_profile, + hosts=hosts, + custom_properties=custom_properties, + ).save() + + agent_ids = run_configuration.agent_ids + + agents = Agent.objects(id__in=agent_ids) + run_agents = [ + RunAgent( + run_id=new_run.id, agent_id=agent.id, agent_hostname=agent.hostname + ) + for agent in agents + ] + + RunAgent.objects.insert(run_agents) + + return new_run diff --git a/web/api/maestro_api/scheduler.py b/web/api/maestro_api/scheduler.py new file mode 100644 index 00000000..424fce49 --- /dev/null +++ b/web/api/maestro_api/scheduler.py @@ -0,0 +1,60 @@ +import atexit +import math + +from maestro_api.db.models.run_configuration import RunConfiguration +from maestro_api.db.repo.run import RunRepository +from maestro_api.logging import Logger +from maestro_api.libs.datetime import now, TZ_UTC +from maestro_api.enums import DaysOfTheWeek + + +def start_scheduled_run(run_repo: RunRepository): + + day_of_the_week = DaysOfTheWeek.list() + now_time = now() + now_minute = math.floor(now_time.minute / 5) * 5 + now_minute_str = now_minute if now_minute >= 10 else f"0{now_minute}" + time_to_run = f"{now_time.hour}:{now_minute_str}" + day_to_run = day_of_the_week[now_time.weekday()] + + run_configurations = RunConfiguration.objects.filter( + is_schedule_enabled=True, + schedule__time=time_to_run, + schedule__days__in=[day_to_run], + ) + + Logger.info(f"Search for scheduled runs. day={day_to_run}, time={time_to_run}") + for run_configuration in run_configurations: + if run_configuration.last_scheduled_at: + last_scheduled_at = run_configuration.last_scheduled_at.astimezone(TZ_UTC) + is_already_run = (now_time - last_scheduled_at).total_seconds() < 300 + if is_already_run: + continue + + run_repo.create_run(run_configuration) + run_configuration.last_scheduled_at = now_time + run_configuration.save() + + Logger.info( + f"Started scheduled run. run_configuration_id={run_configuration.id}" + ) + + +def register_shutdown_events(scheduler): + # Shut down the scheduler when exiting the app + atexit.register(lambda: scheduler.shutdown()) + + +def init_scheduler(scheduler): + run_repo = RunRepository() + register_shutdown_events(scheduler) + + scheduler.add_job( + func=lambda: start_scheduled_run(run_repo), + trigger="interval", + seconds=10, + replace_existing=True, + max_instances=1, + ) + + scheduler.start() diff --git a/web/api/maestro_api/settings.py b/web/api/maestro_api/settings.py index 6e0ad395..ccd9666e 100644 --- a/web/api/maestro_api/settings.py +++ b/web/api/maestro_api/settings.py @@ -39,8 +39,6 @@ def parse_list(str_value): SWAGGER_ENABLED = parse_bool(os.environ.get("SWAGGER_ENABLED", "True")) -SCHEDULER_ENABLED = parse_bool(os.environ.get("SCHEDULER_ENABLED", "True")) - JMETER_BASE_IMAGE = "jmeter" SECRET_KEY = os.environ.get("SECRET_KEY", "SECRETKEY") diff --git a/web/api/scheduler.py b/web/api/scheduler.py new file mode 100644 index 00000000..6214a80e --- /dev/null +++ b/web/api/scheduler.py @@ -0,0 +1,15 @@ +from pytz import UTC +from apscheduler.schedulers.background import BlockingScheduler + +from maestro_api import create_console_app +from maestro_api import settings +from maestro_api.logging import Logger +from maestro_api.scheduler import init_scheduler + +Logger.setup_logging() + +console_app = create_console_app(settings) + +scheduler = BlockingScheduler(timezone=UTC) + +init_scheduler(scheduler) diff --git a/web/api/tests/test_scheduler.py b/web/api/tests/test_scheduler.py new file mode 100644 index 00000000..767239fe --- /dev/null +++ b/web/api/tests/test_scheduler.py @@ -0,0 +1,121 @@ +import pytest +from freezegun import freeze_time +from datetime import datetime + +from maestro_api.db.models.agent import Agent +from maestro_api.db.models.run_plan import RunPlan +from maestro_api.db.models.run import Run +from maestro_api.db.models.run_configuration import RunConfiguration +from maestro_api.db.repo.run import RunRepository +from maestro_api.enums import DaysOfTheWeek + +from maestro_api.scheduler import start_scheduled_run + + +@pytest.mark.parametrize( + "run_configurations,created_runs", + [ + ( + [ + { + "is_schedule_enabled": True, + "schedule": { + "days": [DaysOfTheWeek.Mon.value, DaysOfTheWeek.Tue.value], + "time": "10:00", + }, + }, + { + "is_schedule_enabled": False, + "schedule": { + "days": [DaysOfTheWeek.Mon.value, DaysOfTheWeek.Tue.value], + "time": "10:00", + }, + }, + { + "is_schedule_enabled": True, + "schedule": { + "days": [DaysOfTheWeek.Wed.value], + "time": "10:00", + }, + }, + ], + 1, + ), + ( + [ + { + "is_schedule_enabled": True, + "schedule": { + "days": [DaysOfTheWeek.Mon.value], + "time": "10:00", + }, + "last_scheduled_at": datetime(2012, 1, 2, 10, 1, 0), + }, + { + "is_schedule_enabled": True, + "schedule": { + "days": [DaysOfTheWeek.Mon.value], + "time": "10:00", + }, + "last_scheduled_at": datetime(2012, 1, 2, 10, 0, 0), + }, + ], + 0, + ), + ], +) +@freeze_time("2012-01-02 10:02:00") +def test_start_scheduled_run(app, run_configurations, created_runs): + run_plan_id = "6076d1e3a216ff15b6e95e9d" + agent_ids = ["6076d1e3a216ff15b6e95e8d"] + RunPlan(id=run_plan_id, title="Example Plan").save() + for agent_id in agent_ids: + Agent(id=agent_id, hostname="host_%s" % agent_id, ip="test_ip").save() + for run_configuration in run_configurations: + RunConfiguration( + title="Example test plan", + run_plan_id=run_plan_id, + agent_ids=agent_ids, + **run_configuration + ).save() + + run_repo = RunRepository() + + start_scheduled_run(run_repo) + + runs_count = len(Run.objects()) + + assert created_runs == runs_count + + +@freeze_time("2012-01-02 10:02:00") +def test_start_scheduled_run_second_time(app): + run_plan_id = "6076d1e3a216ff15b6e95e9d" + agent_ids = ["6076d1e3a216ff15b6e95e8d"] + RunPlan(id=run_plan_id, title="Example Plan").save() + for agent_id in agent_ids: + Agent(id=agent_id, hostname="host_%s" % agent_id, ip="test_ip").save() + + RunConfiguration( + title="Example test plan", + run_plan_id=run_plan_id, + agent_ids=agent_ids, + is_schedule_enabled=True, + schedule=dict( + days=[DaysOfTheWeek.Mon.value], + time="10:00", + ), + ).save() + + run_repo = RunRepository() + + start_scheduled_run(run_repo) + start_scheduled_run(run_repo) + + run_configurations = RunConfiguration.objects() + runs_count = len(Run.objects()) + + assert 1 == runs_count + assert "2012-01-02 10:02:00" == run_configurations[0].last_scheduled_at.strftime( + "%Y-%m-%d %H:%M:%S" + )