diff --git a/config/tr_sys_settings.py b/config/tr_sys_settings.py
index 2393ad15..ed518323 100755
--- a/config/tr_sys_settings.py
+++ b/config/tr_sys_settings.py
@@ -11,6 +11,9 @@
 """
 import os
+from tr_sys.tr_sys.otel_config import configure_opentelemetry
+
+configure_opentelemetry()
 
 # Build paths inside the project like this: os.path.join(BASE_DIR, ...)
 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
diff --git a/deploy/templates/deployment.yaml b/deploy/templates/deployment.yaml
index 3bfbd2aa..9a3c7f6f 100644
--- a/deploy/templates/deployment.yaml
+++ b/deploy/templates/deployment.yaml
@@ -39,6 +39,7 @@ spec:
             value: {{ .Values.arsserver.env.TR_NORMALIZER }}
           - name: TR_ANNOTATOR
             value: {{ .Values.arsserver.env.TR_ANNOTATOR }}
+
         {{- with .Values.arsserver.resources }}
         resources:
           {{- toYaml . | nindent 12 }}
@@ -59,6 +60,7 @@ spec:
             value: {{ .Values.celeryworker.env.TR_NORMALIZER }}
           - name: TR_ANNOTATOR
             value: {{ .Values.celeryworker.env.TR_ANNOTATOR }}
+
         {{- with .Values.celeryworker.resources }}
         resources:
          {{- toYaml . | nindent 12 }}
diff --git a/requirements.txt b/requirements.txt
index 14214491..1dc3a251 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -20,3 +20,11 @@ statistics
 sympy
 objsize
 reasoner-pydantic
+opentelemetry-api
+opentelemetry-sdk
+opentelemetry-instrumentation
+opentelemetry-instrumentation-django
+opentelemetry-exporter-jaeger
+opentelemetry-instrumentation-requests
+opentelemetry-instrumentation-celery
+
diff --git a/tr_sys/tr_ars/api.py b/tr_sys/tr_ars/api.py
index f7a4b4de..2be28a24 100644
--- a/tr_sys/tr_ars/api.py
+++ b/tr_sys/tr_ars/api.py
@@ -15,10 +15,12 @@
 #from tr_ars.tasks import send_message
 import ast
 from tr_smartapi_client.smart_api_discover import ConfigFile
-
-
+from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
+from opentelemetry import trace
+from opentelemetry.propagate import extract
+from opentelemetry.context import attach, detach
 #from reasoner_validator import validate_Message, ValidationError, validate_Query
-
+tracer = trace.get_tracer(__name__)
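+# get_tracer(__name__) namespaces every span created in this module; building
+# the tracer once at import time avoids re-resolving the provider in each view.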
 logger = logging.getLogger(__name__)
 
 def index(req):
@@ -84,52 +86,55 @@ def submit(req):
     logger.debug("submit")
     """Query submission"""
     logger.debug("entering submit")
-    if req.method != 'POST':
-        return HttpResponse('Only POST is permitted!', status=405)
-
-    try:
-        logger.debug('++ submit: %s' % req.body)
-        data = json.loads(req.body)
-        # if 'message' not in data:
-        #     return HttpResponse('Not a valid Translator query json', status=400)
-        # create a head message
-        # try:
-        #     validate_Query(data)
-        # except ValidationError as ve:
-        #     logger.debug("Warning! Input query failed TRAPI validation "+str(data))
-        #     logger.debug(ve)
-        #     return HttpResponse('Input query failed TRAPI validation',status=400)
-        if("workflow" in data):
-            wf = data["workflow"]
-            if(isinstance(wf,list)):
-                if(len(wf)>0):
-                    message = Message.create(code=202, status='Running', data=data,
-                                             actor=get_workflow_actor())
-                    logger.debug("Sending message to workflow runner")#TO-DO CHANGE
-                    # message.save()
-                    # send_message(get_workflow_actor().to_dict(),message.to_dict())
-                    # return HttpResponse(json.dumps(data, indent=2),
-                    #                     content_type='application/json', status=201)
-        else:
-            message = Message.create(code=202, status='Running', data=data,
-                                     actor=get_default_actor())
-
-        if 'name' in data:
-            message.name = data['name']
-        # save and broadcast
+    with tracer.start_as_current_span("submit") as span:
+        span.set_attribute("http.method", req.method)
+        span.set_attribute("http.url", req.build_absolute_uri())
+        span.set_attribute("user.id", req.user.id if req.user.is_authenticated else "anonymous")
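+        # These attributes make submissions searchable by method, URL, and user
+        # in the tracing backend; "anonymous" keeps user.id populated when the
+        # request is unauthenticated.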
-        message.save()
-        data = message.to_dict()
-        return HttpResponse(json.dumps(data, indent=2),
-                            content_type='application/json', status=201)
-    except Exception as e:
-        logger.error("Unexpected error 10: {}".format(traceback.format_exception(type(e), e, e.__traceback__)))
-        logging.info(e, exc_info=True)
-        logging.info('error message %s' % str(e))
-        logging.info(e.__cause__)
-        logging.error(type(e).__name__)
-        logging.error(e.args)
-        return HttpResponse('failing due to %s with the message %s' % (e.__cause__, str(e)), status=400)
+        if req.method != 'POST':
+            return HttpResponse('Only POST is permitted!', status=405)
+        try:
+            logger.debug('++ submit: %s' % req.body)
+            data = json.loads(req.body)
+            # if 'message' not in data:
+            #     return HttpResponse('Not a valid Translator query json', status=400)
+            # create a head message
+            # try:
+            #     validate_Query(data)
+            # except ValidationError as ve:
+            #     logger.debug("Warning! Input query failed TRAPI validation "+str(data))
+            #     logger.debug(ve)
+            #     return HttpResponse('Input query failed TRAPI validation',status=400)
+            if("workflow" in data):
+                wf = data["workflow"]
+                if(isinstance(wf,list)):
+                    if(len(wf)>0):
+                        message = Message.create(code=202, status='Running', data=data,
+                                                 actor=get_workflow_actor())
+                        logger.debug("Sending message to workflow runner")#TO-DO CHANGE
+                        # message.save()
+                        # send_message(get_workflow_actor().to_dict(),message.to_dict())
+                        # return HttpResponse(json.dumps(data, indent=2),
+                        #                     content_type='application/json', status=201)
+            else:
+                message = Message.create(code=202, status='Running', data=data,
+                                         actor=get_default_actor())
+
+            if 'name' in data:
+                message.name = data['name']
+            # save and broadcast
+            message.save()
+            data = message.to_dict()
+            return HttpResponse(json.dumps(data, indent=2),
+                                content_type='application/json', status=201)
+        except Exception as e:
+            logger.error("Unexpected error 10: {}".format(traceback.format_exception(type(e), e, e.__traceback__)))
+            logging.info(e, exc_info=True)
+            logging.info('error message %s' % str(e))
+            logging.info(e.__cause__)
+            logging.error(type(e).__name__)
+            logging.error(e.args)
+            return HttpResponse('failing due to %s with the message %s' % (e.__cause__, str(e)), status=400)
 
 @csrf_exempt
 def messages(req):
@@ -181,7 +186,7 @@ def trace_message_deepfirst(node):
     children = Message.objects.filter(ref__pk=node['message'])
     logger.debug('%s: %d children' % (node['message'], len(children)))
     for child in children:
-        if child.actor.inforesid == 'ARS':
+        if child.actor.inforesid == 'infores:ars':
             pass
         else:
             channel_names=[]
@@ -450,93 +455,108 @@ def message(req, key):
         return HttpResponse('Unknown message: %s' % key, status=404)
 
     elif req.method == 'POST':
+        # headers = dict(req.headers)
+        # if 'Traceparent' in headers.keys():
+        #     logging.info('traceparent for mesg pk: %s is %s'% (str(key),headers['Traceparent']))
+        # else:
+        #     logging.info('there is not Traceparent for mesg pk: %s'% (str(key)))
+        # carrier ={'traceparent': headers['Traceparent']}
+        # ctx = TraceContextTextMapPropagator().extract(carrier=carrier)
+        # with tracer.start_as_current_span('message', context=ctx) as span:
+        # Extract the trace context from the incoming request headers
+        extracted_context = extract(req.headers)
+        token = attach(extracted_context)
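+        # extract() rebuilds the caller's trace context from the W3C traceparent
+        # header, and attach() makes it current so the spans below join the
+        # upstream trace; the matching detach(token) in the finally block
+        # restores the previous context.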
         try:
-            data = json.loads(req.body)
-            #if 'query_graph' not in data or 'knowledge_graph' not in data or 'results' not in data:
-            #    return HttpResponse('Not a valid Translator API json', status=400)
-            mesg = get_object_or_404(Message.objects.filter(pk=key))
-            status = 'D'
-            code = 200
-            if 'tr_ars.message.status' in req.headers:
-                status = req.headers['tr_ars.message.status']
-            res=utils.get_safe(data,"message","results")
-            kg = utils.get_safe(data,"message", "knowledge_graph")
-            actor = Actor.objects.get(pk=mesg.actor_id)
-            inforesid =actor.inforesid
-            logging.info('received msg from agent: %s with parent pk: %s' % (str(inforesid), str(mesg.ref_id)))
-            if mesg.result_count is not None and mesg.result_count >0:
-                return HttpResponse('ARS already has a response with: %s results for pk %s \nWe are temporarily '
-                                    'disallowing subsequent updates to PKs which already have results\n'
-                                    % (str(len(res)), str(key)),status=409)
-
-            if mesg.status=='E':
-                return HttpResponse("Response received but Message is already in state "+str(mesg.code)+". Response rejected\n",status=400)
-            if res is not None and len(res)>0:
-                mesg.result_count = len(res)
-                scorestat = utils.ScoreStatCalc(res)
-                mesg.result_stat = scorestat
-                #before we do basically anything else, we normalize
-                parent_pk = mesg.ref_id
-                #message_to_merge =utils.get_safe(data,"message")
-                message_to_merge = data
-                agent_name = str(mesg.actor.agent.name)
-                logger.info("Running pre_merge_process for agent %s with %s" % (agent_name, len(res)))
-                utils.pre_merge_process(message_to_merge,key, agent_name, inforesid)
-                if mesg.data and 'results' in mesg.data and mesg.data['results'] != None and len(mesg.data['results']) > 0:
-                    mesg = Message.create(name=mesg.name, status=status, actor=mesg.actor, ref=mesg)
-            valid = utils.validate(data)
-            if valid:
-                if agent_name.startswith('ara-'):
-                    logger.info("pre async call for agent %s" % agent_name)
-                    #utils.merge_and_post_process(parent_pk,message_to_merge['message'],agent_name)
-                    utils.merge_and_post_process.apply_async((parent_pk,message_to_merge['message'],agent_name))
-                    logger.info("post async call for agent %s" % agent_name)
-            else:
-                logger.debug("Validation problem found for agent %s with pk %s" % (agent_name, str(mesg.ref_id)))
-                code = 422
-                status = 'E'
+            tracer = trace.get_tracer(__name__)
+            with tracer.start_as_current_span('message') as span:
+                span.set_attribute("pk", str(key))
+                try:
+                    data = json.loads(req.body)
+                    #if 'query_graph' not in data or 'knowledge_graph' not in data or 'results' not in data:
+                    #    return HttpResponse('Not a valid Translator API json', status=400)
+                    mesg = get_object_or_404(Message.objects.filter(pk=key))
+                    status = 'D'
+                    code = 200
+                    if 'tr_ars.message.status' in req.headers:
+                        status = req.headers['tr_ars.message.status']
+                    res=utils.get_safe(data,"message","results")
+                    kg = utils.get_safe(data,"message", "knowledge_graph")
+                    actor = Actor.objects.get(pk=mesg.actor_id)
+                    inforesid =actor.inforesid
+                    span.set_attribute("agent", inforesid)
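+                    # Tagging the span with the message pk and the responding
+                    # agent lets traces be filtered per ARA in the Jaeger UI.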
+                    logging.info('received msg from agent: %s with parent pk: %s' % (str(inforesid), str(mesg.ref_id)))
+                    if mesg.result_count is not None and mesg.result_count >0:
+                        return HttpResponse('ARS already has a response with: %s results for pk %s \nWe are temporarily '
+                                            'disallowing subsequent updates to PKs which already have results\n'
+                                            % (str(len(res)), str(key)),status=409)
+
+                    if mesg.status=='E':
+                        return HttpResponse("Response received but Message is already in state "+str(mesg.code)+". Response rejected\n",status=400)
+                    if res is not None and len(res)>0:
+                        mesg.result_count = len(res)
+                        scorestat = utils.ScoreStatCalc(res)
+                        mesg.result_stat = scorestat
+                        #before we do basically anything else, we normalize
+                        parent_pk = mesg.ref_id
+                        #message_to_merge =utils.get_safe(data,"message")
+                        message_to_merge = data
+                        agent_name = str(mesg.actor.agent.name)
+                        logger.info("Running pre_merge_process for agent %s with %s" % (agent_name, len(res)))
+                        utils.pre_merge_process(message_to_merge,key, agent_name, inforesid)
+                        if mesg.data and 'results' in mesg.data and mesg.data['results'] != None and len(mesg.data['results']) > 0:
+                            mesg = Message.create(name=mesg.name, status=status, actor=mesg.actor, ref=mesg)
+                    valid = utils.validate(data)
+                    if valid:
+                        if agent_name.startswith('ara-'):
+                            logger.info("pre async call for agent %s" % agent_name)
+                            #utils.merge_and_post_process(parent_pk,message_to_merge['message'],agent_name)
+                            utils.merge_and_post_process.apply_async((parent_pk,message_to_merge['message'],agent_name))
+                            logger.info("post async call for agent %s" % agent_name)
+                    else:
+                        logger.debug("Validation problem found for agent %s with pk %s" % (agent_name, str(mesg.ref_id)))
+                        code = 422
+                        status = 'E'
+                        mesg.status = status
+                        mesg.code = code
+                        mesg.save_compressed_dict(data)
+                        mesg.save()
+                        return HttpResponse("Problem with TRAPI Validation",
+                                            status=422)
+
+                    mesg.status = status
+                    mesg.code = code
+                    mesg.save_compressed_dict(data)
+                    if len(res) == 0 and res is not None:
+                        mesg.result_count = 0
+                    mesg.save()
-            return HttpResponse(json.dumps(mesg.to_dict(), indent=2),
-                                status=201)
-
-        except Message.DoesNotExist:
-            return HttpResponse('Unknown state reference %s' % key, status=404)
-
-        except json.decoder.JSONDecodeError:
-            return HttpResponse('Can not decode json:\n%s for the pk: %s' % (req.body, key), status=500)
-
-        except Exception as e:
-            mesg.status = 'E'
-            mesg.code = 500
-            log_entry = {
-                "message":"Internal ARS Server Error",
-                "timestamp":mesg.updated_at,
-                "level":"ERROR"
-            }
-            if 'logs' in data.keys():
-                data['logs'].append(log_entry)
-            else:
-                data['logs'] = [log_entry]
-            mesg.save_compressed_dict(data)
-            #mesg.data = data
-            mesg.save()
-            logger.error("Unexpected error 12: {} with the pk: %s".format(traceback.format_exception(type(e), e, e.__traceback__), key))
-
-            return HttpResponse('Internal server error', status=500)
+                    return HttpResponse(json.dumps(mesg.to_dict(), indent=2),
+                                        status=201)
+
+                except Message.DoesNotExist:
+                    return HttpResponse('Unknown state reference %s' % key, status=404)
+
+                except json.decoder.JSONDecodeError:
+                    return HttpResponse('Can not decode json:\n%s for the pk: %s' % (req.body, key), status=500)
+
+                except Exception as e:
+                    mesg.status = 'E'
+                    mesg.code = 500
+                    log_entry = {
+                        "message":"Internal ARS Server Error",
+                        "timestamp":mesg.updated_at,
+                        "level":"ERROR"
+                    }
+                    if 'logs' in data.keys():
+                        data['logs'].append(log_entry)
+                    else:
+                        data['logs'] = [log_entry]
+                    mesg.save_compressed_dict(data)
+                    mesg.save()
+                    logger.error("Unexpected error 12: {} with the pk: %s".format(traceback.format_exception(type(e), e, e.__traceback__), key))
+                    return HttpResponse('Internal server error', status=500)
+        finally:
+            detach(token)
 
     else:
         return HttpResponse('Method %s not supported!' % req.method, status=400)
@@ -791,11 +811,7 @@ def answers(req, key):
     except Message.DoesNotExist:
         return HttpResponse('Unknown message: %s' % key, status=404)
 
-@csrf_exempt
-def status(req):
-    if req.method == 'GET':
-        return HttpResponse(json.dumps(status_report.status(req), indent=2),
-                            content_type='application/json', status=200)
+
 @csrf_exempt
 def timeoutTest(req,time=300):
     if req.method == 'POST':
@@ -892,7 +908,6 @@ def post_process(req, key):
     path('messages/', message, name='ars-message'),
     re_path(r'^filters/?$', filters, name='ars-filters'),
     path('filter/', filter, name='ars-filter'),
-    re_path(r'^status/?$', status, name='ars-status'),
     path('reports/',get_report,name='ars-report'),
     re_path(r'^timeoutTest/?$', timeoutTest, name='ars-timeout'),
     path('merge/', merge, name='ars-merge'),
diff --git a/tr_sys/tr_ars/pubsub.py b/tr_sys/tr_ars/pubsub.py
index 0c2c5a4a..b0d94546 100644
--- a/tr_sys/tr_ars/pubsub.py
+++ b/tr_sys/tr_ars/pubsub.py
@@ -1,10 +1,10 @@
 from django.core import serializers
 import sys, logging, json, threading, queue, requests
 from .models import Message
-from tr_ars.tasks import send_message
+from .tasks import send_message
 from django.utils import timezone
 from django.conf import settings
-
+from opentelemetry import trace
 
 logger = logging.getLogger(__name__)
@@ -22,6 +22,8 @@ def send_messages(actors, messages):
                 logger.debug("Skipping actor %s/%s; it's inactive..." % (
                     actor.agent, actor.url()))
             elif settings.USE_CELERY:
+                span = trace.get_current_span()
+                logger.debug(f"CURRENT span before Celery task submission: {span}")
                 result = send_message.delay(actor.to_dict(), mesg.to_dict())
                 #logger.debug('>>>> task future: %s' % result)
                 result.forget()
diff --git a/tr_sys/tr_ars/tasks.py b/tr_sys/tr_ars/tasks.py
index 12869f54..81cbd221 100644
--- a/tr_sys/tr_ars/tasks.py
+++ b/tr_sys/tr_ars/tasks.py
@@ -12,14 +12,31 @@
 import traceback
 from django.utils import timezone
 from django.shortcuts import get_object_or_404
-import copy
-
+from opentelemetry import trace
+from opentelemetry.propagate import inject
+# Ensure that the tracing context is properly propagated within tasks
+from opentelemetry.context import attach, detach, set_value, get_current
 logger = get_task_logger(__name__)
-#logger.propagate = True
+
+def propagate_context(func):
+    def wrapper(*args, **kwargs):
+        token = attach(get_current())
+        try:
+            return func(*args, **kwargs)
+        finally:
+            detach(token)
+    return wrapper
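+# The wrapper re-attaches whatever context is current when the worker starts
+# the task and detaches it afterward, so task-local context cannot leak into
+# the next task executed on the same worker process.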
+
 @shared_task(name="send-message-to-actor")
+@propagate_context
 def send_message(actor_dict, mesg_dict, timeout=300):
+    span = trace.get_current_span()
+    logger.debug(f"CURRENT span before task execution: {span}")
+    tracer = trace.get_tracer(__name__)
+    infores=actor_dict['fields']['inforesid']
+    agent= infores.split(':')[1]
     logger.info(mesg_dict)
     url = settings.DEFAULT_HOST + actor_dict['fields']['url']
     logger.debug('sending message %s to %s...'
                  % (mesg_dict['pk'], url))
@@ -51,76 +68,112 @@ def send_message(actor_dict, mesg_dict, timeout=300):
     endpoint=SmartApiDiscover().endpoint(inforesid)
     params=SmartApiDiscover().params(inforesid)
     query_endpoint = (endpoint if endpoint is not None else "") + (("?"+params) if params is not None else "")
+    task_id=str(mesg.pk)
+    with tracer.start_as_current_span(f"{agent}") as span:
+        logger.debug(f"CURRENT span during task execution: {span}")
+        span.set_attribute("pk", str(mesg.pk))
+        span.set_attribute("ref_pk", str(mesg.ref_id))
+        span.set_attribute("agent", agent)
+        headers={}
+        inject(headers)
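+        # inject() writes the current span context into these headers as a
+        # traceparent entry, letting the receiving ARA continue this trace.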
+        # Make HTTP request and trace it
+        try:
+            logging.info('POSTing to agent %s pk:%s with header %s'% (agent,task_id, headers))
+            r = requests.post(url, json=data, headers=headers, timeout=timeout)
+            span.set_attribute("http.url", url)
+            span.set_attribute("http.status_code", r.status_code)
+            span.set_attribute("http.method", "POST")
+            span.set_attribute("task.id", task_id)
+            #span.set_attribute("celery.task_id", send_message.request.id)
+            logger.debug('%d: receive message from actor %s...\n%s.\n'
+                         % (r.status_code, url, str(r.text)[:500]))
+            status_code = r.status_code
+            url = r.url
+            if 'tr_ars.url' in r.headers:
+                url = r.headers['tr_ars.url']
+            # status defined in https://github.com/NCATSTranslator/ReasonerAPI/blob/master/TranslatorReasonerAPI.yaml
+            # paths: /query: post: responses:
+            # 200 = OK. There may or may not be results. Note that some of the provided
+            #       identifiers may not have been recognized.
+            # 202 = Accepted. Poll /aresponse for results.
+            # 400 = Bad request. The request is invalid according to this OpenAPI
+            #       schema OR a specific identifier is believed to be invalid somehow
+            #       (not just unrecognized).
+            # 500 = Internal server error.
+            # 501 = Not implemented.
+            # Message.STATUS
+            # ('D', 'Done'),
+            # ('S', 'Stopped'),
+            # ('R', 'Running'),
+            # ('E', 'Error'),
+            # ('W', 'Waiting'),
+            # ('U', 'Unknown')
+            if r.status_code == 200:
+                rdata = dict()
+                try:
+                    rdata = r.json()
+                except json.decoder.JSONDecodeError:
+                    status = 'E'
+                # now create a new message here
+                if(endpoint)=="asyncquery":
+                    if(callback is not None):
+                        try:
+                            ar = requests.get(callback, json=data, timeout=timeout)
+                            arj=ar.json()
+                            if utils.get_safe(rdata,"fields","data", "message") is None:
+                                logger.debug("data field empty")
+                                status = 'R'
+                                status_code = 202
+                            else:
+                                logger.debug("data field contains "+ arj["fields"]["data"]["message"])
+                                status = 'D'
+                                status_code = 200
+                        except json.decoder.JSONDecodeError:
+                            status = 'E'
+                            status_code = 422
 
-    try:
-        r = requests.post(url, json=data, timeout=timeout)
-        logger.debug('%d: receive message from actor %s...\n%s.\n'
-                     % (r.status_code, url, str(r.text)[:500]))
-        status_code = r.status_code
-        url = r.url
-        if 'tr_ars.url' in r.headers:
-            url = r.headers['tr_ars.url']
-        # status defined in https://github.com/NCATSTranslator/ReasonerAPI/blob/master/TranslatorReasonerAPI.yaml
-        # paths: /query: post: responses:
-        # 200 = OK. There may or may not be results. Note that some of the provided
-        #       identifiers may not have been recognized.
-        # 202 = Accepted. Poll /aresponse for results.
-        # 400 = Bad request. The request is invalid according to this OpenAPI
-        #       schema OR a specific identifier is believed to be invalid somehow
-        #       (not just unrecognized).
-        # 500 = Internal server error.
-        # 501 = Not implemented.
-        # Message.STATUS
-        # ('D', 'Done'),
-        # ('S', 'Stopped'),
-        # ('R', 'Running'),
-        # ('E', 'Error'),
-        # ('W', 'Waiting'),
-        # ('U', 'Unknown')
-        if r.status_code == 200:
-            rdata = dict()
-            try:
-                rdata = r.json()
-            except json.decoder.JSONDecodeError:
-                status = 'E'
-            # now create a new message here
-            if(endpoint)=="asyncquery":
-                if(callback is not None):
-                    try:
-                        ar = requests.get(callback, json=data, timeout=timeout)
-                        arj=ar.json()
-                        if utils.get_safe(rdata,"fields","data", "message") is None:
-                            logger.debug("data field empty")
-                            status = 'R'
-                            status_code = 202
-                        else:
-                            logger.debug("data field contains "+ arj["fields"]["data"]["message"])
-                            status = 'D'
-                            status_code = 200
-                    except json.decoder.JSONDecodeError:
-                        status = 'E'
-                        status_code = 422
-
+            else:
+                logger.debug("Not async for agent: %s and endpoint: %s? " % (inforesid,query_endpoint))
+                status = 'D'
+                status_code = 200
+            results = utils.get_safe(rdata,"message","results")
+            kg = utils.get_safe(rdata,"message", "knowledge_graph")
+            #before we do basically anything else, we normalize
+            #no sense in processing something without results
+            if results is not None and len(results)>0:
+                mesg.result_count = len(rdata["message"]["results"])
+                scorestat = utils.ScoreStatCalc(results)
+                mesg.result_stat = scorestat
+                parent_pk = mesg.ref.id
+                #message_to_merge = utils.get_safe(rdata,"message")
+                message_to_merge=rdata
+                agent_name = str(mesg.actor.agent.name)
+                child_pk=str(mesg.pk)
+                logger.info("Running pre_merge_process for agent %s with %s" % (agent_name, len(results)))
+                utils.pre_merge_process(message_to_merge,child_pk, agent_name, inforesid)
+            #Whether we did any additional processing or not, we need to save what we have
+            mesg.code = status_code
+            mesg.status = status
+            mesg.save_compressed_dict(rdata)
+            #mesg.data = rdata
+            mesg.url = url
+            mesg.save()
+            logger.debug('+++ message saved: %s' % (mesg.pk))
-        else:
-            logger.debug("Not async for agent: %s and endpoint: %s? " % (inforesid,query_endpoint))
-            status = 'D'
-            status_code = 200
-        results = utils.get_safe(rdata,"message","results")
-        kg = utils.get_safe(rdata,"message", "knowledge_graph")
-        #before we do basically anything else, we normalize
-        #no sense in processing something without results
-        if results is not None and len(results)>0:
-            mesg.result_count = len(rdata["message"]["results"])
-            scorestat = utils.ScoreStatCalc(results)
-            mesg.result_stat = scorestat
-            parent_pk = mesg.ref.id
-            #message_to_merge = utils.get_safe(rdata,"message")
-            message_to_merge=rdata
-            agent_name = str(mesg.actor.agent.name)
-            child_pk=str(mesg.pk)
-            logger.info("Running pre_merge_process for agent %s with %s" % (agent_name, len(results)))
-            utils.pre_merge_process(message_to_merge,child_pk, agent_name, inforesid)
-        #Whether we did any additional processing or not, we need to save what we have
+        else:
+            #if the tool returned something other than 200, we log as appropriate and then save
+            if 'tr_ars.message.status' in r.headers:
+                status = r.headers['tr_ars.message.status']
+            if r.status_code == 202:
+                status = 'R'
+                url = url[:url.rfind('/')] + '/aresponse/' + r.text
+            if r.status_code >= 400:
+                if r.status_code != 503:
+                    status = 'E'
+                rdata['logs'] = []
+                rdata['logs'].append(html.escape(r.text))
+                for key in r.headers:
+                    if key.lower().find('tr_ars') > -1:
+                        rdata['logs'].append(key+": "+r.headers[key])
             mesg.code = status_code
             mesg.status = status
             mesg.save_compressed_dict(rdata)
@@ -128,21 +181,15 @@ def send_message(actor_dict, mesg_dict, timeout=300):
             mesg.url = url
             mesg.save()
             logger.debug('+++ message saved: %s' % (mesg.pk))
-        else:
-            #if the tool returned something other than 200, we log as appropriate and then save
-            if 'tr_ars.message.status' in r.headers:
-                status = r.headers['tr_ars.message.status']
-            if r.status_code == 202:
-                status = 'R'
-                url = url[:url.rfind('/')] + '/aresponse/' + r.text
-            if r.status_code >= 400:
-                if r.status_code != 503:
-                    status = 'E'
-                rdata['logs'] = []
-                rdata['logs'].append(html.escape(r.text))
-                for key in r.headers:
-                    if key.lower().find('tr_ars') > -1:
-                        rdata['logs'].append(key+": "+r.headers[key])
+        #This exception is meant to handle unexpected errors in the ORIGINAL return from the ARA
+        except Exception as e:
+            logger.error("Unexpected error 2: {}".format(traceback.format_exception(type(e), e, e.__traceback__)))
+            logger.exception("Can't send message to actor %s\n%s for pk: %s"
+                             % (url,sys.exc_info(),mesg.pk))
+            span.set_attribute("error", True)
+            span.set_attribute("error.message", str(e))
+            status_code = 500
+            status = 'E'
             mesg.code = status_code
             mesg.status = status
             mesg.save_compressed_dict(rdata)
@@ -150,48 +197,33 @@ def send_message(actor_dict, mesg_dict, timeout=300):
             mesg.url = url
             mesg.save()
             logger.debug('+++ message saved: %s' % (mesg.pk))
-    #This exception is meant to handle unexpected errors in the ORIGINAL return from the ARA
-    except Exception as e:
-        logger.error("Unexpected error 2: {}".format(traceback.format_exception(type(e), e, e.__traceback__)))
-        logger.exception("Can't send message to actor %s\n%s for pk: %s"
-                         % (url,sys.exc_info(),mesg.pk))
-        status_code = 500
-        status = 'E'
-        mesg.code = status_code
-        mesg.status = status
-        mesg.save_compressed_dict(rdata)
-        #mesg.data = rdata
-        mesg.url = url
-        mesg.save()
-        logger.debug('+++ message saved: %s' % (mesg.pk))
-    agent_name = str(mesg.actor.agent.name)
-    if mesg.code == 200 and results is not None and len(results)>0:
-        valid = utils.validate(message_to_merge)
-        if valid:
-            if agent_name.startswith('ara-'):
-                logger.info("pre async call for agent %s" % agent_name)
-                # logging.debug("Merge starting for "+str(mesg.pk))
-                # new_merged = utils.merge_received(parent_pk,message_to_merge['message'], agent_name)
-                # logging.debug("Merge complete for "+str(new_merged.pk))
-                # utils.post_process(new_merged.data,new_merged.pk, agent_name)
-                # logging.debug("Post processing done for "+str(new_merged.pk))
-                #parent = get_object_or_404(Message.objects.filter(pk=parent_pk))
-                #logging.info(f'parent merged_versions_list before going into merge&post-process for pk: %s are %s' % (parent_pk,parent.merged_versions_list))
-                #utils.merge_and_post_process(parent_pk,message_to_merge['message'],agent_name)
-                utils.merge_and_post_process.apply_async((parent_pk,message_to_merge['message'],agent_name))
-                logger.info("post async call for agent %s" % agent_name)
+        agent_name = str(mesg.actor.agent.name)
+        if mesg.code == 200 and results is not None and len(results)>0:
+            valid = utils.validate(message_to_merge)
+            if valid:
+                if agent_name.startswith('ara-'):
+                    logger.info("pre async call for agent %s" % agent_name)
+                    # logging.debug("Merge starting for "+str(mesg.pk))
+                    # new_merged = utils.merge_received(parent_pk,message_to_merge['message'], agent_name)
+                    # logging.debug("Merge complete for "+str(new_merged.pk))
+                    # utils.post_process(new_merged.data,new_merged.pk, agent_name)
+                    # logging.debug("Post processing done for "+str(new_merged.pk))
+                    #parent = get_object_or_404(Message.objects.filter(pk=parent_pk))
+                    #logging.info(f'parent merged_versions_list before going into merge&post-process for pk: %s are %s' % (parent_pk,parent.merged_versions_list))
+                    #utils.merge_and_post_process(parent_pk,message_to_merge['message'],agent_name)
+                    utils.merge_and_post_process.apply_async((parent_pk,message_to_merge['message'],agent_name))
+                    logger.info("post async call for agent %s" % agent_name)
+                else:
+                    logger.debug("Validation problem found for agent %s with pk %s" % (agent_name, str(mesg.ref_id)))
+                    code = 422
+                    status = 'E'
+                    mesg.code = code
+                    mesg.status = status
+                    mesg.save()
             else:
-                logger.debug("Validation problem found for agent %s with pk %s" % (agent_name, str(mesg.ref_id)))
-                code = 422
-                status = 'E'
-                mesg.code = code
-                mesg.status = status
-                mesg.save()
-    else:
-        logging.debug("Skipping merge and post for "+str(mesg.pk)+
-                      " because the contributing message is in state: "+str(mesg.code))
-
+                logging.debug("Skipping merge and post for "+str(mesg.pk)+
+                              " because the contributing message is in state: "+str(mesg.code))
 
 @shared_task(name="catch_timeout")
 def catch_timeout_async():
diff --git a/tr_sys/tr_ars/urls.py b/tr_sys/tr_ars/urls.py
index 52b45ab0..3ca04a28 100644
--- a/tr_sys/tr_ars/urls.py
+++ b/tr_sys/tr_ars/urls.py
@@ -16,7 +16,6 @@
     re_path(r'^filters/?$', api.filters, name='ars-filters'),
     path('filter/', api.filter, name='ars-filter'),
     path('reports/',api.get_report,name='ars-report'),
-    re_path(r'^status/?$', api.status, name='ars-status'),
     re_path(r'^timeoutTest/?$', api.timeoutTest, name='ars-timeout'),
     path('merge/', api.merge, name='ars-merge'),
     path('retain/', api.retain, name='ars-retain'),
diff --git a/tr_sys/tr_ars/utils.py b/tr_sys/tr_ars/utils.py
index 0e2a653c..f27999bf 100644
--- a/tr_sys/tr_ars/utils.py
+++ b/tr_sys/tr_ars/utils.py
@@ -31,7 +31,8 @@
     Response as vResponse
 )
 from pydantic import ValidationError
-
+from opentelemetry import trace
+tracer = trace.get_tracer(__name__)
 
 ARS_ACTOR = {
     'channel': [],
@@ -43,7 +44,7 @@
     'inforesid': 'ARS'
 }
 
-NORMALIZER_URL=os.getenv("TR_NORMALIZER") if os.getenv("TR_NORMALIZER") is not None else "https://nodenormalization-sri.renci.org/1.4/get_normalized_nodes"
+NORMALIZER_URL=os.getenv("TR_NORMALIZER") if os.getenv("TR_NORMALIZER") is not None else "https://nodenorm.ci.transltr.io/get_normalized_nodes"
 ANNOTATOR_URL=os.getenv("TR_ANNOTATOR") if os.getenv("TR_ANNOTATOR") is not None else "https://biothings.ncats.io/annotator/"
 APPRAISER_URL=os.getenv("TR_APPRAISE") if os.getenv("TR_APPRAISE") is not None else "http://localhost:9096/get_appraisal"
@@ -707,16 +708,15 @@ def merge_and_post_process(parent_pk,message_to_merge, agent_name, counter=0):
             merged.status='E'
             merged.code = 422
             merged.save()
-    else:
-        #If there is currently a merge happening, we wait until it finishes to do our merge
         if counter < 5:
-            logging.debug("Merged_version locked for %s. Attempt %s:" % (agent_name, str(counter)))
+            logging.info("Merged_version locked for %s. Attempt %s:" % (agent_name, str(counter)))
             sleeptime.sleep(5)
             counter = counter + 1
             merge_and_post_process(parent_pk,message_to_merge, agent_name, counter)
         else:
-            logging.debug("Merging failed for %s %s" % (agent_name, str(parent_pk)))
+            logging.info("Merging failed for %s %s" % (agent_name, str(parent_pk)))
+
     if merged is not None:
         logging.info('merged data for agent %s with pk %s is returned & ready to be preprocessed'
                      % (agent_name, str(merged.id)))
@@ -726,6 +726,7 @@ def merge_and_post_process(parent_pk,message_to_merge, agent_name, counter=0):
         merged.code = code
         merged.save()
 
+
 def remove_blocked(mesg, data, blocklist=None):
     try:
         #logging.info("Getting the length of the dictionary in {} bytes".format(get_deep_size(data)))
@@ -953,53 +954,55 @@ def appraise(mesg,data, agent_name,retry_counter=0):
     headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
     json_data = json.dumps(data)
     logging.info('sending data for agent: %s to APPRAISER URL: %s' % (agent_name, APPRAISER_URL))
-    try:
-        with requests.post(APPRAISER_URL,data=json_data,headers=headers, stream=True) as r:
-            logging.info("Appraiser being called at: "+APPRAISER_URL)
-            logging.info('the response for agent %s to appraiser code is: %s' % (agent_name, r.status_code))
-            if r.status_code==200:
-                rj = r.json()
-                #for now, just update the whole message, but we could be more precise/efficient
-                logging.info("Updating message with appraiser data for agent %s and pk %s " % (agent_name, str(mesg.id)))
-                data['message']['results']=rj['message']['results']
-                logging.info("Updating message with appraiser data complete for "+str(mesg.id))
-            else:
-                retry_counter +=1
-                logging.info("Received Error state from appraiser for agent %s and pk %s Code %s Attempt %s" % (agent_name,str(mesg.id),str(r.status_code),str(retry_counter)))
-                logging.info("JSON fields "+str(json_data)[:100])
-                if retry_counter<3:
-                    appraise(mesg,data, agent_name,retry_counter)
-                else:
-                    logging.error("3 consecutive Errors from appraise for agent %s and pk %s " % (agent_name,str(mesg.id)))
-                    raise Exception
-    except Exception as e:
-        logging.error("Problem with appraiser for agent %s and pk %s of type %s" % (agent_name,str(mesg.id),type(e).__name__))
-        logging.error("Adding default ordering_components for agent %s and pk %s " % (agent_name,str(mesg.id)))
-        results = get_safe(data,"message","results")
-        default_ordering_component = {
-            "novelty": 0,
-            "confidence": 0,
-            "clinical_evidence": 0
-        }
-        if results is not None:
-            for result in results:
-                if 'ordering_components' not in result.keys():
-                    result['ordering_components']=default_ordering_component
+    with tracer.start_as_current_span("get_appraisal") as span:
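+        # A dedicated child span isolates appraiser latency from the rest of
+        # post-processing in the trace timeline.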
+        try:
+            with requests.post(APPRAISER_URL,data=json_data,headers=headers, stream=True) as r:
+                logging.info("Appraiser being called at: "+APPRAISER_URL)
+                logging.info('the response for agent %s to appraiser code is: %s' % (agent_name, r.status_code))
+                if r.status_code==200:
+                    rj = r.json()
+                    #for now, just update the whole message, but we could be more precise/efficient
+                    logging.info("Updating message with appraiser data for agent %s and pk %s " % (agent_name, str(mesg.id)))
+                    data['message']['results']=rj['message']['results']
+                    logging.info("Updating message with appraiser data complete for "+str(mesg.id))
+                else:
+                    retry_counter +=1
+                    logging.info("Received Error state from appraiser for agent %s and pk %s Code %s Attempt %s" % (agent_name,str(mesg.id),str(r.status_code),str(retry_counter)))
+                    logging.info("JSON fields "+str(json_data)[:100])
+                    if retry_counter<3:
+                        appraise(mesg,data, agent_name,retry_counter)
+                    else:
+                        logging.error("3 consecutive Errors from appraise for agent %s and pk %s " % (agent_name,str(mesg.id)))
+                        raise Exception
+        except Exception as e:
+            logging.error("Problem with appraiser for agent %s and pk %s of type %s" % (agent_name,str(mesg.id),type(e).__name__))
+            logging.error("Adding default ordering_components for agent %s and pk %s " % (agent_name,str(mesg.id)))
+            span.set_attribute("error", True)
+            span.set_attribute("exception", str(e))
+            results = get_safe(data,"message","results")
+            default_ordering_component = {
+                "novelty": 0,
+                "confidence": 0,
+                "clinical_evidence": 0
+            }
+            if results is not None:
+                for result in results:
+                    if 'ordering_components' not in result.keys():
+                        result['ordering_components']=default_ordering_component
+                    else:
+                        continue
+            else:
+                logging.error('results returned from appraiser is None')
+            log_tuple =[
+                "Error in Appraiser "+ str(e),
+                datetime.now().strftime('%H:%M:%S'),
+                "ERROR"
+            ]
+            add_log_entry(data,log_tuple)
+            mesg.save_compressed_dict(data)
+            mesg.status='E'
+            mesg.code = 422
+            mesg.save(update_fields=['status','code'])
 
 def annotate_nodes(mesg,data,agent_name):
     #TODO pull this URL from SmartAPI
@@ -1025,31 +1028,38 @@ def annotate_nodes(mesg,data,agent_name):
         for key in invalid_nodes.keys():
             del nodes_message['message']['knowledge_graph']['nodes'][key]
 
     json_data = json.dumps(nodes_message)
-    try:
-        logging.info('posting data to the annotator URL %s' % ANNOTATOR_URL)
-        # with open(str(mesg.pk)+'_'+agent_name+"_KG_nodes_annotator.json", "w") as outfile:
-        #     outfile.write(json_data)
-        r = requests.post(ANNOTATOR_URL,data=json_data,headers=headers)
-        r.raise_for_status()
-        rj=r.json()
-        logging.info('the response status for agent %s node annotator is: %s' % (agent_name,r.status_code))
-        if r.status_code==200:
-            for key, value in rj.items():
-                if 'attributes' in value.keys() and value['attributes'] is not None:
-                    for attribute in value['attributes']:
-                        if attribute is not None:
-                            add_attribute(data['message']['knowledge_graph']['nodes'][key],attribute)
-
-            #Not sure about adding back clearly borked nodes, but it is in keeping with policy of non-destructiveness
-            if len(invalid_nodes)>0:
-                data['message']['knowledge_graph']['nodes'].update(invalid_nodes)
-        else:
-            post_processing_error(mesg,data,"Error in annotation of nodes")
-    except Exception as e:
-        logging.exception('node annotation internal error msg is for agent %s with pk: %s is %s' % (agent_name,str(mesg.pk),str(e)))
-        raise e
+    logging.info('posting data to the annotator URL %s' % ANNOTATOR_URL)
+    # with open(str(mesg.pk)+'_'+agent_name+"_KG_nodes_annotator.json", "w") as outfile:
+    #     outfile.write(json_data)
+    with tracer.start_as_current_span("annotator") as span:
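+        # Same pattern as the appraiser call: a child span scopes the annotator
+        # round-trip and records any failure on the trace.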
+        try:
+            r = requests.post(ANNOTATOR_URL,data=json_data,headers=headers)
+            r.raise_for_status()
+            rj=r.json()
+            logging.info('the response status for agent %s node annotator is: %s' % (agent_name,r.status_code))
+            if r.status_code==200:
+                for key, value in rj.items():
+                    if 'attributes' in value.keys() and value['attributes'] is not None:
+                        for attribute in value['attributes']:
+                            if attribute is not None:
+                                add_attribute(data['message']['knowledge_graph']['nodes'][key],attribute)
+
+                #Not sure about adding back clearly borked nodes, but it is in keeping with policy of non-destructiveness
+                if len(invalid_nodes)>0:
+                    data['message']['knowledge_graph']['nodes'].update(invalid_nodes)
+            else:
+                post_processing_error(mesg,data,"Error in annotation of nodes")
+        except Exception as e:
+            logging.info('node annotation internal error msg is for agent %s with pk: %s is %s' % (agent_name,str(mesg.pk),str(e)))
+            logging.exception("error in node annotation internal function")
+            span.set_attribute("error", True)
+            span.set_attribute("exception", str(e))
+            raise e
+        #else:
+        #    with open(str(mesg.actor)+".json", "w") as outfile:
+        #        outfile.write(json_data)
+        #    post_processing_error(mesg,data,"Error in annotation of nodes")
 
 def normalize_scores(data,key, agent_name):
     res=get_safe(data,"message","results")
@@ -1209,9 +1219,15 @@ def canonize(curies):
         "drug_chemical_conflate":True
     }
     logging.info('the normalizer_URL is %s' % NORMALIZER_URL)
-    r = requests.post(NORMALIZER_URL,json.dumps(j))
-    rj=r.json()
-    return rj
+    with tracer.start_as_current_span("get_normalized_node") as span:
+        try:
+            r = requests.post(NORMALIZER_URL,json.dumps(j))
+            rj=r.json()
+            return rj
+        except Exception as e:
+            span.set_attribute("error", True)
+            span.set_attribute("exception", str(e))
+            raise
 
 
 def canonizeResults(results):
diff --git a/tr_sys/tr_sys/celery.py b/tr_sys/tr_sys/celery.py
index 069f5b91..019bff4e 100644
--- a/tr_sys/tr_sys/celery.py
+++ b/tr_sys/tr_sys/celery.py
@@ -4,7 +4,12 @@
 
 from celery import Celery
 from celery.schedules import crontab
-
+from opentelemetry.instrumentation.celery import CeleryInstrumentor
+# from celery.signals import worker_process_init
+#
+# @worker_process_init.connect(weak=False)
+# def init_celery_tracing(*args, **kwargs):
+#     CeleryInstrumentor().instrument()
 # set the default Django settings module for the 'celery' program.
 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tr_sys.settings')
@@ -20,11 +25,11 @@
 # Load task modules from all registered Django app configs.
 app.autodiscover_tasks()
 
-
 @app.task(bind=True)
 def debug_task(self):
     print('Request: {0!r}'.format(self.request))
 
+CeleryInstrumentor().instrument()
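+# Instrumenting at import time assumes every worker process imports this
+# module; the commented-out worker_process_init hook above is the
+# signal-based alternative for prefork workers.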
 
 app.conf.beat_schedule = {
     #Execute the timeout function every 3 min
diff --git a/tr_sys/tr_sys/otel_config.py b/tr_sys/tr_sys/otel_config.py
new file mode 100644
index 00000000..f83974b0
--- /dev/null
+++ b/tr_sys/tr_sys/otel_config.py
@@ -0,0 +1,52 @@
+import os
+import logging
+from opentelemetry import trace
+from opentelemetry.sdk.resources import SERVICE_NAME as telemetry_service_name_key, Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
+from opentelemetry.instrumentation.django import DjangoInstrumentor
+from opentelemetry.exporter.jaeger.thrift import JaegerExporter
+from opentelemetry.instrumentation.celery import CeleryInstrumentor
+from opentelemetry.instrumentation.requests import RequestsInstrumentor
+from celery.signals import worker_process_init
+
+
+def configure_opentelemetry():
+
+    #jaeger_host = os.environ.get('JAEGER_HOST', 'jaeger-otel-agent')
+    #jaeger_port = int(os.environ.get('JAEGER_PORT', '6831'))
+    logging.info('About to instrument ARS app for OTEL')
+    try:
+        jaeger_host= 'jaeger-otel-agent.sri'
+        jaeger_port= 6831
+        service_name= 'ARS'
+        resource = Resource.create({telemetry_service_name_key: service_name})
+
+        trace.set_tracer_provider(TracerProvider(resource=resource))
+
+        tracer_provider = trace.get_tracer_provider()
+
+        # Configure Jaeger Exporter
+        jaeger_exporter = JaegerExporter(
+            agent_host_name=jaeger_host,
+            agent_port=jaeger_port,
+        )
+
+        span_processor = BatchSpanProcessor(jaeger_exporter)
+        tracer_provider.add_span_processor(span_processor)
+
+        # Optional: Console exporter for debugging
+        console_exporter = ConsoleSpanExporter()
+        tracer_provider.add_span_processor(BatchSpanProcessor(console_exporter))
+
+        DjangoInstrumentor().instrument()
+        RequestsInstrumentor().instrument()
+
+        @worker_process_init.connect(weak=False)
+        def init_celery_tracing(*args, **kwargs):
+            CeleryInstrumentor().instrument()
+
+
+        logging.info('Finished instrumenting ARS app for OTEL')
+    except Exception as e:
+        logging.error('OTEL instrumentation failed because: %s'%str(e))
\ No newline at end of file
diff --git a/tr_sys/tr_sys/settings.py b/tr_sys/tr_sys/settings.py
index 8e99e65a..1d800c9c 100644
--- a/tr_sys/tr_sys/settings.py
+++ b/tr_sys/tr_sys/settings.py
@@ -11,7 +11,9 @@
 """
 import os
+from .otel_config import configure_opentelemetry
+configure_opentelemetry()
 
 # Build paths inside the project like this: os.path.join(BASE_DIR, ...)
 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))