Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wsgi instrumentation to pass along the correct traceid #672

Merged
merged 2 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tr_sys/tr_ars/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def submit(req):
span.set_attribute("http.method", req.method)
span.set_attribute("http.url", req.build_absolute_uri())
span.set_attribute("user.id", req.user.id if req.user.is_authenticated else "anonymous")

logger.debug(f"Submit span: {span}")
if req.method != 'POST':
return HttpResponse('Only POST is permitted!', status=405)
try:
Expand Down
18 changes: 11 additions & 7 deletions tr_sys/tr_ars/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ def wrapper(*args, **kwargs):


@shared_task(name="send-message-to-actor")
@propagate_context
def send_message(actor_dict, mesg_dict, timeout=300):
span = trace.get_current_span()
logger.debug(f"CURRENT span before task execution: {span}")
tracer = trace.get_tracer(__name__)
infores=actor_dict['fields']['inforesid']
agent= infores.split(':')[1]
Expand Down Expand Up @@ -74,12 +71,18 @@ def send_message(actor_dict, mesg_dict, timeout=300):
span.set_attribute("pk", str(mesg.pk))
span.set_attribute("ref_pk", str(mesg.ref_id))
span.set_attribute("agent", agent)
headers={}
inject(headers)
# Make HTTP request and trace it
try:
logging.info('POSTing to agent %s pk:%s with header %s'% (agent,task_id, headers))
r = requests.post(url, json=data, headers=headers, timeout=timeout)
logger.debug(f"CURRENT span before request post call: {span}")
#having to manually generate the traceparent_id since the automatic generation is disabled
span_context = span.get_span_context()
trace_id = span_context.trace_id
span_id = span_context.span_id
trace_flags = span_context.trace_flags
# Format the traceparent header
traceparent_header = (f"00-{trace_id:032x}-{span_id:016x}-{trace_flags:02x}")
logging.info('POSTing to agent %s pk:%s with traceparent: %s '% (agent,task_id, traceparent_header))
r = requests.post(url, json=data, timeout=timeout)
span.set_attribute("http.url", url)
span.set_attribute("http.status_code", r.status_code)
span.set_attribute("http.method", "POST")
Expand Down Expand Up @@ -212,6 +215,7 @@ def send_message(actor_dict, mesg_dict, timeout=300):
#parent = get_object_or_404(Message.objects.filter(pk=parent_pk))
#logging.info(f'parent merged_versions_list before going into merge&post-process for pk: %s are %s' % (parent_pk,parent.merged_versions_list))
#utils.merge_and_post_process(parent_pk,message_to_merge['message'],agent_name)
#utils.merge_and_post_process(parent_pk,message_to_merge['message'],agent_name)
utils.merge_and_post_process.apply_async((parent_pk,message_to_merge['message'],agent_name))
logger.info("post async call for agent %s" % agent_name)
else:
Expand Down
33 changes: 17 additions & 16 deletions tr_sys/tr_ars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
}

NORMALIZER_URL=os.getenv("TR_NORMALIZER") if os.getenv("TR_NORMALIZER") is not None else "https://nodenorm.ci.transltr.io/get_normalized_nodes"
ANNOTATOR_URL=os.getenv("TR_ANNOTATOR") if os.getenv("TR_ANNOTATOR") is not None else "https://biothings.ncats.io/annotator/"
ANNOTATOR_URL=os.getenv("TR_ANNOTATOR") if os.getenv("TR_ANNOTATOR") is not None else "https://annotator.ci.transltr.io/curie"
APPRAISER_URL=os.getenv("TR_APPRAISE") if os.getenv("TR_APPRAISE") is not None else "https://answerappraiser.ci.transltr.io/get_appraisal"


Expand Down Expand Up @@ -1031,39 +1031,39 @@ def annotate_nodes(mesg,data,agent_name):
nodes = get_safe(data,"message","knowledge_graph","nodes")
if nodes is not None:
nodes_message = {
"message":
{
"knowledge_graph":{
"nodes":nodes
}
}
"ids": [node_key for node_key in nodes.keys() if nodes is not None]
}
#we have to scrub input for invalid CURIEs or we'll get a 500 back from the annotator
curie_pattern = re.compile("[\w\.]+:[\w\.]+")
invalid_nodes={}

for key in nodes_message['message']['knowledge_graph']['nodes'].keys():
for key in nodes_message['ids']:
if not curie_pattern.match(str(key)):
invalid_nodes[key]=nodes_message['message']['knowledge_graph']['nodes'][key]
invalid_nodes[key]=nodes[key]
if len(invalid_nodes)!=0:
for key in invalid_nodes.keys():
del nodes_message['message']['knowledge_graph']['nodes'][key]
nodes_message['ids'].remove(key)

json_data = json.dumps(nodes_message)
logging.info('posting data to the annotator URL %s' % ANNOTATOR_URL)
with tracer.start_as_current_span("annotator") as span:
try:
r = requests.post(ANNOTATOR_URL,data=json_data,headers=headers)
r = requests.post(ANNOTATOR_URL,json=nodes_message,headers=headers)
r.raise_for_status()
rj=r.json()
logging.info('the response status for agent %s node annotator is: %s' % (agent_name,r.status_code))
if r.status_code==200:
for key, value in rj.items():
if 'attributes' in value.keys() and value['attributes'] is not None:
for attribute in value['attributes']:
if attribute is not None:
add_attribute(data['message']['knowledge_graph']['nodes'][key],attribute)

if isinstance(value, list) and 'notfound' in value[0].keys() and value[0]['notfound'] == True:
pass
elif isinstance(value, dict) and value == {}:
pass
else:
attribute={
"attribute_type_id": "biothings_annotations",
"value": value
}
add_attribute(data['message']['knowledge_graph']['nodes'][key],attribute)
#Not sure about adding back clearly borked nodes, but it is in keeping with policy of non-destructiveness
if len(invalid_nodes)>0:
data['message']['knowledge_graph']['nodes'].update(invalid_nodes)
Expand All @@ -1072,6 +1072,7 @@ def annotate_nodes(mesg,data,agent_name):
except Exception as e:
logging.info('node annotation internal error msg is for agent %s with pk: %s is %s' % (agent_name,str(mesg.pk),str(e)))
logging.exception("error in node annotation internal function")
logging.info(f'response error %s'%(r.text))
span.set_attribute("error", True)
span.set_attribute("exception", str(e))
raise e
Expand Down
3 changes: 2 additions & 1 deletion tr_sys/tr_sys/wsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
"""

import os

from opentelemetry.instrumentation.wsgi import OpenTelemetryMiddleware
from django.core.wsgi import get_wsgi_application

# Select the project's settings module unless DJANGO_SETTINGS_MODULE is
# already set in the environment (setdefault never overrides an existing value).
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "tr_sys.settings")

# Build the Django WSGI callable first, then rebind `application` to that
# callable wrapped in OpenTelemetry's WSGI middleware. The order matters:
# the middleware must receive the already-constructed Django application.
# NOTE(review): presumably the wrapper is what lets incoming trace context
# reach the request handlers -- confirm against the tracing configuration.
application = get_wsgi_application()
application = OpenTelemetryMiddleware(application)