Skip to content

Commit

Permalink
Merge pull request #642 from NCATSTranslator/annotator_fail_code
Browse files Browse the repository at this point in the history
modified error handling for node annotator
  • Loading branch information
ShervinAbd92 authored Jul 15, 2024
2 parents e7b37d4 + 9429b06 commit 9e17f00
Showing 1 changed file with 135 additions and 97 deletions.
232 changes: 135 additions & 97 deletions tr_sys/tr_ars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,93 +552,126 @@ def pre_merge_process(data,key, agent_name,inforesid):
logging.exception("Error in ARS score normalization")
raise e

def post_process(mesg,key, agent_name):

def post_process(data,key, agent_name):
code =200
mesg = get_object_or_404(Message.objects.filter(pk=key))
data = mesg.decompress_dict()
logging.info("Pre node annotation for agent %s pk: %s" % (agent_name, str(key)))
try:
annotate_nodes(mesg,data,agent_name)
logging.info("node annotation successful for agent %s and pk: %s" % (agent_name, str(key)))
except Exception as e:
post_processing_error(mesg,data,"Error in annotation of nodes")
logging.error("Error with node annotations for "+str(key))
logging.exception("problem with node annotation post process function")
raise e
status='E'
code=444
log_tuple =[
f'node annotation internal error: {str(e)}',
datetime.now().strftime('%H:%M:%S'),
"DEBUG"
]
add_log_entry(data,log_tuple)
logging.exception(f"problem with node annotation for agent: {agent_name} pk: {str(key)}")
mesg.status=status
mesg.code=code
mesg.save()

logging.info("pre scrub null for agent %s and pk %s" % (agent_name, str(key)))
try:
scrub_null_attributes(data)
except Exception as e:
logging.info("Problem with the second scrubbing of null attributes")
status='E'
code=444
logging.exception(f"Problem with the second scrubbing of null attributes for agent: {agent_name} pk: {str(key)}")
post_processing_error(mesg,data,"Error in second scrubbing of null attributes")
log_tuple =[
"Error in second scrubbing of null attributes",
datetime.now().strftime('%H:%M:%S'),
"DEBUG"
]
add_log_entry(data,log_tuple)
mesg.status=status
mesg.code=code
mesg.save()
logging.info("pre blocklist for "+str(key))
try:
remove_blocked(mesg, data)
except Exception as e:
status='E'
code=444
logging.info(e.__cause__)
logging.info("Problem with block list removal")
logging.exception(f"Problem with block list removal for agent: {agent_name} pk: {str(key)}")
mesg.status=status
mesg.code=code
mesg.save()

logging.info("pre appraiser for agent %s and pk %s" % (agent_name, str(key)))
try:
appraise(mesg,data,agent_name)
logging.info("appraiser successful for agent %s and pk %s" % (agent_name, str(key)))
except Exception as e:
code = 422
results = get_safe(data,"message","results")
default_ordering_component = {
"novelty": 0,
"confidence": 0,
"clinical_evidence": 0
}
if results is not None:
for result in results:
if 'ordering_components' not in result.keys():
result['ordering_components']=default_ordering_component
else:
continue
else:
logging.error('results returned from appraiser is None')
logging.ERROR("appraiser failed mesg for agent %s is %s: %s"% (agent_name, mesg.code, mesg.status))

#post_processing_error(mesg,data,"Error in appraiser")
logging.error("Error with appraise for "+str(key))
logging.exception("Error in appraiser post process function")
#raise e
try:
results = get_safe(data,"message","results")
if results is not None:
logging.info("+++ pre-scoring for agent: %s & pk: %s" % (agent_name, key))
new_res=scoring.compute_from_results(results)
data['message']['results']=new_res
logging.info("scoring succeeded for agent %s and pk %s" % (agent_name, key))
else:
logging.error('results from appraiser returns None, cant do the scoring')
print()
except Exception as e:
post_processing_error(mesg,data,"Error in f-score calculation")
logging.exception("Error in f-score calculation")
raise e
if mesg.code == 422:
return mesg, mesg.code, mesg.status
else:
try:
results = get_safe(data,"message","results")
if results is not None:
logging.info("+++ pre-scoring for agent: %s & pk: %s" % (agent_name, key))
new_res=scoring.compute_from_results(results)
data['message']['results']=new_res
logging.info("scoring succeeded for agent %s and pk %s" % (agent_name, key))
else:
logging.error('results from appraiser returns None, cant do the scoring')
print()
except Exception as e:
status='E'
code = 422
mesg.save(update_fields=['status','code'])
log_tuple =[
"Error in f-score calculation: "+ str(e),
datetime.now().strftime('%H:%M:%S'),
"ERROR"
]
add_log_entry(data,log_tuple)
logging.exception("Error in f-score calculation")
mesg.save_compressed_dict(data)
return mesg, code, status

try:
mesg.result_count = len(new_res)
mesg.result_stat = ScoreStatCalc(new_res)
logging.info("scoring stat calculation succeeded for agent %s and pk %s" % (agent_name, key))
except Exception as e:
logging.exception("Error in ScoreStatCalculation or result count")
post_processing_error(mesg,data,"Error in score stat calculation")
log_tuple =[
"Error in score stat calculation",
datetime.now().strftime('%H:%M:%S'),
"DEBUG"
]
add_log_entry(data,log_tuple)
status ='E'
code=400
mesg.save_compressed_dict(data)
return mesg, code, status

try:
mesg.save_compressed_dict(data)
logging.info("Time before save")
logging.info('the mesg before save code: %s and status: %s'%(mesg.code, mesg.status))
with transaction.atomic():
if mesg.code == 202:
code = 200
status='D'
mesg.code=code
mesg.status=status
mesg.save()
logging.info("Time after save")

