-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: start run by using schedule from configuration (#249)
* feat: start run by using shcedule from configuration * fix: create run method call * feat: add scheduler tests * fix: params typo
- Loading branch information
Vitalii Melnychuk
authored
Mar 2, 2022
1 parent
8124c46
commit 2278873
Showing
8 changed files
with
262 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
from maestro_api.db.models.run import Run | ||
from maestro_api.db.models.run_configuration import RunConfiguration | ||
from maestro_api.db.models.agent import Agent | ||
from maestro_api.db.models.run_agent import RunAgent | ||
|
||
|
||
class RunRepository: | ||
""" | ||
Repository to manage common opperation over run and related agents | ||
""" | ||
|
||
def create_run(self, run_configuration: RunConfiguration): | ||
hosts = [host.to_mongo() for host in run_configuration.hosts] | ||
load_profile = [ | ||
load_profile_step.to_mongo() | ||
for load_profile_step in run_configuration.load_profile | ||
] | ||
|
||
custom_properties = [ | ||
custom_property.to_mongo() | ||
for custom_property in run_configuration.custom_properties | ||
] | ||
|
||
new_run = Run( | ||
title=run_configuration.title, | ||
run_configuration_id=run_configuration.id, | ||
agent_ids=run_configuration.agent_ids, | ||
run_plan_id=run_configuration.run_plan_id, | ||
custom_data_ids=run_configuration.custom_data_ids, | ||
labels=run_configuration.labels, | ||
load_profile=load_profile, | ||
hosts=hosts, | ||
custom_properties=custom_properties, | ||
).save() | ||
|
||
agent_ids = run_configuration.agent_ids | ||
|
||
agents = Agent.objects(id__in=agent_ids) | ||
run_agents = [ | ||
RunAgent( | ||
run_id=new_run.id, agent_id=agent.id, agent_hostname=agent.hostname | ||
) | ||
for agent in agents | ||
] | ||
|
||
RunAgent.objects.insert(run_agents) | ||
|
||
return new_run |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import atexit | ||
import math | ||
|
||
from maestro_api.db.models.run_configuration import RunConfiguration | ||
from maestro_api.db.repo.run import RunRepository | ||
from maestro_api.logging import Logger | ||
from maestro_api.libs.datetime import now, TZ_UTC | ||
from maestro_api.enums import DaysOfTheWeek | ||
|
||
|
||
def start_scheduled_run(run_repo: RunRepository): | ||
|
||
day_of_the_week = DaysOfTheWeek.list() | ||
now_time = now() | ||
now_minute = math.floor(now_time.minute / 5) * 5 | ||
now_minute_str = now_minute if now_minute >= 10 else f"0{now_minute}" | ||
time_to_run = f"{now_time.hour}:{now_minute_str}" | ||
day_to_run = day_of_the_week[now_time.weekday()] | ||
|
||
run_configurations = RunConfiguration.objects.filter( | ||
is_schedule_enabled=True, | ||
schedule__time=time_to_run, | ||
schedule__days__in=[day_to_run], | ||
) | ||
|
||
Logger.info(f"Search for scheduled runs. day={day_to_run}, time={time_to_run}") | ||
for run_configuration in run_configurations: | ||
if run_configuration.last_scheduled_at: | ||
last_scheduled_at = run_configuration.last_scheduled_at.astimezone(TZ_UTC) | ||
is_already_run = (now_time - last_scheduled_at).total_seconds() < 300 | ||
if is_already_run: | ||
continue | ||
|
||
run_repo.create_run(run_configuration) | ||
run_configuration.last_scheduled_at = now_time | ||
run_configuration.save() | ||
|
||
Logger.info( | ||
f"Started scheduled run. run_configuration_id={run_configuration.id}" | ||
) | ||
|
||
|
||
def register_shutdown_events(scheduler): | ||
# Shut down the scheduler when exiting the app | ||
atexit.register(lambda: scheduler.shutdown()) | ||
|
||
|
||
def init_scheduler(scheduler): | ||
run_repo = RunRepository() | ||
register_shutdown_events(scheduler) | ||
|
||
scheduler.add_job( | ||
func=lambda: start_scheduled_run(run_repo), | ||
trigger="interval", | ||
seconds=10, | ||
replace_existing=True, | ||
max_instances=1, | ||
) | ||
|
||
scheduler.start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
from pytz import UTC | ||
from apscheduler.schedulers.background import BlockingScheduler | ||
|
||
from maestro_api import create_console_app | ||
from maestro_api import settings | ||
from maestro_api.logging import Logger | ||
from maestro_api.scheduler import init_scheduler | ||
|
||
Logger.setup_logging() | ||
|
||
console_app = create_console_app(settings) | ||
|
||
scheduler = BlockingScheduler(timezone=UTC) | ||
|
||
init_scheduler(scheduler) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import pytest | ||
from freezegun import freeze_time | ||
from datetime import datetime | ||
|
||
from maestro_api.db.models.agent import Agent | ||
from maestro_api.db.models.run_plan import RunPlan | ||
from maestro_api.db.models.run import Run | ||
from maestro_api.db.models.run_configuration import RunConfiguration | ||
from maestro_api.db.repo.run import RunRepository | ||
from maestro_api.enums import DaysOfTheWeek | ||
|
||
from maestro_api.scheduler import start_scheduled_run | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"run_configurations,created_runs", | ||
[ | ||
( | ||
[ | ||
{ | ||
"is_schedule_enabled": True, | ||
"schedule": { | ||
"days": [DaysOfTheWeek.Mon.value, DaysOfTheWeek.Tue.value], | ||
"time": "10:00", | ||
}, | ||
}, | ||
{ | ||
"is_schedule_enabled": False, | ||
"schedule": { | ||
"days": [DaysOfTheWeek.Mon.value, DaysOfTheWeek.Tue.value], | ||
"time": "10:00", | ||
}, | ||
}, | ||
{ | ||
"is_schedule_enabled": True, | ||
"schedule": { | ||
"days": [DaysOfTheWeek.Wed.value], | ||
"time": "10:00", | ||
}, | ||
}, | ||
], | ||
1, | ||
), | ||
( | ||
[ | ||
{ | ||
"is_schedule_enabled": True, | ||
"schedule": { | ||
"days": [DaysOfTheWeek.Mon.value], | ||
"time": "10:00", | ||
}, | ||
"last_scheduled_at": datetime(2012, 1, 2, 10, 1, 0), | ||
}, | ||
{ | ||
"is_schedule_enabled": True, | ||
"schedule": { | ||
"days": [DaysOfTheWeek.Mon.value], | ||
"time": "10:00", | ||
}, | ||
"last_scheduled_at": datetime(2012, 1, 2, 10, 0, 0), | ||
}, | ||
], | ||
0, | ||
), | ||
], | ||
) | ||
@freeze_time("2012-01-02 10:02:00") | ||
def test_start_scheduled_run(app, run_configurations, created_runs): | ||
run_plan_id = "6076d1e3a216ff15b6e95e9d" | ||
agent_ids = ["6076d1e3a216ff15b6e95e8d"] | ||
RunPlan(id=run_plan_id, title="Example Plan").save() | ||
for agent_id in agent_ids: | ||
Agent(id=agent_id, hostname="host_%s" % agent_id, ip="test_ip").save() | ||
for run_configuration in run_configurations: | ||
RunConfiguration( | ||
title="Example test plan", | ||
run_plan_id=run_plan_id, | ||
agent_ids=agent_ids, | ||
**run_configuration | ||
).save() | ||
|
||
run_repo = RunRepository() | ||
|
||
start_scheduled_run(run_repo) | ||
|
||
runs_count = len(Run.objects()) | ||
|
||
assert created_runs == runs_count | ||
|
||
|
||
@freeze_time("2012-01-02 10:02:00") | ||
def test_start_scheduled_run_second_time(app): | ||
run_plan_id = "6076d1e3a216ff15b6e95e9d" | ||
agent_ids = ["6076d1e3a216ff15b6e95e8d"] | ||
RunPlan(id=run_plan_id, title="Example Plan").save() | ||
for agent_id in agent_ids: | ||
Agent(id=agent_id, hostname="host_%s" % agent_id, ip="test_ip").save() | ||
|
||
RunConfiguration( | ||
title="Example test plan", | ||
run_plan_id=run_plan_id, | ||
agent_ids=agent_ids, | ||
is_schedule_enabled=True, | ||
schedule=dict( | ||
days=[DaysOfTheWeek.Mon.value], | ||
time="10:00", | ||
), | ||
).save() | ||
|
||
run_repo = RunRepository() | ||
|
||
start_scheduled_run(run_repo) | ||
start_scheduled_run(run_repo) | ||
|
||
run_configurations = RunConfiguration.objects() | ||
runs_count = len(Run.objects()) | ||
|
||
assert 1 == runs_count | ||
assert "2012-01-02 10:02:00" == run_configurations[0].last_scheduled_at.strftime( | ||
"%Y-%m-%d %H:%M:%S" | ||
) |