diff --git a/tr_sys/tr_ars/api.py b/tr_sys/tr_ars/api.py index 7a300d0c..a70e2875 100644 --- a/tr_sys/tr_ars/api.py +++ b/tr_sys/tr_ars/api.py @@ -90,7 +90,7 @@ def submit(req): span.set_attribute("http.method", req.method) span.set_attribute("http.url", req.build_absolute_uri()) span.set_attribute("user.id", req.user.id if req.user.is_authenticated else "anonymous") - + logger.debug(f"Submit span: {span}") if req.method != 'POST': return HttpResponse('Only POST is permitted!', status=405) try: diff --git a/tr_sys/tr_ars/tasks.py b/tr_sys/tr_ars/tasks.py index 81cbd221..ac019baf 100644 --- a/tr_sys/tr_ars/tasks.py +++ b/tr_sys/tr_ars/tasks.py @@ -30,10 +30,7 @@ def wrapper(*args, **kwargs): @shared_task(name="send-message-to-actor") -@propagate_context def send_message(actor_dict, mesg_dict, timeout=300): - span = trace.get_current_span() - logger.debug(f"CURRENT span before task execution: {span}") tracer = trace.get_tracer(__name__) infores=actor_dict['fields']['inforesid'] agent= infores.split(':')[1] @@ -74,12 +71,18 @@ def send_message(actor_dict, mesg_dict, timeout=300): span.set_attribute("pk", str(mesg.pk)) span.set_attribute("ref_pk", str(mesg.ref_id)) span.set_attribute("agent", agent) - headers={} - inject(headers) # Make HTTP request and trace it try: - logging.info('POSTing to agent %s pk:%s with header %s'% (agent,task_id, headers)) - r = requests.post(url, json=data, headers=headers, timeout=timeout) + logger.debug(f"CURRENT span before request post call: {span}") + #having to manually generate the traceparent_id since the automatic generation is disabled + span_context = span.get_span_context() + trace_id = span_context.trace_id + span_id = span_context.span_id + trace_flags = span_context.trace_flags + # Format the traceparent header + traceparent_header = (f"00-{trace_id:032x}-{span_id:016x}-{trace_flags:02x}") + logging.info('POSTing to agent %s pk:%s with traceparent: %s '% (agent,task_id, traceparent_header)) + r = requests.post(url, json=data, timeout=timeout) span.set_attribute("http.url", url) span.set_attribute("http.status_code", r.status_code) span.set_attribute("http.method", "POST") @@ -212,6 +215,7 @@ def send_message(actor_dict, mesg_dict, timeout=300): #parent = get_object_or_404(Message.objects.filter(pk=parent_pk)) #logging.info(f'parent merged_versions_list before going into merge&post-process for pk: %s are %s' % (parent_pk,parent.merged_versions_list)) #utils.merge_and_post_process(parent_pk,message_to_merge['message'],agent_name) + #utils.merge_and_post_process(parent_pk,message_to_merge['message'],agent_name) utils.merge_and_post_process.apply_async((parent_pk,message_to_merge['message'],agent_name)) logger.info("post async call for agent %s" % agent_name) else: diff --git a/tr_sys/tr_ars/utils.py b/tr_sys/tr_ars/utils.py index d6703b89..db5d95a8 100644 --- a/tr_sys/tr_ars/utils.py +++ b/tr_sys/tr_ars/utils.py @@ -45,7 +45,7 @@ } NORMALIZER_URL=os.getenv("TR_NORMALIZER") if os.getenv("TR_NORMALIZER") is not None else "https://nodenorm.ci.transltr.io/get_normalized_nodes" -ANNOTATOR_URL=os.getenv("TR_ANNOTATOR") if os.getenv("TR_ANNOTATOR") is not None else "https://biothings.ncats.io/annotator/" +ANNOTATOR_URL=os.getenv("TR_ANNOTATOR") if os.getenv("TR_ANNOTATOR") is not None else "https://annotator.ci.transltr.io/curie" APPRAISER_URL=os.getenv("TR_APPRAISE") if os.getenv("TR_APPRAISE") is not None else "https://answerappraiser.ci.transltr.io/get_appraisal" @@ -1031,39 +1031,39 @@ def annotate_nodes(mesg,data,agent_name): nodes = get_safe(data,"message","knowledge_graph","nodes") if nodes is not None: nodes_message = { - "message": - { - "knowledge_graph":{ - "nodes":nodes - } - } + "ids": [node_key for node_key in nodes.keys() if nodes is not None] } #we have to scrub input for invalid CURIEs or we'll get a 500 back from the annotator curie_pattern = re.compile("[\w\.]+:[\w\.]+") invalid_nodes={} - for key in nodes_message['message']['knowledge_graph']['nodes'].keys(): + for key in nodes_message['ids']: if not curie_pattern.match(str(key)): - invalid_nodes[key]=nodes_message['message']['knowledge_graph']['nodes'][key] + invalid_nodes[key]=nodes[key] if len(invalid_nodes)!=0: for key in invalid_nodes.keys(): - del nodes_message['message']['knowledge_graph']['nodes'][key] + nodes_message['ids'].remove(key) json_data = json.dumps(nodes_message) logging.info('posting data to the annotator URL %s' % ANNOTATOR_URL) with tracer.start_as_current_span("annotator") as span: try: - r = requests.post(ANNOTATOR_URL,data=json_data,headers=headers) + r = requests.post(ANNOTATOR_URL,json=nodes_message,headers=headers) r.raise_for_status() rj=r.json() logging.info('the response status for agent %s node annotator is: %s' % (agent_name,r.status_code)) if r.status_code==200: for key, value in rj.items(): - if 'attributes' in value.keys() and value['attributes'] is not None: - for attribute in value['attributes']: - if attribute is not None: - add_attribute(data['message']['knowledge_graph']['nodes'][key],attribute) - + if isinstance(value, list) and 'notfound' in value[0].keys() and value[0]['notfound'] == True: + pass + elif isinstance(value, dict) and value == {}: + pass + else: + attribute={ + "attribute_type_id": "biothings_annotations", + "value": value + } + add_attribute(data['message']['knowledge_graph']['nodes'][key],attribute) #Not sure about adding back clearly borked nodes, but it is in keeping with policy of non-destructiveness if len(invalid_nodes)>0: data['message']['knowledge_graph']['nodes'].update(invalid_nodes) @@ -1072,6 +1072,7 @@ def annotate_nodes(mesg,data,agent_name): except Exception as e: logging.info('node annotation internal error msg is for agent %s with pk: %s is %s' % (agent_name,str(mesg.pk),str(e))) logging.exception("error in node annotation internal function") + logging.info(f'response error %s'%(r.text)) span.set_attribute("error", True) span.set_attribute("exception", str(e)) raise e diff --git a/tr_sys/tr_sys/wsgi.py b/tr_sys/tr_sys/wsgi.py index af0aeb0b..d5879fa4 100644 --- a/tr_sys/tr_sys/wsgi.py +++ b/tr_sys/tr_sys/wsgi.py @@ -8,9 +8,10 @@ """ import os - +from opentelemetry.instrumentation.wsgi import OpenTelemetryMiddleware from django.core.wsgi import get_wsgi_application os.environ.setdefault("DJANGO_SETTINGS_MODULE", "tr_sys.settings") application = get_wsgi_application() +application = OpenTelemetryMiddleware(application) \ No newline at end of file