From 340d2252275e73801de515ab03813557b0496e15 Mon Sep 17 00:00:00 2001
From: James Kent
Date: Tue, 21 Nov 2023 14:38:20 -0600
Subject: [PATCH] Ref/speed up ingestion (#616)

* preload more attributes
* wip: speed up PUT
* add sqltap profiling asgi
* do not update has_coordinates or has_images if irrelevant attribute updated
* make openapi more permissive and style
* remove unused import
* be more selective when updating has_coordinates and has_images
* refactor how records are looked up
* preload analyses
* handle loading of annotations
* preload the correct attributes for annotations
* catch more custom annotation loading
* fix annotation loading attempt #1
* attempt #2
* attempt #3
* reassign q
* remove extraneous command, and load studyset
* style fixed
* comment out unused bits
---
 store/neurostore/core.py                     |  51 +++++++-
 store/neurostore/models/data.py              |   3 +-
 store/neurostore/models/event_listeners.py   |  81 +++++++++++--
 store/neurostore/openapi                     |   2 +-
 store/neurostore/resources/base.py           | 116 ++++++++++++++-----
 store/neurostore/resources/data.py           | 113 +++++++++++++++---
 store/neurostore/resources/nested.py         |   8 +-
 store/neurostore/schemas/data.py             |   3 +-
 store/neurostore/tests/api/test_studies.py   |   4 +
 store/neurostore/tests/api/test_studysets.py |  41 +++++++
 store/neurostore/tests/conftest.py           |   4 +
 11 files changed, 361 insertions(+), 65 deletions(-)

diff --git a/store/neurostore/core.py b/store/neurostore/core.py
index 804a4f8ee..d90f04ebb 100644
--- a/store/neurostore/core.py
+++ b/store/neurostore/core.py
@@ -1,6 +1,5 @@
 import os
 from pathlib import Path
-from werkzeug.middleware.profiler import ProfilerMiddleware
 
 from connexion.middleware import MiddlewarePosition
 from starlette.middleware.cors import CORSMiddleware
@@ -10,11 +9,45 @@
 # from connexion.json_schema import default_handlers as json_schema_handlers
 from connexion.resolver import MethodResolver
 from flask_caching import Cache
-import sqltap.wsgi
 
 from .or_json import ORJSONDecoder, ORJSONEncoder
 from .database import init_db
 
+# from datetime import datetime
+
+# import sqltap.wsgi
+# import sqltap
+# import yappi
+
+# class SQLTapMiddleware:
+#     def __init__(self, app):
+#         self.app = app
+
+#     async def __call__(self, scope, receive, send):
+#         profiler = sqltap.start()
+#         await self.app(scope, receive, send)
+#         statistics = profiler.collect()
+#         sqltap.report(statistics, "report.txt", report_format="text")
+
+
+# class LineProfilerMiddleware:
+#     def __init__(self, app):
+#         self.app = app
+
+#     async def __call__(self, scope, receive, send):
+#         yappi.start()
+#         await self.app(scope, receive, send)
+#         yappi.stop()
+#         filename = (
+#             scope["path"].lstrip("/").rstrip("/").replace("/", "-")
+#             + "-"
+#             + scope["method"].lower()
+#             + str(datetime.now())
+#             + ".prof"
+#         )
+#         stats = yappi.get_func_stats()
+#         stats.save(filename, type="pstat")
+
 
 connexion_app = connexion.FlaskApp(__name__, specification_dir="openapi/")
 
@@ -45,6 +78,16 @@
     allow_headers=["*"],
 )
 
+# add sqltap
+# connexion_app.add_middleware(
+#     SQLTapMiddleware,
+# )
+
+# add profiling
+# connexion_app.add_middleware(
+#     LineProfilerMiddleware
+# )
+
 connexion_app.add_api(
     openapi_file,
     base_path="/api",
@@ -68,9 +111,5 @@
     },
 )
 
-if app.debug:
-    app.wsgi_app = sqltap.wsgi.SQLTapMiddleware(app.wsgi_app, path="/api/__sqltap__")
-    app = ProfilerMiddleware(app)
-
 app.json_encoder = ORJSONEncoder
 app.json_decoder = ORJSONDecoder
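The profiling hooks above are kept in the file but commented out. When re-enabled, sqltap aggregates every SQLAlchemy statement issued while a request is in flight, which is a direct way to confirm the reduced query counts this refactor targets. A minimal sketch of the same calls used outside the middleware, assuming sqltap is installed and the report path is arbitrary:

    import sqltap

    profiler = sqltap.start()            # attach to SQLAlchemy's engine events
    # ... exercise the code path under test, e.g. issue one API request ...
    statistics = profiler.collect()      # one record per SQL statement issued
    sqltap.report(statistics, "report.txt", report_format="text")
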
diff --git a/store/neurostore/models/data.py b/store/neurostore/models/data.py
index da23f355f..b309131ba 100644
--- a/store/neurostore/models/data.py
+++ b/store/neurostore/models/data.py
@@ -149,8 +149,7 @@ class BaseStudy(BaseMixin, db.Model):
     user = relationship("User", backref=backref("base_studies"))
 
     # retrieve versions of same study
-    versions = relationship(
-        "Study", backref=backref("base_study"))
+    versions = relationship("Study", backref=backref("base_study"))
 
     def update_has_images_and_points(self):
         # Calculate has_images and has_coordinates for the BaseStudy
diff --git a/store/neurostore/models/event_listeners.py b/store/neurostore/models/event_listeners.py
index 8b3a6bd4b..107dbc49d 100644
--- a/store/neurostore/models/event_listeners.py
+++ b/store/neurostore/models/event_listeners.py
@@ -1,10 +1,13 @@
 from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy import inspect
+from sqlalchemy.orm import joinedload
 from flask_sqlalchemy.session import Session
 from sqlalchemy import event
 from .data import (
     AnnotationAnalysis,
     Annotation,
     Studyset,
+    StudysetStudy,
     BaseStudy,
     Study,
     Analysis,
@@ -12,6 +15,7 @@
     Image,
     _check_type,
 )
+
 from ..database import db
 
 
@@ -64,6 +68,27 @@ def create_blank_notes(studyset, annotation, initiator):
 
 
 def add_annotation_analyses_studyset(studyset, studies, collection_adapter):
+    if not (inspect(studyset).pending or inspect(studyset).transient):
+        studyset = (
+            Studyset.query.filter_by(id=studyset.id)
+            .options(
+                joinedload(Studyset.studies).options(joinedload(Study.analyses)),
+                joinedload(Studyset.annotations),
+            )
+            .one()
+        )
+    all_studies = set(studyset.studies + studies)
+    existing_studies = [
+        s for s in all_studies if not (inspect(s).pending or inspect(s).transient)
+    ]
+    study_query = (
+        Study.query.filter(Study.id.in_([s.id for s in existing_studies]))
+        .options(joinedload(Study.analyses))
+        .all()
+    )
+
+    all_studies = all_studies.union(set(study_query))
+
     all_analyses = [analysis for study in studies for analysis in study.analyses]
     existing_analyses = [
         analysis for study in studyset.studies for analysis in study.analyses
@@ -91,6 +116,17 @@ def add_annotation_analyses_studyset(studyset, studies, collection_adapter):
 
 
 def add_annotation_analyses_study(study, analyses, collection_adapter):
+    if not (inspect(study).pending or inspect(study).transient):
+        study = (
+            Study.query.filter_by(id=study.id)
+            .options(
+                joinedload(Study.analyses),
+                joinedload(Study.studyset_studies)
+                .joinedload(StudysetStudy.studyset)
+                .joinedload(Studyset.annotations),
+            )
+            .one()
+        )
     new_analyses = set(analyses) - set([a for a in study.analyses])
 
     all_annotations = set(
@@ -150,14 +186,31 @@ def get_nested_attr(obj, nested_attr):
 
 def get_base_study(obj):
     base_study = None
+
     if isinstance(obj, (Point, Image)):
-        base_study = get_nested_attr(obj, "analysis.study.base_study")
-    if isinstance(obj, Analysis):
-        base_study = get_nested_attr(obj, "study.base_study")
-    if isinstance(obj, Study):
-        base_study = obj.base_study
-    if isinstance(obj, BaseStudy):
-        base_study = obj
+        if obj in session.new or obj in session.deleted:
+            base_study = get_nested_attr(obj, "analysis.study.base_study")
+    elif isinstance(obj, Analysis):
+        relevant_attrs = ("study", "points", "images")
+        for attr in relevant_attrs:
+            attr_history = get_nested_attr(inspect(obj), f"attrs.{attr}.history")
+            if attr_history.added or attr_history.deleted:
+                base_study = get_nested_attr(obj, "study.base_study")
+                break
+    elif isinstance(obj, Study):
+        relevant_attrs = ("base_study", "analyses")
+        for attr in relevant_attrs:
+            attr_history = get_nested_attr(inspect(obj), f"attrs.{attr}.history")
+            if attr_history.added or attr_history.deleted:
+                base_study = obj.base_study
+                break
+    elif isinstance(obj, BaseStudy):
+        relevant_attrs = ("versions",)
+        for attr in relevant_attrs:
+            attr_history = get_nested_attr(inspect(obj), f"attrs.{attr}.history")
+            if attr_history.added or attr_history.deleted:
+                base_study = obj
+                break
 
     return base_study
@@ -169,4 +222,18 @@ def get_base_study(obj):
 
     # Update the has_images and has_points for each unique BaseStudy
     for base_study in unique_base_studies:
+        if (
+            inspect(base_study).attrs.versions.history.added
+            and base_study.has_coordinates is True
+            and base_study.has_images is True
+        ):
+            continue
+
+        if (
+            inspect(base_study).attrs.versions.history.deleted
+            and base_study.has_coordinates is False
+            and base_study.has_images is False
+        ):
+            continue
+
         base_study.update_has_images_and_points()
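The `attrs.<name>.history` checks above are what makes the listener selective: the expensive `update_has_images_and_points` recount now runs only when a relevant relationship actually changed in the current flush. The same SQLAlchemy API in isolation, as a rough sketch (the model and attribute names here are illustrative, not the ones in this patch):

    from sqlalchemy import inspect

    def relationship_changed(obj, attr_names):
        """True if any named attribute gained or lost members in this flush."""
        state = inspect(obj)
        for name in attr_names:
            # History is a (added, unchanged, deleted) triple of instances
            history = state.attrs[name].history
            if history.added or history.deleted:
                return True
        return False

    # e.g. only recompute derived flags when analyses were attached/detached:
    # if relationship_changed(study, ("analyses",)):
    #     study.base_study.update_has_images_and_points()
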
diff --git a/store/neurostore/openapi b/store/neurostore/openapi
index f69ab2ef7..aa3c8a874 160000
--- a/store/neurostore/openapi
+++ b/store/neurostore/openapi
@@ -1 +1 @@
-Subproject commit f69ab2ef79c26a20dc8922a189e95048a64493be
+Subproject commit aa3c8a87463cef0e6ef7fa09e064c0be455ca93d
diff --git a/store/neurostore/resources/base.py b/store/neurostore/resources/base.py
index 322f6fc57..a16898156 100644
--- a/store/neurostore/resources/base.py
+++ b/store/neurostore/resources/base.py
@@ -72,8 +72,18 @@ def post_nested_record_update(record):
         """
         return record
 
+    def after_update_or_create(self, record):
+        """
+        Processing of a record after updating or creating (defined in specific classes).
+        """
+        return record
+
     @classmethod
-    def update_or_create(cls, data, id=None, commit=True):
+    def load_nested_records(cls, data, record=None):
+        return data
+
+    @classmethod
+    def update_or_create(cls, data, id=None, user=None, record=None, commit=True):
         """
         scenarios:
         1. cloning a study
@@ -91,7 +101,7 @@ def update_or_create(cls, data, id=None, commit=True):
 
         # Store all models so we can atomically update in one commit
         to_commit = []
 
-        current_user = get_current_user()
+        current_user = user or get_current_user()
         if not current_user:
             current_user = create_user()
@@ -104,31 +114,35 @@ def update_or_create(cls, data, id=None, commit=True):
         # allow compose bot to make changes
         compose_bot = current_app.config["COMPOSE_AUTH0_CLIENT_ID"] + "@clients"
 
-        if id is None:
+        if id is None and record is None:
             record = cls._model()
             record.user = current_user
-        else:
+        elif record is None:
             record = cls._model.query.filter_by(id=id).first()
             if record is None:
                 abort(422)
-            elif (
-                record.user_id != current_user.external_id
-                and not only_ids
-                and current_user.external_id != compose_bot
-            ):
-                abort(403)
-            elif only_ids:
-                to_commit.append(record)
-
-        if commit:
-            db.session.add_all(to_commit)
-            try:
-                db.session.commit()
-            except SQLAlchemyError:
-                db.session.rollback()
-                abort(400)
-
-            return record
+
+        data = cls.load_nested_records(data, record)
+
+        if (
+            not sa.inspect(record).pending
+            and record.user != current_user
+            and not only_ids
+            and current_user.external_id != compose_bot
+        ):
+            abort(403)
+        elif only_ids:
+            to_commit.append(record)
+
+            if commit:
+                db.session.add_all(to_commit)
+                try:
+                    db.session.commit()
+                except SQLAlchemyError:
+                    db.session.rollback()
+                    abort(400)
+
+            return record
 
         # Update all non-nested attributes
         for k, v in data.items():
@@ -151,7 +165,13 @@ def update_or_create(cls, data, id=None, commit=True):
                 }
             else:
                 query_args = {"id": v["id"]}
-            v = LnCls._model.query.filter_by(**query_args).first()
+
+            if v.get("preloaded_data"):
+                v = v["preloaded_data"]
+            else:
+                q = LnCls._model.query.filter_by(**query_args)
+                v = q.first()
+
             if v is None:
                 abort(400)
@@ -171,13 +191,40 @@ def update_or_create(cls, data, id=None, commit=True):
                 ResCls = getattr(viewdata, res_name)
                 if data.get(field) is not None:
                     if isinstance(data.get(field), list):
-                        nested = [
-                            ResCls.update_or_create(rec, commit=False)
-                            for rec in data.get(field)
-                        ]
+                        nested = []
+                        for rec in data.get(field):
+                            id = None
+                            if isinstance(rec, dict) and rec.get("id"):
+                                id = rec.get("id")
+                            elif isinstance(rec, str):
+                                id = rec
+                            if data.get("preloaded_studies") and id:
+                                nested_record = data["preloaded_studies"].get(id)
+                            else:
+                                nested_record = None
+                            nested.append(
+                                ResCls.update_or_create(
+                                    rec,
+                                    user=current_user,
+                                    record=nested_record,
+                                    commit=False,
+                                )
+                            )
                         to_commit.extend(nested)
                     else:
+                        id = None
+                        rec = data.get(field)
+                        if isinstance(rec, dict) and rec.get("id"):
+                            id = rec.get("id")
+                        elif isinstance(rec, str):
+                            id = rec
+                        if data.get("preloaded_studies") and id:
+                            nested_record = data["preloaded_studies"].get(id)
+                        else:
+                            nested_record = None
-                        nested = ResCls.update_or_create(data.get(field), commit=False)
+                        nested = ResCls.update_or_create(
+                            rec, user=current_user, record=nested_record, commit=False
+                        )
                         to_commit.append(nested)
 
                     setattr(record, field, nested)
@@ -298,7 +345,15 @@ def get(self, id):
         q = self._model.query
         if args["nested"] or self._model is Annotation:
             q = q.options(nested_load(self))
-
+        if self._model is Annotation:
+            q = q.options(
+                joinedload(Annotation.annotation_analyses).options(
+                    joinedload(AnnotationAnalysis.analysis),
+                    joinedload(AnnotationAnalysis.studyset_study).options(
+                        joinedload(StudysetStudy.study)
+                    ),
+                )
+            )
         record = q.filter_by(id=id).first_or_404()
         if self._model is Studyset and args["nested"]:
             snapshot = StudysetSnapshot()
@@ -319,6 +374,7 @@ def put(self, id):
 
         with db.session.no_autoflush:
             record = self.__class__.update_or_create(data, id)
+            record = self.after_update_or_create(record)
 
         # clear relevant caches
         clear_cache(self.__class__, record, request.path)
@@ -481,6 +537,8 @@ def post(self):
 
         with db.session.no_autoflush:
            record = self.__class__.update_or_create(data)
+            record = self.after_update_or_create(record)
+
         # clear the cache for this endpoint
         clear_cache(self.__class__, record, request.path)
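The `record=` and `"preloaded_data"` plumbing added above lets a parent view fetch every nested row in one query and hand the loaded objects down through the recursive `update_or_create` calls, instead of each call issuing its own `SELECT`. A condensed sketch of the idea, using a hypothetical `Item` model and `items` payload key rather than the actual view classes:

    from sqlalchemy.orm import joinedload

    def preload_children(data, session):
        """Bulk-fetch children referenced by id and stash them on the payload."""
        ids = [
            c["id"] if isinstance(c, dict) else c   # payload entries may be dicts or bare ids
            for c in data.get("items", [])
        ]
        rows = (
            session.query(Item)                     # Item is a hypothetical model
            .filter(Item.id.in_([i for i in ids if i]))
            .options(joinedload(Item.user))         # eager-load what the update will touch
            .all()
        )
        data["preloaded_items"] = {row.id: row for row in rows}
        return data

    # update_or_create(...) can then consult data["preloaded_items"]
    # and skip the per-record query entirely.
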
diff --git a/store/neurostore/resources/data.py b/store/neurostore/resources/data.py
index 67f00e8f3..495848a06 100644
--- a/store/neurostore/resources/data.py
+++ b/store/neurostore/resources/data.py
@@ -16,6 +16,7 @@
 from ..models import (
     Studyset,
     Study,
+    Annotation,
     Analysis,
     AnalysisConditions,
     AnnotationAnalysis,
@@ -73,6 +74,31 @@ class StudysetsView(ObjectView, ListView):
     _multi_search = ("name", "description")
     _search_fields = ("name", "description", "publication", "doi", "pmid")
 
+    @classmethod
+    def load_nested_records(cls, data, record=None):
+        if not data or not data.get("studies"):
+            return data
+        studies = data.get("studies")
+        existing_studies = []
+        for s in studies:
+            if isinstance(s, dict) and s.get("id"):
+                existing_studies.append(s.get("id"))
+            elif isinstance(s, str):
+                existing_studies.append(s)
+        study_results = (
+            Study.query.filter(Study.id.in_(existing_studies))
+            .options(
+                joinedload(Study.analyses),
+                joinedload(Study.user),
+            )
+            .all()
+        )
+        study_dict = {s.id: s for s in study_results}
+        # Modification of data in place
+        if study_dict:
+            data["preloaded_studies"] = study_dict
+        return data
+
     def view_search(self, q, args):
         # check if results should be nested
         nested = True if args.get("nested") else False
@@ -100,6 +126,53 @@ class AnnotationsView(ObjectView, ListView):
     _multi_search = ("name", "description")
     _search_fields = ("name", "description")
 
+    @classmethod
+    def load_nested_records(cls, data, record=None):
+        if not data:
+            return data
+
+        studyset_id = data.get("studyset", {}).get("id")
+        if not studyset_id:
+            return data
+        q = Studyset.query.filter_by(id=studyset_id)
+        q = q.options(
+            joinedload(Studyset.studyset_studies)
+            .joinedload(StudysetStudy.study)
+            .joinedload(Study.analyses)
+        )
+        studyset = q.first()
+        data["studyset"]["preloaded_data"] = studyset
+        studyset_studies = {
+            (s.studyset_id, s.study_id): s for s in studyset.studyset_studies
+        }
+        analyses = {
+            a.id: a for s in studyset_studies.values() for a in s.study.analyses
+        }
+        for aa in data.get("annotation_analyses", []):
+            analysis = analyses.get(aa.get("analysis").get("id"))
+            if analysis:
+                aa["analysis"]["preloaded_data"] = analysis
+            studyset_study = studyset_studies.get(
+                (studyset.id, aa.get("studyset_study").get("study").get("id"))
+            )
+            if studyset_study:
+                aa["studyset_study"]["preloaded_data"] = studyset_study
+        return data
+
+    def after_update_or_create(self, record):
+        q = Annotation.query.filter_by(id=record.id)
+        q = q.options(nested_load(self))
+        q = q.options(
+            joinedload(Annotation.studyset),
+            joinedload(Annotation.annotation_analyses).options(
+                joinedload(AnnotationAnalysis.analysis),
+                joinedload(AnnotationAnalysis.studyset_study).options(
+                    joinedload(StudysetStudy.study)
+                ),
+            ),
+        )
+        return q.first()
+
     def view_search(self, q, args):
         q = q.options(nested_load(self))
 
@@ -213,26 +286,32 @@ def post(self):
         # in the list scenario, try to find an existing record
         # then return the best version and return that study id
         data = parser.parse(self.__class__._schema(many=True), request)
-        search_keys = ["pmid", "doi", "name"]
         base_studies = []
         to_commit = []
-        for study_data in data:
-            filter_params = {
-                k: study_data.get(k) for k in search_keys if study_data.get(k)
-            }
-            if "name" in filter_params and (set(filter_params) - {"name"}) != set():
-                del filter_params["name"]
-
-            record = (
-                BaseStudy.query.filter_by(**filter_params)
-                .options(
-                    joinedload(BaseStudy.versions)
-                    .joinedload(Study.studyset_studies)
-                    .joinedload(StudysetStudy.studyset)
-                )
-                .one_or_none()
+        pmids = [sd["pmid"] for sd in data if sd.get("pmid")]
+        dois = [sd["doi"] for sd in data if sd.get("doi")]
+        names = [sd["name"] for sd in data if sd.get("name")]
+        results = (
+            BaseStudy.query.filter(
+                (BaseStudy.doi.in_(dois))
+                | (BaseStudy.pmid.in_(pmids))
+                | (BaseStudy.name.in_(names))
             )
-
+            .options(
+                joinedload(BaseStudy.versions).options(
+                    joinedload(Study.studyset_studies).joinedload(
+                        StudysetStudy.studyset
+                    ),
+                    joinedload(Study.user),
+                ),
+                joinedload(BaseStudy.user),
+            )
+            .all()
+        )
+        hashed_results = {(bs.doi or "") + (bs.pmid or ""): bs for bs in results}
+        for study_data in data:
+            lookup_hash = study_data.get("doi", "") + study_data.get("pmid", "")
+            record = hashed_results.get(lookup_hash)
             if record is None:
                 with db.session.no_autoflush:
                     record = self.__class__.update_or_create(study_data, commit=False)
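The rewritten `BaseStudies.post` above swaps one `one_or_none()` query per posted study for a single `IN` query plus a dictionary lookup keyed on DOI+PMID. The pattern in isolation, as a sketch (the rows are assumed to carry `.doi`/`.pmid` attributes; the empty-key guard in `find_existing` is a defensive addition for studies that have neither identifier, not part of the patch):

    def index_by_identifiers(rows):
        """Key each row by doi+pmid so payload entries match without further queries."""
        return {(row.doi or "") + (row.pmid or ""): row for row in rows}

    def find_existing(study_data, index):
        key = study_data.get("doi", "") + study_data.get("pmid", "")
        # An empty key would collide across identifier-less studies; treat as "not found".
        return index.get(key) if key else None
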
diff --git a/store/neurostore/resources/nested.py b/store/neurostore/resources/nested.py
index 8d7b1cc8a..39ec4c295 100644
--- a/store/neurostore/resources/nested.py
+++ b/store/neurostore/resources/nested.py
@@ -6,12 +6,14 @@
 from . import data
 
 
-def nested_load(view, options=None):
+def nested_load(view, options=None, include_linked=False):
     """
     SQL: Change lazy loading to eager loading when accessing all nested attributes.
     """
     nested_keys = list(view._nested.keys())
+    if include_linked:
+        nested_keys.extend(view._linked.keys())
     if "entities" in nested_keys:
         nested_keys.remove("entities")
     if len(nested_keys) == 1:
@@ -25,7 +27,9 @@ def nested_load(view, options=None):
     elif len(nested_keys) > 1:
         nested_loads = []
         for k in nested_keys:
-            nested_view = getattr(data, view._nested[k])
+            nested_view = getattr(data, view._nested.get(k, ""), None) or getattr(
+                data, view._linked.get(k, "")
+            )
             if nested_view._nested:
                 nested_loads.append(
                     nested_load(nested_view, subqueryload(getattr(view._model, k)))
diff --git a/store/neurostore/schemas/data.py b/store/neurostore/schemas/data.py
index 2116eb685..aa043b8b1 100644
--- a/store/neurostore/schemas/data.py
+++ b/store/neurostore/schemas/data.py
@@ -62,7 +62,8 @@ def _serialize(self, value, attr, obj, **ser_kwargs):
                 "has_images",
             ]
             nested_schema = self.nested(
-                context=self.context, only=info_fields, exclude=[]
+                context=self.context,
+                only=info_fields,
             )
             return nested_schema.dump(value, many=self.many)
         else:
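Most of this patch leans on `joinedload` to collapse the N+1 query patterns that `nested_load` is designed to eliminate. For readers unfamiliar with the option, a minimal before/after contrast (hypothetical `session` and models, using the same legacy Query API as this codebase):

    from sqlalchemy.orm import joinedload

    # Lazy (default): one query for the studies, then one more per study
    # the first time `study.analyses` is touched.
    studies = session.query(Study).all()
    for study in studies:
        print(len(study.analyses))   # emits SELECT ... FROM analyses each iteration

    # Eager: a single JOINed query up front, no per-row SELECTs afterwards.
    studies = session.query(Study).options(joinedload(Study.analyses)).all()
    for study in studies:
        print(len(study.analyses))   # served from the already-loaded collection
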
diff --git a/store/neurostore/tests/api/test_studies.py b/store/neurostore/tests/api/test_studies.py
index e2bbbfda1..2a07f7d68 100644
--- a/store/neurostore/tests/api/test_studies.py
+++ b/store/neurostore/tests/api/test_studies.py
@@ -148,6 +148,10 @@ def test_post_studies(auth_client, ingest_neurosynth, session):
 
     auth_client.post("/api/studies/", data=my_study)
 
+    my_second_study = {"name": "asdfasfa", "pmid": "100000", "doi": "asdf;lds"}
+
+    auth_client.post("/api/studies/", data=my_second_study)
+
 
 def test_delete_studies(auth_client, ingest_neurosynth, session):
     study_db = Study.query.first()
diff --git a/store/neurostore/tests/api/test_studysets.py b/store/neurostore/tests/api/test_studysets.py
index 40ddd8c8d..a72e975c9 100644
--- a/store/neurostore/tests/api/test_studysets.py
+++ b/store/neurostore/tests/api/test_studysets.py
@@ -1,3 +1,5 @@
+import random
+import string
 from neurostore.models import Studyset, Study
 
 
@@ -21,6 +23,44 @@ def test_post_and_get_studysets(auth_client, ingest_neurosynth, session):
     )
 
 
+# @add_event_listeners
+def test_add_many_studies_to_studyset(auth_client, ingest_neurosynth, session):
+    existing_studies = Study.query.all()
+    existing_study_ids = [s.id for s in existing_studies]
+
+    # Function to generate a random DOI
+    def generate_doi():
+        doi = "10." + "".join(random.choices(string.digits, k=4)) + "/"
+        doi += "".join(random.choices(string.ascii_lowercase, k=4)) + "."
+        doi += "".join(random.choices(string.ascii_lowercase, k=4))
+        return doi
+
+    # List comprehension to generate the desired structure
+    made_up_studies = [
+        {
+            "pmid": str(random.randint(100000, 999999)),
+            "doi": generate_doi(),
+            "name": "".join(random.choices(string.ascii_letters, k=10)),
+        }
+        for _ in range(100)
+    ]
+    # create empty studyset
+    ss = auth_client.post("/api/studysets/", data={"name": "mixed_studyset"})
+
+    assert ss.status_code == 200
+
+    ss_id = ss.json()["id"]
+
+    # combine made_up and created studies
+    all_studies = existing_study_ids + made_up_studies
+
+    ss_update = auth_client.put(
+        f"/api/studysets/{ss_id}", data={"studies": all_studies}
+    )
+
+    assert ss_update.status_code == 200
+
+
 def test_add_study_to_studyset(auth_client, ingest_neurosynth, session):
     payload = auth_client.get("/api/studies/").json()
     study_ids = [study["id"] for study in payload["results"]]
@@ -62,6 +102,7 @@ def test_hot_swap_study_in_studyset(auth_client, ingest_neurosynth, session):
     # create studyset
     create_ss = auth_client.post("/api/studysets/", data={"name": "test"})
+    assert create_ss.status_code == 200
     ss_test = create_ss.json()["id"]
     # cache studyset endpoint
     auth_client.get(f"/api/studysets/{ss_test}")
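The new bulk test exercises the fast path but does not assert on query volume. A companion technique, sketched here under the assumption that the test has access to the Flask-SQLAlchemy engine (names and the budget value are illustrative), is to count statements around the request with an engine event hook:

    from sqlalchemy import event

    class QueryCounter:
        """Count SQL statements emitted on an engine inside a `with` block."""

        def __init__(self, engine):
            self.engine = engine
            self.count = 0

        def _before(self, conn, cursor, statement, parameters, context, executemany):
            self.count += 1

        def __enter__(self):
            event.listen(self.engine, "before_cursor_execute", self._before)
            return self

        def __exit__(self, *exc):
            event.remove(self.engine, "before_cursor_execute", self._before)

    # QUERY_BUDGET = 50  # illustrative threshold
    # with QueryCounter(db.engine) as qc:
    #     auth_client.put(f"/api/studysets/{ss_id}", data={"studies": all_studies})
    # assert qc.count < QUERY_BUDGET
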
diff --git a/store/neurostore/tests/conftest.py b/store/neurostore/tests/conftest.py
index 8558fd0c7..2073af332 100644
--- a/store/neurostore/tests/conftest.py
+++ b/store/neurostore/tests/conftest.py
@@ -20,6 +20,10 @@
 from auth0.v3.authentication import GetToken
 import shortuuid
 
+import logging
+
+LOGGER = logging.getLogger(__name__)
+
 """
 Test selection arguments