except DatabaseError as e:
status ='E'
code=422
logging.error("Final save failed")
return mesg, code, status

try:
mesg.result_count = len(new_res)
mesg.result_stat = ScoreStatCalc(new_res)
except Exception as e:
logging.exception("Error in ScoreStatCalculation or result count")
raise e
mesg.status ='E'
mesg.code=400
mesg.save(update_fields=['status','code'])
try:
mesg.status='D'
mesg.code=200
mesg.save_compressed_dict(data)
logging.info("Time before save")
with transaction.atomic():
mesg.save()
logging.info("Time after save")
except DatabaseError as e:
mesg.status ='E'
mesg.code=422
logging.error("Final save failed")
mesg.save(update_fields=['status','code'])

def lock_merge(message):
pass
if message.merge_semaphore is True:
Expand All @@ -650,7 +683,7 @@ def lock_merge(message):

@shared_task(name="merge_and_post_process")
def merge_and_post_process(parent_pk,message_to_merge, agent_name, counter=0):

merged=None
logging.info(f"Starting merge for %s with parent PK: %s"% (agent_name,parent_pk))
logging.info(f"Before atomic transaction for %s with parent PK: %s"% (agent_name,parent_pk))
with transaction.atomic():
Expand Down Expand Up @@ -684,23 +717,13 @@ def merge_and_post_process(parent_pk,message_to_merge, agent_name, counter=0):
else:
logging.info("Merging failed for %s %s" % (agent_name, str(parent_pk)))

if merged:
if merged is not None:
try:
logging.info('merged data for agent %s with pk %s is returned & ready to be preprocessed' % (agent_name, str(merged.id)))
merged_data = merged.decompress_dict()
post_process(merged_data,merged.id, agent_name)
logging.info('post processing complete for agent %s with pk %s is returned & ready to be preprocessed' % (agent_name, str(merged.id)))

except Exception as e:
logging.info("Problem with post processing for agent %s pk: %s " % (agent_name, (parent_pk)))
logging.info(e, exc_info=True)
logging.info('error message %s' % str(e))
merged.status='E'
merged.code = 422
merged.save()
else:
pass
if merged is not None:
logging.info('merged data for agent %s with pk %s is returned & ready to be preprocessed' % (agent_name, str(merged.id)))
merged, code, status = post_process(merged,merged.id, agent_name)
logging.info('post processing complete for agent %s with pk %s is returned & ready to be preprocessed' % (agent_name, str(merged.id)))
merged.status = status
merged.code = code
merged.save()

def remove_blocked(mesg, data, blocklist=None):
try:
Expand Down Expand Up @@ -949,15 +972,37 @@ def appraise(mesg,data, agent_name,retry_counter=0):
else:
logging.error("3 consecutive Errors from appraise for agent %s and pk %s " % (agent_name,str(mesg.id)))
raise Exception

except Exception as e:
logging.error("Problem with appraiser for agent %s and pk %s " % (agent_name,str(mesg.id)))
logging.error(type(e).__name__)
logging.error(e.args)
logging.error("Problem with appraiser for agent %s and pk %s of type %s" % (agent_name,str(mesg.id),type(e).__name__))
logging.error("Adding default ordering_components for agent %s and pk %s " % (agent_name,str(mesg.id)))
span.set_attribute("error", True)
span.set_attribute("exception", str(e))
raise e

results = get_safe(data,"message","results")
default_ordering_component = {
"novelty": 0,
"confidence": 0,
"clinical_evidence": 0
}
if results is not None:
for result in results:
if 'ordering_components' not in result.keys():
result['ordering_components']=default_ordering_component
else:
continue
else:
logging.error('results returned from appraiser is None')
log_tuple =[
"Error in Appraiser "+ str(e),
datetime.now().strftime('%H:%M:%S'),
"ERROR"
]
add_log_entry(data,log_tuple)
mesg.save_compressed_dict(data)
mesg.status='E'
mesg.code = 422
mesg.save(update_fields=['status','code'])


def annotate_nodes(mesg,data,agent_name):
#TODO pull this URL from SmartAPI
Expand Down Expand Up @@ -985,8 +1030,6 @@ def annotate_nodes(mesg,data,agent_name):

json_data = json.dumps(nodes_message)
logging.info('posting data to the annotator URL %s' % ANNOTATOR_URL)
# with open(str(mesg.pk)+'_'+agent_name+"_KG_nodes_annotator.json", "w") as outfile:
# outfile.write(json_data)
with tracer.start_as_current_span("annotator") as span:
try:
r = requests.post(ANNOTATOR_URL,data=json_data,headers=headers)
Expand All @@ -1011,11 +1054,6 @@ def annotate_nodes(mesg,data,agent_name):
span.set_attribute("error", True)
span.set_attribute("exception", str(e))
raise e
#else:
# with open(str(mesg.actor)+".json", "w") as outfile:
# outfile.write(json_data)
# post_processing_error(mesg,data,"Error in annotation of nodes")


def normalize_scores(data,key, agent_name):
res=get_safe(data,"message","results")
Expand Down Expand Up @@ -1085,7 +1123,7 @@ def post_processing_error(mesg,data,text):
mesg.code = 206
log_tuple=[text,
(mesg.updated_at).strftime('%H:%M:%S'),
"WARNING"]
"DEBUG"]
logging.info(f'the log_tuple is %s'% log_tuple)
add_log_entry(data,log_tuple)

Expand Down

0 comments on commit 9e17f00

Please sign in to comment.