Skip to content

Commit

Permalink
Merge branch 'master' into annotator_fail_code
Browse files Browse the repository at this point in the history
  • Loading branch information
ShervinAbd92 authored Jul 15, 2024
2 parents cc83044 + e7b37d4 commit 35cb272
Show file tree
Hide file tree
Showing 11 changed files with 482 additions and 346 deletions.
3 changes: 3 additions & 0 deletions config/tr_sys_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"""

import os
from tr_sys.tr_sys.otel_config import configure_opentelemetry

configure_opentelemetry()

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
Expand Down
2 changes: 2 additions & 0 deletions deploy/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ spec:
value: {{ .Values.arsserver.env.TR_NORMALIZER }}
- name: TR_ANNOTATOR
value: {{ .Values.arsserver.env.TR_ANNOTATOR }}

{{- with .Values.arsserver.resources }}
resources:
{{- toYaml . | nindent 12 }}
Expand All @@ -59,6 +60,7 @@ spec:
value: {{ .Values.celeryworker.env.TR_NORMALIZER }}
- name: TR_ANNOTATOR
value: {{ .Values.celeryworker.env.TR_ANNOTATOR }}

{{- with .Values.celeryworker.resources }}
resources:
{{- toYaml . | nindent 12 }}
Expand Down
8 changes: 8 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,11 @@ statistics
sympy
objsize
reasoner-pydantic
opentelemetry-api
opentelemetry-sdk
opentelemetry-instrumentation
opentelemetry-instrumentation-django
opentelemetry-exporter-jaeger
opentelemetry-instrumentation-requests
opentelemetry-instrumentation-celery

287 changes: 151 additions & 136 deletions tr_sys/tr_ars/api.py

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions tr_sys/tr_ars/pubsub.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from django.core import serializers
import sys, logging, json, threading, queue, requests
from .models import Message
from tr_ars.tasks import send_message
from .tasks import send_message
from django.utils import timezone
from django.conf import settings

from opentelemetry import trace

logger = logging.getLogger(__name__)

Expand All @@ -22,6 +22,8 @@ def send_messages(actors, messages):
logger.debug("Skipping actor %s/%s; it's inactive..." % (
actor.agent, actor.url()))
elif settings.USE_CELERY:
span = trace.get_current_span()
logger.debug(f"CURRENT span before Celery task submission: {span}")
result = send_message.delay(actor.to_dict(), mesg.to_dict())
#logger.debug('>>>> task future: %s' % result)
result.forget()
Expand Down
284 changes: 158 additions & 126 deletions tr_sys/tr_ars/tasks.py

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion tr_sys/tr_ars/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
re_path(r'^filters/?$', api.filters, name='ars-filters'),
path('filter/<uuid:key>', api.filter, name='ars-filter'),
path('reports/<inforesid>',api.get_report,name='ars-report'),
re_path(r'^status/?$', api.status, name='ars-status'),
re_path(r'^timeoutTest/?$', api.timeoutTest, name='ars-timeout'),
path('merge/<uuid:key>', api.merge, name='ars-merge'),
path('retain/<uuid:key>', api.retain, name='ars-retain'),
Expand Down
174 changes: 95 additions & 79 deletions tr_sys/tr_ars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
Response as vResponse
)
from pydantic import ValidationError

from opentelemetry import trace
tracer = trace.get_tracer(__name__)

ARS_ACTOR = {
'channel': [],
Expand All @@ -43,7 +44,7 @@
'inforesid': 'ARS'
}

NORMALIZER_URL=os.getenv("TR_NORMALIZER") if os.getenv("TR_NORMALIZER") is not None else "https://nodenormalization-sri.renci.org/1.4/get_normalized_nodes"
NORMALIZER_URL=os.getenv("TR_NORMALIZER") if os.getenv("TR_NORMALIZER") is not None else "https://nodenorm.ci.transltr.io/get_normalized_nodes"
ANNOTATOR_URL=os.getenv("TR_ANNOTATOR") if os.getenv("TR_ANNOTATOR") is not None else "https://biothings.ncats.io/annotator/"
APPRAISER_URL=os.getenv("TR_APPRAISE") if os.getenv("TR_APPRAISE") is not None else "http://localhost:9096/get_appraisal"

Expand Down Expand Up @@ -707,16 +708,15 @@ def merge_and_post_process(parent_pk,message_to_merge, agent_name, counter=0):
merged.status='E'
merged.code = 422
merged.save()

else:
#If there is currently a merge happening, we wait until it finishes to do our merge
if counter < 5:
logging.debug("Merged_version locked for %s. Attempt %s:" % (agent_name, str(counter)))
logging.info("Merged_version locked for %s. Attempt %s:" % (agent_name, str(counter)))
sleeptime.sleep(5)
counter = counter + 1
merge_and_post_process(parent_pk,message_to_merge, agent_name, counter)
else:
logging.debug("Merging failed for %s %s" % (agent_name, str(parent_pk)))
logging.info("Merging failed for %s %s" % (agent_name, str(parent_pk)))


if merged is not None:
logging.info('merged data for agent %s with pk %s is returned & ready to be preprocessed' % (agent_name, str(merged.id)))
Expand All @@ -726,6 +726,7 @@ def merge_and_post_process(parent_pk,message_to_merge, agent_name, counter=0):
merged.code = code
merged.save()


def remove_blocked(mesg, data, blocklist=None):
try:
#logging.info("Getting the length of the dictionary in {} bytes".format(get_deep_size(data)))
Expand Down Expand Up @@ -953,53 +954,55 @@ def appraise(mesg,data, agent_name,retry_counter=0):
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
json_data = json.dumps(data)
logging.info('sending data for agent: %s to APPRAISER URL: %s' % (agent_name, APPRAISER_URL))
try:
with requests.post(APPRAISER_URL,data=json_data,headers=headers, stream=True) as r:
logging.info("Appraiser being called at: "+APPRAISER_URL)
logging.info('the response for agent %s to appraiser code is: %s' % (agent_name, r.status_code))
if r.status_code==200:
rj = r.json()
#for now, just update the whole message, but we could be more precise/efficient
logging.info("Updating message with appraiser data for agent %s and pk %s " % (agent_name, str(mesg.id)))
data['message']['results']=rj['message']['results']
logging.info("Updating message with appraiser data complete for "+str(mesg.id))
else:
retry_counter +=1
logging.info("Received Error state from appraiser for agent %s and pk %s Code %s Attempt %s" % (agent_name,str(mesg.id),str(r.status_code),str(retry_counter)))
logging.info("JSON fields "+str(json_data)[:100])
if retry_counter<3:
appraise(mesg,data, agent_name,retry_counter)
else:
logging.error("3 consecutive Errors from appraise for agent %s and pk %s " % (agent_name,str(mesg.id)))
raise Exception
except Exception as e:
logging.error("Problem with appraiser for agent %s and pk %s of type %s" % (agent_name,str(mesg.id),type(e).__name__))
logging.error("Adding default ordering_components for agent %s and pk %s " % (agent_name,str(mesg.id)))
results = get_safe(data,"message","results")
default_ordering_component = {
"novelty": 0,
"confidence": 0,
"clinical_evidence": 0
}
if results is not None:
for result in results:
if 'ordering_components' not in result.keys():
result['ordering_components']=default_ordering_component
with tracer.start_as_current_span("get_appraisal") as span:
try:
with requests.post(APPRAISER_URL,data=json_data,headers=headers, stream=True) as r:
logging.info("Appraiser being called at: "+APPRAISER_URL)
logging.info('the response for agent %s to appraiser code is: %s' % (agent_name, r.status_code))
if r.status_code==200:
rj = r.json()
#for now, just update the whole message, but we could be more precise/efficient
logging.info("Updating message with appraiser data for agent %s and pk %s " % (agent_name, str(mesg.id)))
data['message']['results']=rj['message']['results']
logging.info("Updating message with appraiser data complete for "+str(mesg.id))
else:
continue
else:
logging.error('results returned from appraiser is None')
log_tuple =[
"Error in Appraiser "+ str(e),
datetime.now().strftime('%H:%M:%S'),
"ERROR"
]
add_log_entry(data,log_tuple)
mesg.save_compressed_dict(data)
mesg.status='E'
mesg.code = 422
mesg.save(update_fields=['status','code'])

retry_counter +=1
logging.info("Received Error state from appraiser for agent %s and pk %s Code %s Attempt %s" % (agent_name,str(mesg.id),str(r.status_code),str(retry_counter)))
logging.info("JSON fields "+str(json_data)[:100])
if retry_counter<3:
appraise(mesg,data, agent_name,retry_counter)
else:
logging.error("3 consecutive Errors from appraise for agent %s and pk %s " % (agent_name,str(mesg.id)))
raise Exception
except Exception as e:
logging.error("Problem with appraiser for agent %s and pk %s of type %s" % (agent_name,str(mesg.id),type(e).__name__))
logging.error("Adding default ordering_components for agent %s and pk %s " % (agent_name,str(mesg.id)))
span.set_attribute("error", True)
span.set_attribute("exception", str(e))
results = get_safe(data,"message","results")
default_ordering_component = {
"novelty": 0,
"confidence": 0,
"clinical_evidence": 0
}
if results is not None:
for result in results:
if 'ordering_components' not in result.keys():
result['ordering_components']=default_ordering_component
else:
continue
else:
logging.error('results returned from appraiser is None')
log_tuple =[
"Error in Appraiser "+ str(e),
datetime.now().strftime('%H:%M:%S'),
"ERROR"
]
add_log_entry(data,log_tuple)
mesg.save_compressed_dict(data)
mesg.status='E'
mesg.code = 422
mesg.save(update_fields=['status','code'])

def annotate_nodes(mesg,data,agent_name):
#TODO pull this URL from SmartAPI
Expand All @@ -1025,31 +1028,38 @@ def annotate_nodes(mesg,data,agent_name):
for key in invalid_nodes.keys():
del nodes_message['message']['knowledge_graph']['nodes'][key]


json_data = json.dumps(nodes_message)
try:
logging.info('posting data to the annotator URL %s' % ANNOTATOR_URL)
# with open(str(mesg.pk)+'_'+agent_name+"_KG_nodes_annotator.json", "w") as outfile:
# outfile.write(json_data)
r = requests.post(ANNOTATOR_URL,data=json_data,headers=headers)
r.raise_for_status()
rj=r.json()
logging.info('the response status for agent %s node annotator is: %s' % (agent_name,r.status_code))
if r.status_code==200:
for key, value in rj.items():
if 'attributes' in value.keys() and value['attributes'] is not None:
for attribute in value['attributes']:
if attribute is not None:
add_attribute(data['message']['knowledge_graph']['nodes'][key],attribute)

#Not sure about adding back clearly borked nodes, but it is in keeping with policy of non-destructiveness
if len(invalid_nodes)>0:
data['message']['knowledge_graph']['nodes'].update(invalid_nodes)
else:
post_processing_error(mesg,data,"Error in annotation of nodes")
except Exception as e:
logging.exception('node annotation internal error msg is for agent %s with pk: %s is %s' % (agent_name,str(mesg.pk),str(e)))
raise e
logging.info('posting data to the annotator URL %s' % ANNOTATOR_URL)
# with open(str(mesg.pk)+'_'+agent_name+"_KG_nodes_annotator.json", "w") as outfile:
# outfile.write(json_data)
with tracer.start_as_current_span("annotator") as span:
try:
r = requests.post(ANNOTATOR_URL,data=json_data,headers=headers)
r.raise_for_status()
rj=r.json()
logging.info('the response status for agent %s node annotator is: %s' % (agent_name,r.status_code))
if r.status_code==200:
for key, value in rj.items():
if 'attributes' in value.keys() and value['attributes'] is not None:
for attribute in value['attributes']:
if attribute is not None:
add_attribute(data['message']['knowledge_graph']['nodes'][key],attribute)

#Not sure about adding back clearly borked nodes, but it is in keeping with policy of non-destructiveness
if len(invalid_nodes)>0:
data['message']['knowledge_graph']['nodes'].update(invalid_nodes)
else:
post_processing_error(mesg,data,"Error in annotation of nodes")
except Exception as e:
logging.info('node annotation internal error msg is for agent %s with pk: %s is %s' % (agent_name,str(mesg.pk),str(e)))
logging.exception("error in node annotation internal function")
span.set_attribute("error", True)
span.set_attribute("exception", str(e))
raise e
#else:
# with open(str(mesg.actor)+".json", "w") as outfile:
# outfile.write(json_data)
# post_processing_error(mesg,data,"Error in annotation of nodes")

def normalize_scores(data,key, agent_name):
res=get_safe(data,"message","results")
Expand Down Expand Up @@ -1209,9 +1219,15 @@ def canonize(curies):
"drug_chemical_conflate":True
}
logging.info('the normalizer_URL is %s' % NORMALIZER_URL)
r = requests.post(NORMALIZER_URL,json.dumps(j))
rj=r.json()
return rj
with tracer.start_as_current_span("get_normalized_node") as span:
try:
r = requests.post(NORMALIZER_URL,json.dumps(j))
rj=r.json()
return rj
except Exception as e:
span.set_attribute("error", True)
span.set_attribute("exception", str(e))
raise


def canonizeResults(results):
Expand Down
9 changes: 7 additions & 2 deletions tr_sys/tr_sys/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

from celery import Celery
from celery.schedules import crontab

from opentelemetry.instrumentation.celery import CeleryInstrumentor
# from celery.signals import worker_process_init
#
# @worker_process_init.connect(weak=False)
# def init_celery_tracing(*args, **kwargs):
# CeleryInstrumentor().instrument()

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tr_sys.settings')
Expand All @@ -20,11 +25,11 @@
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

CeleryInstrumentor().instrument()

app.conf.beat_schedule = {
    #Execute the timeout function every 3 min
Expand Down
52 changes: 52 additions & 0 deletions tr_sys/tr_sys/otel_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import os
import logging
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME as telemetery_service_name_key, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.django import DjangoInstrumentor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from celery.signals import worker_process_init


def configure_opentelemetry():
    """Configure OpenTelemetry tracing for the ARS application.

    Installs a global ``TracerProvider`` that exports spans to a Jaeger
    agent (plus a console exporter for debugging), then instruments
    Django and ``requests`` immediately and Celery per worker process.

    The Jaeger agent location can be overridden with the ``JAEGER_HOST``
    and ``JAEGER_PORT`` environment variables; the defaults preserve the
    previously hard-coded values, so existing deployments are unaffected.

    Any failure is logged and swallowed so that a broken tracing setup
    never prevents the application itself from starting.
    """
    logging.info('About to instrument ARS app for OTEL')
    try:
        # Jaeger agent location: overridable via environment, with the
        # previous hard-coded values as backward-compatible defaults.
        jaeger_host = os.environ.get('JAEGER_HOST', 'jaeger-otel-agent.sri')
        jaeger_port = int(os.environ.get('JAEGER_PORT', '6831'))
        service_name = 'ARS'
        resource = Resource.create({telemetery_service_name_key: service_name})

        trace.set_tracer_provider(TracerProvider(resource=resource))
        tracer_provider = trace.get_tracer_provider()

        # Export spans to the Jaeger agent in batches.
        jaeger_exporter = JaegerExporter(
            agent_host_name=jaeger_host,
            agent_port=jaeger_port,
        )
        tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))

        # Console exporter so spans are also visible in logs for debugging.
        tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))

        DjangoInstrumentor().instrument()
        RequestsInstrumentor().instrument()

        # Celery must be instrumented inside each worker process after it
        # forks, hence the worker_process_init signal rather than calling
        # CeleryInstrumentor().instrument() directly here.
        @worker_process_init.connect(weak=False)
        def init_celery_tracing(*args, **kwargs):
            CeleryInstrumentor().instrument()

        logging.info('Finished instrumenting ARS app for OTEL')
    except Exception as e:
        # Tracing is best-effort: never let instrumentation failures
        # take down the application.
        logging.error('OTEL instrumentation failed because: %s' % str(e))
2 changes: 2 additions & 0 deletions tr_sys/tr_sys/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
"""

import os
from .otel_config import configure_opentelemetry

configure_opentelemetry()
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

Expand Down

0 comments on commit 35cb272

Please sign in to comment.