Skip to content

Commit

Permalink
Merge pull request #707 from NCATSTranslator/permisive_timeout
Browse files Browse the repository at this point in the history
applied permissive timeout for pathfinder queries
  • Loading branch information
ShervinAbd92 authored Nov 13, 2024
2 parents 69a2b8d + 0d38a5e commit 0fd2091
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 8 deletions.
15 changes: 12 additions & 3 deletions tr_sys/tr_ars/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
from django.urls import path, re_path, include, reverse
from django.utils import timezone
from tr_ars import utils
from tr_ars import tasks
from utils2 import urlRemoteFromInforesid
from .models import Agent, Message, Channel, Actor
import json, sys, logging
import traceback
from inspect import currentframe, getframeinfo
from tr_ars import status_report
from datetime import datetime, timedelta
#from tr_ars.tasks import send_message
#from tr_ars.tasks import catch_timeout_async
import ast
from tr_smartapi_client.smart_api_discover import ConfigFile
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
Expand Down Expand Up @@ -96,6 +97,13 @@ def submit(req):
try:
logger.debug('++ submit: %s' % req.body)
data = json.loads(req.body)
# derive query_type
node_count=len(data['message']['query_graph']['nodes'].keys())
edge_count=len(data['message']['query_graph']['edges'].keys())
if node_count==3 and edge_count==2:
params = {"query_type":"pathfinder"}
if node_count==2 and edge_count==1:
params = {"query_type":"standard"}
# if 'message' not in data:
# return HttpResponse('Not a valid Translator query json', status=400)
# create a head message
Expand All @@ -110,15 +118,15 @@ def submit(req):
if(isinstance(wf,list)):
if(len(wf)>0):
message = Message.create(code=202, status='Running', data=data,
actor=get_workflow_actor())
actor=get_workflow_actor(), params=params),
logger.debug("Sending message to workflow runner")#TO-DO CHANGE
# message.save()
# send_message(get_workflow_actor().to_dict(),message.to_dict())
# return HttpResponse(json.dumps(data, indent=2),
# content_type='application/json', status=201)
else:
message = Message.create(code=202, status='Running', data=data,
actor=get_default_actor())
actor=get_default_actor(), params=params)

if 'name' in data:
message.name = data['name']
Expand Down Expand Up @@ -812,6 +820,7 @@ def timeoutTest(req,time=300):
#utils.validate(message)
pass
else:
#tasks.catch_timeout_async()
pass
#utils.remove_blocked()
def block(req,key):
Expand Down
18 changes: 18 additions & 0 deletions tr_sys/tr_ars/migrations/0012_message_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.23 on 2024-11-12 19:16

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('tr_ars', '0011_alter_message_data'),
]

operations = [
migrations.AddField(
model_name='message',
name='params',
field=models.JSONField(null=True),
),
]
1 change: 1 addition & 0 deletions tr_sys/tr_ars/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class Message(ARSModel):
merged_version = models.ForeignKey('self', related_name="version_merged",null=True, blank=True,
on_delete=models.CASCADE)
merged_versions_list= models.JSONField('Ordered list of merged_version PKs', null=True)
params = models.JSONField(null=True)


def __str__(self):
Expand Down
20 changes: 15 additions & 5 deletions tr_sys/tr_ars/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ def send_message(actor_dict, mesg_dict, timeout=300):
}
mesg = Message.create(actor=Actor.objects.get(pk=actor_dict['pk']),
name=mesg_dict['fields']['name'], status='R',
ref=get_object_or_404(Message.objects.filter(pk=mesg_dict['pk'])))
ref=get_object_or_404(Message.objects.filter(pk=mesg_dict['pk'])),
params=mesg_dict['fields']['params'])

if mesg.status == 'R':
mesg.code = 202
Expand Down Expand Up @@ -236,15 +237,17 @@ def catch_timeout_async():
time_threshold = now - timezone.timedelta(minutes=10)
max_time = now-timezone.timedelta(minutes=5)
max_time_merged=now-timezone.timedelta(minutes=8)
max_time_pathfinder = now-timezone.timedelta(minutes=10)

messages = Message.objects.filter(timestamp__gt=time_threshold, status__in='R').values_list('actor','id','timestamp','updated_at')
messages = Message.objects.filter(timestamp__gt=time_threshold, status__in='R').values_list('actor','id','timestamp','updated_at','params')
for mesg in messages:
mpk=mesg[0]
id = mesg[1]
actor = Agent.objects.get(pk=mpk)
timestamp=mesg[2]
query_type=mesg[4]['query_type']

logging.info(f'actor: {actor} id: {mesg[1]} timestamp: {mesg[2]} updated_at {mesg[3]}')
logging.info(f'actor: {actor} id: {mesg[1]} timestamp: {mesg[2]} updated_at {mesg[3]} query_type {query_type}')

#exempting parents from timing out
if actor.name == 'ars-default-agent':
Expand All @@ -260,8 +263,15 @@ def catch_timeout_async():
else:
continue
else:
if timestamp < max_time:
logging.info(f'for actor: {actor.name}, and pk {str(id)} the status is still "Running" after 5 min, setting code to 598')
if query_type == 'standard' and timestamp < max_time:
logging.info(f'for actor: {actor.name}, and pk {str(id)} of query type: {query_type},the status is still "Running" after 5 min, setting code to 598')
message = get_object_or_404(Message.objects.filter(pk=mesg[1]))
message.code = 598
message.status = 'E'
message.save(update_fields=['status','code'])

elif query_type == 'pathfinder' and timestamp < max_time_pathfinder:
logging.info(f'for actor: {actor.name}, and pk {str(id)} of query type: {query_type},the status is still "Running" after 10 min, setting code to 598')
message = get_object_or_404(Message.objects.filter(pk=mesg[1]))
message.code = 598
message.status = 'E'
Expand Down

0 comments on commit 0fd2091

Please sign in to comment.