diff --git a/mtphandler/__init__.py b/mtphandler/__init__.py new file mode 100644 index 0000000..1f468a1 --- /dev/null +++ b/mtphandler/__init__.py @@ -0,0 +1,8 @@ +import json # noqa +import os # noqa + +from .mtp_logging import configure_logger +from .plate_manager import PlateManager # noqa + + +configure_logger() diff --git a/mtphandler/ioutils/__init__.py b/mtphandler/ioutils/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/mtphandler/ioutils/__init__.py @@ -0,0 +1 @@ + diff --git a/mtphandler/ioutils/calipytion.py b/mtphandler/ioutils/calipytion.py new file mode 100644 index 0000000..673a899 --- /dev/null +++ b/mtphandler/ioutils/calipytion.py @@ -0,0 +1,146 @@ +from __future__ import annotations + +import numpy as np +from calipytion import Calibrator +from calipytion.model import Sample, Standard, UnitDefinition + +from mtphandler.model import Plate, Well +from mtphandler.molecule import Molecule +from mtphandler.tools import ( + get_measurement, + get_species_condition, + measurement_is_blanked_for, + well_contains_species, +) + + +def _get_standard_wells( + plate: Plate, + protein_ids: list[str], + molecule: Molecule, + wavelength: float, +) -> list[Well]: + # Subset of wells, that contain specified species, do not contain a protein, and are blanked + + # get wells with only one component, that does not contribute to the signal + buffer_blank_wells = [] + standard_wells = [] + for well in plate.wells: + measurement = get_measurement(well, wavelength) + + # get all wells with one init condition that has a concentration grater than 0 + int_concs_creater_than_zero = [ + condition for condition in well.init_conditions if condition.init_conc > 0 + ] + + if len(int_concs_creater_than_zero) == 1: + buffer_blank_wells.append(well) + print("found buffer blank well", well.id) + + if not well_contains_species(well, molecule.id, conc_above_zero=True): + continue + + if any( + [ + well_contains_species(well, catalyst_id, 
def map_to_standard(
    plate: Plate,
    molecule: Molecule,
    protein_ids: list[str],
    wavelength: float,
) -> Standard:
    """Map the standard wells of a plate to a calipytion ``Standard``.

    The wells are selected with ``_get_standard_wells``. Each selected well
    becomes one ``Sample`` whose concentration is the initial concentration of
    ``molecule`` in that well and whose signal is the NaN-ignoring mean of the
    absorption values measured at ``wavelength``.

    Args:
        plate (Plate): Plate with the wells.
        molecule (Molecule): Molecule the standard is created for.
        protein_ids (list[str]): IDs of the proteins; forwarded to the
            standard-well selection.
        wavelength (float): Wavelength of the measurements.

    Returns:
        Standard: Standard holding one sample per standard well. The first
            entry of ``plate.temperatures`` is used as its temperature.

    Raises:
        ValueError: If no standard wells are found, if the plate has no
            temperature values, or if the samples have different pH values.
    """
    standard_wells = _get_standard_wells(
        plate=plate,
        protein_ids=protein_ids,
        molecule=molecule,
        wavelength=wavelength,
    )

    # Fail with a clear message instead of an IndexError on `phs[0]` below
    if not standard_wells:
        raise ValueError(
            f"No standard wells found for {molecule.name} at wavelength {wavelength}."
        )

    # The standard's temperature is taken from the first plate temperature
    if not plate.temperatures:
        raise ValueError(
            f"Plate has no temperature values; cannot create a standard for {molecule.name}."
        )

    # Map wells to samples of a standard
    samples = []
    phs = []
    for well in standard_wells:
        condition = get_species_condition(well, molecule.id)
        measurement = get_measurement(well, wavelength)

        samples.append(
            Sample(
                concentration=condition.init_conc,
                conc_unit=UnitDefinition(**condition.conc_unit.model_dump()),
                signal=float(np.nanmean(measurement.absorption)),
            )
        )
        phs.append(well.ph)

    # A standard has exactly one pH, so all samples must agree
    ph = phs[0]
    if not all(sample_ph == ph for sample_ph in phs):
        raise ValueError(
            f"Samples of standard {molecule.name} have different pH values: {phs}"
        )

    temp_unit = UnitDefinition(**plate.temperature_unit.model_dump())

    # Create standard
    return Standard(
        molecule_id=molecule.id,
        molecule_symbol=molecule.id,
        pubchem_cid=molecule.pubchem_cid,
        molecule_name=molecule.name,
        wavelength=wavelength,
        samples=samples,
        ph=ph,
        temperature=plate.temperatures[0],
        temp_unit=temp_unit,
    )
class Plate_to_EnzymeMLDocument:
    """Converts a Plate object along with associated molecules and proteins to an EnzymeMLDocument.

    If `to_concentration=True`, the absorption data is converted to concentration
    data using the calibrators created from the standards of the molecules.

    Args:
        name (str): Name of the EnzymeML document.
        plate (Plate): Plate holding the wells to convert.
        well_ids (list[str] | None): IDs of the wells to convert (matched
            case-insensitively). `None` selects all wells of the plate.
        molecules (list[Molecule]): Molecules added as small molecules.
        detected_molecule (Molecule): Molecule that is observed photometrically.
        proteins (list[Protein]): Proteins added to the document.
        wavelength (float | None): Wavelength of the measurements to convert.
            If `None`, the only wavelength measured on the plate is used.
        wells_with_protein_only (bool): If `True`, only wells for which
            `is_catalyzed` returns `True` are converted.
        to_concentration (bool): If `True`, absorption values are converted to
            concentrations.
        extrapolate (bool): Forwarded to the calibrator when concentrations
            are calculated.
        silent (bool): If `True`, the summary line is not printed.

    Raises:
        ValueError: If no wavelength was given and the plate has no or more
            than one measured wavelength.
        ValueError: If the pH of a well is not defined.
        ValueError: If no measurements were added to EnzymeML.
    """

    def __init__(
        self,
        name: str,
        plate: Plate,
        well_ids: list[str] | None,
        molecules: list[Molecule],
        detected_molecule: Molecule,
        proteins: list[Protein],
        wavelength: float | None,
        wells_with_protein_only: bool,
        to_concentration: bool,
        extrapolate: bool,
        silent: bool,
    ) -> None:
        self.name = name
        self.plate = plate
        self.well_ids = well_ids
        self.molecules = molecules
        self.detected_molecule = detected_molecule
        self.proteins = proteins
        self.wavelength = wavelength
        self.calibrator_dict: dict[str, Calibrator] = {}
        self.wells_with_protein_only = wells_with_protein_only
        self.to_concentration = to_concentration
        self.extrapolate = extrapolate
        self.silent = silent

        # Initialize calibrators if concentration data is requested
        if self.to_concentration:
            self._init_calibrators()

        # Check if a wavelength was specified, otherwise set it to the only wavelength measured
        self._handle_wavelength()

    def convert(self):
        """
        Converts proteins, small molecules, and measurements to an EnzymeMLDocument.

        Returns:
            EnzymeMLDocument: The EnzymeMLDocument containing the converted data.
        """
        enzml_doc = pe.EnzymeMLDocument(name=self.name)
        logger.debug(f"Initialized EnzymeMLDocument with name {self.name}")

        # Add proteins to EnzymeML document
        enzml_doc.proteins = [self.map_protein(protein) for protein in self.proteins]
        logger.debug(f"Added {len(self.proteins)} proteins to EnzymeMLDocument")

        # Add small molecules to EnzymeML document
        enzml_doc.small_molecules = [
            self.map_small_molecule(molecule) for molecule in self.molecules
        ]
        logger.debug(f"Added {len(self.molecules)} small molecules to EnzymeMLDocument")

        # Add measurements to EnzymeML document
        enzml_doc.measurements = self.wells_to_enzml_measurements()

        return enzml_doc

    def get_well_subset(self) -> list[Well]:
        """
        Returns a subset of wells from the plate based on the well ids.

        Well ids are compared case-insensitively. A single id passed as a bare
        string is accepted as well. `self.well_ids` is left untouched.

        Raises:
            ValueError: If none of the specified well ids matches a well.

        Returns:
            list[Well]: List of Well objects.
        """
        if self.well_ids is None:
            return self.plate.wells

        well_ids = [self.well_ids] if isinstance(self.well_ids, str) else self.well_ids

        # Set for O(1) membership tests while scanning the plate
        wanted = {well_id.upper() for well_id in well_ids}
        subset = [well for well in self.plate.wells if well.id.upper() in wanted]

        if not subset:
            raise ValueError("No wells found with the specified well ids.")

        return subset

    def wells_to_enzml_measurements(self) -> list[pe.Measurement]:
        """Converts wells to EnzymeML measurements.

        Wells are skipped if they have no measurement at `self.wavelength`, if
        `wells_with_protein_only` is set and the well is not catalyzed, or if
        no measured species can be determined for the well.

        Raises:
            ValueError: If the pH of a well is not defined.
            ValueError: If no measurements were added to EnzymeML.

        Returns:
            list[pe.Measurement]: List of EnzymeML `Measurement` objects.
        """
        # Only needed for the catalyst filter; built once instead of per well
        protein_ids = (
            {protein.id for protein in self.proteins}
            if self.wells_with_protein_only
            else set()
        )
        measurements: list[pe.Measurement] = []

        for well in self.get_well_subset():
            photo_measurement = next(
                (
                    meas
                    for meas in well.measurements
                    if meas.wavelength == self.wavelength
                ),
                None,
            )

            # Skip wells without a measurement at the specified wavelength
            if photo_measurement is None:
                continue

            # Skip wells without a protein if the flag is set
            if self.wells_with_protein_only and not self.is_catalyzed(
                well, photo_measurement, protein_ids
            ):
                continue

            # Ensure that the pH of the well is defined
            if well.ph is None:
                raise ValueError(f"pH of well {well.id} is not defined.")

            # Create EnzymeML measurement
            enzml_meas = pe.Measurement(
                id=well.id,
                name="photometric measurement",
                ph=well.ph,
                temperature=self.temperature,
                temperature_unit=EnzML_UnitDef(
                    **self.plate.temperature_unit.model_dump()
                ),
            )

            logger.debug(
                f"Contributing species in well {well.id}: {[(state.species_id ,state.contributes_to_signal) for state in photo_measurement.blank_states]}"
            )

            # Check if only one species contributes to the signal
            measured_species = self.get_only_contributing_species(
                photo_measurement, well.id, self.detected_molecule.id
            )
            if not measured_species:
                continue

            logger.debug(
                f"Adding measurement from well {well.id} with species {measured_species}"
            )

            # Add species data to the measurement based on the initial conditions of the well
            self.add_to_species_data(enzml_meas, well.init_conditions)

            # Add absorption data to the species data of the measurement
            self.add_absorption_data(
                measurement=enzml_meas,
                photo_measurement=photo_measurement,
                species_id=measured_species,
            )

            measurements.append(enzml_meas)

        if not measurements:
            raise ValueError("No measurements were added to EnzymeML.")

        if not self.silent:
            mode = "concentration" if self.to_concentration else "absorbance"
            print(
                f"✅ Added measurements from {len(measurements)} wells with {mode} values to EnzymeMLDocument"
            )

        return measurements

    def add_absorption_data(
        self,
        measurement: pe.Measurement,
        photo_measurement: PhotometricMeasurement,
        species_id: str,
    ) -> None:
        """Adds absorption data to the species data of the measurement.

        Based on the `to_concentration` flag, the absorption data is converted to
        concentration data using the first model of the species' calibrator.
        The first data point becomes the `initial` value of the species data;
        the previous `initial` value is kept as `prepared` if none is set.

        Args:
            measurement (pe.Measurement): EnzymeML `Measurement` object.
            photo_measurement (PhotometricMeasurement): PhotometricMeasurement object.
            species_id (str): Species ID.

        Raises:
            AssertionError: If the measurement has no species data for `species_id`.
            ValueError: If the calibrator for the species is not defined.
        """

        species_data = next(
            (
                data
                for data in measurement.species_data
                if data.species_id == species_id
            ),
            None,
        )

        assert (
            species_data is not None
        ), f"Species {species_id} not found in measurement {measurement.id}."

        if self.to_concentration:
            data_type = pe.DataTypes.CONCENTRATION
            if species_id not in self.calibrator_dict:
                raise ValueError(
                    f"Calibrator for species {species_id} is not defined. Set `to_concentration=False`, or define a standard for species {species_id}."
                )

            calibrator = self.calibrator_dict[species_id]
            data = calibrator.calculate_concentrations(
                model=calibrator.models[0],
                signals=photo_measurement.absorption,
                extrapolate=self.extrapolate,
            )
        else:
            data_type = pe.DataTypes.ABSORBANCE
            data = photo_measurement.absorption

        species_data.data_type = data_type
        species_data.data = data
        species_data.time = photo_measurement.time

        # Keep the prepared value before `initial` is replaced by the first data point
        if species_data.prepared is None:
            species_data.prepared = species_data.initial

        species_data.initial = species_data.data[0]

    @property
    def temperature(self) -> float:
        """Mean of the temperatures recorded for the plate."""
        return np.mean(self.plate.temperatures).tolist()

    @staticmethod
    def get_only_contributing_species(
        photo_measurement: PhotometricMeasurement,
        well_id: str,
        detected_molecule_id: str,
    ) -> str | None:
        """Returns the single species the signal of a measurement is assigned to.

        A species is a candidate if its blank state is marked as contributing
        to the signal, or if it is the detected molecule and has a blank state
        in the measurement.

        Args:
            photo_measurement (PhotometricMeasurement): Measurement to inspect.
            well_id (str): ID of the well, used in the error message.
            detected_molecule_id (str): ID of the detected molecule.

        Raises:
            ValueError: If more than one candidate species is found.

        Returns:
            str | None: ID of the only candidate species, `None` if there is none.
        """
        # check that only one species contributes to the signal
        contributing_species = {
            state.species_id
            for state in photo_measurement.blank_states
            if state.species_id == detected_molecule_id or state.contributes_to_signal
        }

        if len(contributing_species) > 1:
            raise ValueError(
                f"Multiple species ({sorted(contributing_species)}) contribute to the "
                f"signal in well {well_id}. Only one species is allowed. "
                "Either the plate was not blanked, or control measurements for "
                "determining the blank are missing. Species can manually be "
                "specified not to contribute to the signal by setting "
                "`contributes_to_signal=False` during the assignment of well conditions."
            )

        if not contributing_species:
            return None

        return contributing_species.pop()

    @staticmethod
    def is_catalyzed(
        well: Well,
        photo_measurement: PhotometricMeasurement,
        protein_ids: set[str],
    ) -> bool:
        """
        Checks if a well contains a catalyst that does not contribute to the signal.

        Args:
            well (Well): `Well` object
            photo_measurement (PhotometricMeasurement): Measurement of the well
            protein_ids (set[str]): Set of protein ids

        Returns:
            bool: True if the well contains one of the proteins with an initial
                concentration above zero and no blank state of a protein is marked
                as contributing to the signal, False otherwise
        """

        contains_protein = any(
            condition.species_id in protein_ids and condition.init_conc > 0
            for condition in well.init_conditions
        )

        protein_contributes = any(
            state.species_id in protein_ids and state.contributes_to_signal
            for state in photo_measurement.blank_states
        )

        if contains_protein and not protein_contributes:
            logger.debug(f"Well {well.id} contains a catalyst.")
            return True

        return False

    @staticmethod
    def add_to_species_data(
        measurement: pe.Measurement, init_conditions: list[InitCondition]
    ):
        """Adds one concentration-typed species data entry per initial condition.

        Args:
            measurement (pe.Measurement): EnzymeML `Measurement` object.
            init_conditions (list[InitCondition]): Initial conditions of the well.
        """
        for condition in init_conditions:
            measurement.add_to_species_data(
                species_id=condition.species_id,
                initial=condition.init_conc,
                prepared=condition.init_conc,
                data_unit=EnzML_UnitDef(**condition.conc_unit.model_dump()),
                data_type=DataTypes.CONCENTRATION,
            )

    @staticmethod
    def map_protein(protein: Protein) -> pe.Protein:
        """Maps a `Protein` to an EnzymeML protein.

        `ld_id` is only passed if the protein has a truthy `ld_id_url`.
        """
        params = {
            "id": protein.id,
            "name": protein.name,
            "constant": protein.constant,
            "sequence": protein.sequence,
        }
        if protein.ld_id_url:
            params["ld_id"] = protein.ld_id_url

        return pe.Protein(**params)

    @staticmethod
    def map_small_molecule(molecule: Molecule) -> pe.SmallMolecule:
        """Maps a `Molecule` to an EnzymeML small molecule.

        `ld_id` is only passed if the molecule has a truthy `ld_id_url`.
        """
        params = {
            "id": molecule.id,
            "name": molecule.name,
            "constant": molecule.constant,
        }
        if molecule.ld_id_url:
            params["ld_id"] = molecule.ld_id_url

        return pe.SmallMolecule(**params)

    def _handle_wavelength(self):
        """
        Checks if a wavelength was specified and if not, sets it to the only wavelength measured.

        Any wavelength that is not `None` is kept as given, so integer
        wavelengths (e.g. `340`) are respected as well.

        Raises:
            ValueError: If no wavelength was specified and the plate contains
                no measurements or measurements at multiple wavelengths.
        """
        if self.wavelength is not None:
            return

        # check that all measurements in the wells have only one wavelength
        wavelengths = {
            meas.wavelength for well in self.plate.wells for meas in well.measurements
        }

        if not wavelengths:
            raise ValueError(
                "No measurements found on the plate. Cannot determine the wavelength."
            )

        if len(wavelengths) > 1:
            raise ValueError("Multiple wavelengths were measured. Please specify one.")

        self.wavelength = wavelengths.pop()

    def _init_calibrators(self):
        """Initializes calibrators for all molecules with a standard."""

        for molecule in self.molecules:
            assert (
                molecule.id not in self.calibrator_dict
            ), f"Calibrator for molecule {molecule.id} already exists in calibrator_dict."

            if not molecule.standard:
                continue

            calibrator = Calibrator.from_standard(molecule.standard)

            self.calibrator_dict[molecule.id] = calibrator

            logger.debug(f"Initialized calibrator for molecule {molecule.id}")
def validate_prefix(term: str | dict, prefix: str):
    """Validates that a term is prefixed with a given prefix

    A string term must start with ``"<prefix>:"``. For a dictionary term the
    value stored under ``"@id"`` is checked instead. Terms of any other type
    are not checked.

    Args:
        term (str | dict): The term to validate
        prefix (str): The prefix to validate against

    Raises:
        ValueError: If the term is not prefixed with the prefix, or if a
            dictionary term has no string ``"@id"`` entry
    """

    if isinstance(term, dict):
        # A dictionary term is identified by its "@id" entry
        identifier = term.get("@id")
        if not isinstance(identifier, str):
            raise ValueError(f"Term {term} has no '@id' to check for prefix {prefix}")
    elif isinstance(term, str):
        identifier = term
    else:
        return

    if not identifier.startswith(prefix + ":"):
        raise ValueError(f"Term {term} is not prefixed with {prefix}")
class Plate(BaseModel):
    """Microtiter plate holding wells, temperatures and time information."""

    # `validate_assignment` was misspelled, so pydantic silently ignored it and
    # values assigned after construction were never validated.
    model_config: ConfigDict = ConfigDict(  # type: ignore
        validate_assignment=True,
    )  # type: ignore

    temperature_unit: UnitDefinition
    id: Optional[str] = Field(default=None)
    name: Optional[str] = Field(default=None)
    wells: list[Well] = Field(default_factory=list)
    date_measured: Optional[str] = Field(default=None)
    temperatures: list[float] = Field(default_factory=list)
    times: list[float] = Field(default_factory=list)
    time_unit: Optional[UnitDefinition] = Field(default=None)

    # JSON-LD fields
    ld_id: str = Field(
        serialization_alias="@id", default_factory=lambda: "md:Plate/" + str(uuid4())
    )
    ld_type: list[str] = Field(
        serialization_alias="@type",
        default_factory=lambda: [
            "md:Plate",
        ],
    )
    ld_context: dict[str, str | dict] = Field(
        serialization_alias="@context",
        default_factory=lambda: {
            "md": "https://github.com/FAIRChemistry/MTPHandler",
        },
    )

    def filter_wells(self, **kwargs) -> list[Well]:
        """Filters the wells attribute based on the given kwargs

        Args:
            **kwargs: The attributes to filter by.

        Returns:
            list[Well]: The filtered list of Well objects
        """

        return FilterWrapper[Well](self.wells, **kwargs).filter()

    def set_attr_term(
        self,
        attr: str,
        term: str | dict,
        prefix: str | None = None,
        iri: str | None = None,
    ):
        """Sets the term for a given attribute in the JSON-LD object

        Example:
            # Using an IRI term
            >> obj.set_attr_term("name", "http://schema.org/givenName")

            # Using a prefix and term
            >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org/")

            # Using a dictionary term
            >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"})

        Args:
            attr (str): The attribute to set the term for
            term (str | dict): The term to set for the attribute
            prefix (str, optional): The prefix the term must start with. Defaults to None.
            iri (str, optional): The IRI registered for the prefix. Defaults to None.

        Raises:
            AssertionError: If the attribute is not found in the model
            ValueError: If the term is not prefixed with the given prefix
            ValueError: If only one of prefix and iri is provided
        """

        # Read the fields from the class; instance access is deprecated in
        # recent pydantic releases.
        assert (
            attr in type(self).model_fields
        ), f"Attribute {attr} not found in {self.__class__.__name__}"

        if prefix:
            validate_prefix(term, prefix)

        add_namespace(self, prefix, iri)
        self.ld_context[attr] = term

    def add_type_term(
        self, term: str, prefix: str | None = None, iri: str | None = None
    ):
        """Adds a term to the @type field of the JSON-LD object

        Example:
            # Using a term
            >> obj.add_type_term("https://schema.org/Person")

            # Using a prefixed term
            >> obj.add_type_term("schema:Person", "schema", "https://schema.org/")

        Args:
            term (str): The term to add to the @type field
            prefix (str, optional): The prefix to use for the term. Defaults to None.
            iri (str, optional): The IRI to use for the term prefix. Defaults to None.

        Raises:
            ValueError: If the term is not prefixed with the given prefix
            ValueError: If prefix is provided but iri is not
            ValueError: If iri is provided but prefix is not
        """

        if prefix:
            validate_prefix(term, prefix)

        add_namespace(self, prefix, iri)
        self.ld_type.append(term)

    def add_to_wells(
        self,
        id: str,
        x_pos: int,
        y_pos: int,
        ph: Optional[float] = None,
        init_conditions: Optional[list[InitCondition]] = None,
        measurements: Optional[list[PhotometricMeasurement]] = None,
        volume: Optional[float] = None,
        volume_unit: Optional[UnitDefinition] = None,
        **kwargs,
    ):
        """Creates a Well from the given values and appends it to the wells

        Args:
            id (str): ID of the well
            x_pos (int): Value for the `x_pos` field of the well
            y_pos (int): Value for the `y_pos` field of the well
            ph (float, optional): pH of the well. Defaults to None.
            init_conditions (list[InitCondition], optional): Initial conditions
                of the well. Defaults to an empty list.
            measurements (list[PhotometricMeasurement], optional): Measurements
                of the well. Defaults to an empty list.
            volume (float, optional): Volume of the well. Defaults to None.
            volume_unit (UnitDefinition, optional): Unit of the volume. Defaults to None.
            **kwargs: Accepted for compatibility; not used.

        Returns:
            Well: The newly added Well object
        """
        # `None` instead of `[]` as default avoids sharing one list object
        # between calls.
        params = {
            "id": id,
            "x_pos": x_pos,
            "y_pos": y_pos,
            "ph": ph,
            "init_conditions": [] if init_conditions is None else init_conditions,
            "measurements": [] if measurements is None else measurements,
            "volume": volume,
            "volume_unit": volume_unit,
        }

        self.wells.append(Well(**params))

        return self.wells[-1]
+ + Returns: + list[InitCondition]: The filtered list of InitCondition objects + """ + + return FilterWrapper[InitCondition](self.init_conditions, **kwargs).filter() + + def filter_measurements(self, **kwargs) -> list[PhotometricMeasurement]: + """Filters the measurements attribute based on the given kwargs + + Args: + **kwargs: The attributes to filter by. + + Returns: + list[PhotometricMeasurement]: The filtered list of PhotometricMeasurement objects + """ + + return FilterWrapper[PhotometricMeasurement]( + self.measurements, **kwargs + ).filter() + + def set_attr_term( + self, + attr: str, + term: str | dict, + prefix: str | None = None, + iri: str | None = None, + ): + """Sets the term for a given attribute in the JSON-LD object + + Example: + # Using an IRI term + >> obj.set_attr_term("name", "http://schema.org/givenName") + + # Using a prefix and term + >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org") + + # Usinng a dictionary term + >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"}) + + Args: + attr (str): The attribute to set the term for + term (str | dict): The term to set for the attribute + + Raises: + AssertionError: If the attribute is not found in the model + """ + + assert ( + attr in self.model_fields + ), f"Attribute {attr} not found in {self.__class__.__name__}" + + if prefix: + validate_prefix(term, prefix) + + add_namespace(self, prefix, iri) + self.ld_context[attr] = term + + def add_type_term( + self, term: str, prefix: str | None = None, iri: str | None = None + ): + """Adds a term to the @type field of the JSON-LD object + + Example: + # Using a term + >> obj.add_type_term("https://schema.org/Person") + + # Using a prefixed term + >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person") + + Args: + term (str): The term to add to the @type field + prefix (str, optional): The prefix to use for the term. Defaults to None. 
class PhotometricMeasurement(BaseModel):
    """Absorption time course of a well at one wavelength, with its blank states."""

    # `validate_assignment` was misspelled, so pydantic silently ignored it and
    # values assigned after construction were never validated.
    model_config: ConfigDict = ConfigDict(  # type: ignore
        validate_assignment=True,
    )  # type: ignore

    wavelength: float
    time_unit: UnitDefinition
    absorption: list[float] = Field(default_factory=list)
    time: list[float] = Field(default_factory=list)
    blank_states: list[BlankState] = Field(default_factory=list)

    # JSON-LD fields
    ld_id: str = Field(
        serialization_alias="@id",
        default_factory=lambda: "md:PhotometricMeasurement/" + str(uuid4()),
    )
    ld_type: list[str] = Field(
        serialization_alias="@type",
        default_factory=lambda: [
            "md:PhotometricMeasurement",
        ],
    )
    ld_context: dict[str, str | dict] = Field(
        serialization_alias="@context",
        default_factory=lambda: {
            "md": "https://github.com/FAIRChemistry/MTPHandler",
        },
    )

    def filter_blank_states(self, **kwargs) -> list[BlankState]:
        """Filters the blank_states attribute based on the given kwargs

        Args:
            **kwargs: The attributes to filter by.

        Returns:
            list[BlankState]: The filtered list of BlankState objects
        """

        return FilterWrapper[BlankState](self.blank_states, **kwargs).filter()

    def set_attr_term(
        self,
        attr: str,
        term: str | dict,
        prefix: str | None = None,
        iri: str | None = None,
    ):
        """Sets the term for a given attribute in the JSON-LD object

        Example:
            # Using an IRI term
            >> obj.set_attr_term("name", "http://schema.org/givenName")

            # Using a prefix and term
            >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org/")

            # Using a dictionary term
            >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"})

        Args:
            attr (str): The attribute to set the term for
            term (str | dict): The term to set for the attribute
            prefix (str, optional): The prefix the term must start with. Defaults to None.
            iri (str, optional): The IRI registered for the prefix. Defaults to None.

        Raises:
            AssertionError: If the attribute is not found in the model
            ValueError: If the term is not prefixed with the given prefix
            ValueError: If only one of prefix and iri is provided
        """

        # Read the fields from the class; instance access is deprecated in
        # recent pydantic releases.
        assert (
            attr in type(self).model_fields
        ), f"Attribute {attr} not found in {self.__class__.__name__}"

        if prefix:
            validate_prefix(term, prefix)

        add_namespace(self, prefix, iri)
        self.ld_context[attr] = term

    def add_type_term(
        self, term: str, prefix: str | None = None, iri: str | None = None
    ):
        """Adds a term to the @type field of the JSON-LD object

        Example:
            # Using a term
            >> obj.add_type_term("https://schema.org/Person")

            # Using a prefixed term
            >> obj.add_type_term("schema:Person", "schema", "https://schema.org/")

        Args:
            term (str): The term to add to the @type field
            prefix (str, optional): The prefix to use for the term. Defaults to None.
            iri (str, optional): The IRI to use for the term prefix. Defaults to None.

        Raises:
            ValueError: If the term is not prefixed with the given prefix
            ValueError: If prefix is provided but iri is not
            ValueError: If iri is provided but prefix is not
        """

        if prefix:
            validate_prefix(term, prefix)

        add_namespace(self, prefix, iri)
        self.ld_type.append(term)

    def add_to_blank_states(
        self,
        species_id: str,
        contributes_to_signal: bool = True,
        **kwargs,
    ):
        """Creates a BlankState from the given values and appends it to the blank states

        Args:
            species_id (str): ID of the species the blank state refers to
            contributes_to_signal (bool): Whether the species contributes to
                the signal. Defaults to True.
            **kwargs: Only an `id` entry is forwarded to the BlankState.

        Returns:
            BlankState: The newly added BlankState object
        """
        params = {
            "species_id": species_id,
            "contributes_to_signal": contributes_to_signal,
        }

        # NOTE(review): BlankState declares no `id` field, so this value is
        # presumably dropped by pydantic's default handling of extra fields.
        if "id" in kwargs:
            params["id"] = kwargs["id"]

        self.blank_states.append(BlankState(**params))

        return self.blank_states[-1]
{self.__class__.__name__}" + + if prefix: + validate_prefix(term, prefix) + + add_namespace(self, prefix, iri) + self.ld_context[attr] = term + + def add_type_term( + self, term: str, prefix: str | None = None, iri: str | None = None + ): + """Adds a term to the @type field of the JSON-LD object + + Example: + # Using a term + >> obj.add_type_term("https://schema.org/Person") + + # Using a prefixed term + >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person") + + Args: + term (str): The term to add to the @type field + prefix (str, optional): The prefix to use for the term. Defaults to None. + iri (str, optional): The IRI to use for the term prefix. Defaults to None. + + Raises: + ValueError: If prefix is provided but iri is not + ValueError: If iri is provided but prefix is not + """ + + if prefix: + validate_prefix(term, prefix) + + add_namespace(self, prefix, iri) + self.ld_type.append(term) + + +class BlankState(BaseModel): + model_config: ConfigDict = ConfigDict( # type: ignore + validate_assigment=True, + ) # type: ignore + + species_id: str + contributes_to_signal: bool = True + + # JSON-LD fields + ld_id: str = Field( + serialization_alias="@id", + default_factory=lambda: "md:BlankState/" + str(uuid4()), + ) + ld_type: list[str] = Field( + serialization_alias="@type", + default_factory=lambda: [ + "md:BlankState", + ], + ) + ld_context: dict[str, str | dict] = Field( + serialization_alias="@context", + default_factory=lambda: { + "md": "https://github.com/FAIRChemistry/MTPHandler", + }, + ) + + def set_attr_term( + self, + attr: str, + term: str | dict, + prefix: str | None = None, + iri: str | None = None, + ): + """Sets the term for a given attribute in the JSON-LD object + + Example: + # Using an IRI term + >> obj.set_attr_term("name", "http://schema.org/givenName") + + # Using a prefix and term + >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org") + + # Usinng a dictionary term + >> 
obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"}) + + Args: + attr (str): The attribute to set the term for + term (str | dict): The term to set for the attribute + + Raises: + AssertionError: If the attribute is not found in the model + """ + + assert ( + attr in self.model_fields + ), f"Attribute {attr} not found in {self.__class__.__name__}" + + if prefix: + validate_prefix(term, prefix) + + add_namespace(self, prefix, iri) + self.ld_context[attr] = term + + def add_type_term( + self, term: str, prefix: str | None = None, iri: str | None = None + ): + """Adds a term to the @type field of the JSON-LD object + + Example: + # Using a term + >> obj.add_type_term("https://schema.org/Person") + + # Using a prefixed term + >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person") + + Args: + term (str): The term to add to the @type field + prefix (str, optional): The prefix to use for the term. Defaults to None. + iri (str, optional): The IRI to use for the term prefix. Defaults to None. 
class UnitDefinition(BaseModel):
    """A unit composed of zero or more `BaseUnit` entries, with JSON-LD metadata.

    The ``ld_*`` fields are serialized under the JSON-LD keys ``@id``,
    ``@type`` and ``@context`` through their serialization aliases.
    """

    # The config key was previously misspelled ("validate_assigment"), so
    # validation on attribute assignment was never actually switched on.
    model_config: ConfigDict = ConfigDict(  # type: ignore
        validate_assignment=True,
        use_enum_values=True,
    )  # type: ignore

    id: Optional[str] = Field(default=None)
    name: Optional[str] = Field(default=None)
    base_units: list[BaseUnit] = Field(default_factory=list)

    # JSON-LD fields
    ld_id: str = Field(
        serialization_alias="@id",
        default_factory=lambda: "md:UnitDefinition/" + str(uuid4()),
    )
    ld_type: list[str] = Field(
        serialization_alias="@type",
        default_factory=lambda: [
            "md:UnitDefinition",
        ],
    )
    ld_context: dict[str, str | dict] = Field(
        serialization_alias="@context",
        default_factory=lambda: {
            "md": "https://github.com/FAIRChemistry/MTPHandler",
        },
    )

    def filter_base_units(self, **kwargs) -> list[BaseUnit]:
        """Filters the base_units attribute based on the given kwargs

        Args:
            **kwargs: The attributes to filter by.

        Returns:
            list[BaseUnit]: The filtered list of BaseUnit objects
        """

        return FilterWrapper[BaseUnit](self.base_units, **kwargs).filter()

    def set_attr_term(
        self,
        attr: str,
        term: str | dict,
        prefix: str | None = None,
        iri: str | None = None,
    ):
        """Sets the term for a given attribute in the JSON-LD object

        Example:
            # Using an IRI term
            >> obj.set_attr_term("name", "http://schema.org/givenName")

            # Using a prefix and term
            >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org")

            # Using a dictionary term
            >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"})

        Args:
            attr (str): The attribute to set the term for
            term (str | dict): The term to set for the attribute
            prefix (str, optional): Prefix used inside ``term``. Defaults to None.
            iri (str, optional): IRI the prefix stands for. Defaults to None.

        Raises:
            AssertionError: If the attribute is not found in the model
        """

        # Raised explicitly (instead of `assert`) so the check survives
        # `python -O`; the exception type is unchanged for existing callers.
        if attr not in type(self).model_fields:
            raise AssertionError(
                f"Attribute {attr} not found in {self.__class__.__name__}"
            )

        if prefix:
            validate_prefix(term, prefix)

        add_namespace(self, prefix, iri)
        self.ld_context[attr] = term

    def add_type_term(
        self, term: str, prefix: str | None = None, iri: str | None = None
    ):
        """Adds a term to the @type field of the JSON-LD object

        Example:
            # Using a term
            >> obj.add_type_term("https://schema.org/Person")

            # Using a prefixed term
            >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person")

        Args:
            term (str): The term to add to the @type field
            prefix (str, optional): The prefix to use for the term. Defaults to None.
            iri (str, optional): The IRI to use for the term prefix. Defaults to None.

        Raises:
            ValueError: If prefix is provided but iri is not
            ValueError: If iri is provided but prefix is not
                (NOTE(review): presumably raised by `add_namespace` — confirm.)
        """

        if prefix:
            validate_prefix(term, prefix)

        add_namespace(self, prefix, iri)
        self.ld_type.append(term)

    def add_to_base_units(
        self,
        kind: UnitType,
        exponent: int,
        multiplier: Optional[float] = None,
        scale: Optional[float] = None,
        **kwargs,
    ):
        """Create a `BaseUnit`, append it to `base_units` and return it.

        Args:
            kind (UnitType): Kind of the base unit.
            exponent (int): Exponent of the base unit.
            multiplier (float, optional): Multiplier of the base unit. Defaults to None.
            scale (float, optional): Scale of the base unit. Defaults to None.
            **kwargs: Only an ``id`` entry is read; it is forwarded to `BaseUnit`.

        Returns:
            BaseUnit: The newly appended base unit.
        """
        params = {
            "kind": kind,
            "exponent": exponent,
            "multiplier": multiplier,
            "scale": scale,
        }

        if "id" in kwargs:
            params["id"] = kwargs["id"]

        self.base_units.append(BaseUnit(**params))

        return self.base_units[-1]
class UnitType(Enum):
    """Closed set of base-unit kinds accepted by `BaseUnit.kind`.

    Each member's value is the lower-case name of the unit.
    """

    AMPERE = "ampere"
    AVOGADRO = "avogadro"
    BECQUEREL = "becquerel"
    CANDELA = "candela"
    CELSIUS = "celsius"
    COULOMB = "coulomb"
    DIMENSIONLESS = "dimensionless"
    FARAD = "farad"
    GRAM = "gram"
    GRAY = "gray"
    HENRY = "henry"
    HERTZ = "hertz"
    ITEM = "item"
    JOULE = "joule"
    KATAL = "katal"
    KELVIN = "kelvin"
    KILOGRAM = "kilogram"
    LITRE = "litre"
    LUMEN = "lumen"
    LUX = "lux"
    METRE = "metre"
    MOLE = "mole"
    NEWTON = "newton"
    OHM = "ohm"
    PASCAL = "pascal"
    RADIAN = "radian"
    SECOND = "second"
    SIEMENS = "siemens"
    SIEVERT = "sievert"
    STERADIAN = "steradian"
    TESLA = "tesla"
    VOLT = "volt"
    WATT = "watt"
    WEBER = "weber"
class Molecule(BaseModel):
    """A molecule that can be assigned to wells, optionally with a calibration standard."""

    # The config key was previously misspelled ("validate_assigment"), so
    # validation on attribute assignment was never actually switched on.
    model_config: ConfigDict = ConfigDict(  # type: ignore
        validate_assignment=True,
        use_enum_values=True,
    )  # type: ignore

    id: str = Field(
        description="ID of the molecule",
    )
    pubchem_cid: int = Field(
        description="PubChem CID of the molecule",
    )
    name: str = Field(
        description="Name of the molecule",
    )
    standard: Standard | None = Field(
        description="Standard instance associated with the molecule", default=None
    )
    constant: bool = Field(
        description="Boolean indicating whether the molecule concentration is constant throughout the experiment",
        default=False,
    )

    @property
    def ld_id_url(self) -> str | None:
        """Returns the URL of the PubChem page of the molecule based on the PubChem CID

        Returns:
            str | None: URL of the PubChem page of the molecule if the PubChem CID is defined, None otherwise.
        """

        # -1 is the sentinel for "no PubChem entry".
        if self.pubchem_cid == -1:
            return None

        return f"https://pubchem.ncbi.nlm.nih.gov/compound/{self.pubchem_cid}"

    @classmethod
    def from_standard(
        cls, standard: Standard, init_conc: float, conc_unit: UnitDefinition
    ) -> "Molecule":
        """Creates a Molecule instance from a Standard instance.

        Args:
            standard (Standard): Standard providing id, PubChem CID and name.
            init_conc (float): Currently unused; kept for interface compatibility.
            conc_unit (UnitDefinition): Currently unused; kept for interface compatibility.

        Returns:
            Molecule: Molecule carrying ``standard`` as its standard.
        """

        # A retention time used to be required here. That is a chromatography
        # concept: no field of this class stores it, so the check only served
        # to reject standards that lack one.
        return cls(
            id=standard.molecule_id,
            pubchem_cid=standard.pubchem_cid,
            name=standard.molecule_name,
            standard=standard,
        )

    def create_standard(
        self,
        areas: list[float],
        concs: list[float],
        conc_unit: UnitDefinition,
        ph: float,
        temperature: float,
        temp_unit: CalUnit = C,
        visualize: bool = True,
    ) -> Standard:
        """Creates a linear standard from the molecule's calibration data.

        Args:
            areas (list[float]): Measured signals, passed to the calibrator as ``signals``.
            concs (list[float]): Concentrations belonging to ``areas``.
            conc_unit (UnitDefinition): Unit of ``concs``.
            ph (float): pH stored on the standard.
            temperature (float): Temperature stored on the standard.
            temp_unit (CalUnit, optional): Unit of ``temperature``. Defaults to C.
            visualize (bool, optional): If True, the calibrator's plot is shown. Defaults to True.

        Returns:
            Standard: The created standard, also stored in ``self.standard``.
        """

        calibrator = Calibrator(
            molecule_id=self.id,
            pubchem_cid=self.pubchem_cid,
            molecule_name=self.name,
            concentrations=concs,
            conc_unit=CalUnit(**conc_unit.model_dump()),
            signals=areas,
        )
        # Drop any pre-populated models so only the linear one is fitted.
        calibrator.models = []
        model = calibrator.add_model(
            name="linear",
            signal_law=f"{self.id} * a",
        )

        calibrator.fit_models()
        # Pin the lower bounds of the calibration range to zero after fitting.
        model.calibration_range.conc_lower = 0.0
        model.calibration_range.signal_lower = 0.0

        if visualize:
            calibrator.visualize()

        standard = calibrator.create_standard(
            model=model,
            ph=ph,
            temperature=temperature,
            temp_unit=CalUnit(**temp_unit.model_dump()),
        )

        self.standard = standard

        return standard
def configure_logger(
    log_level_std: str = "INFO",
    log_level_file: str = "DEBUG",
    log_file: str = "mtp_handler.log",
    log_file_rotation_MB: int = 1,
):
    """Set up the package's loguru sinks once; later calls are no-ops.

    Args:
        log_level_std (str): Severity level of the stdout sink.
        log_level_file (str): Severity level of the file sink.
        log_file (str): Path of the log file.
        log_file_rotation_MB (int): Rotation size of the log file in MB.
    """
    global mtphandler_logger_configured

    # Guard clause: the sinks were installed by an earlier call.
    if mtphandler_logger_configured:
        return

    # Remove the existing handlers before installing the two package sinks.
    logger.remove()
    logger.add(sys.stdout, level=log_level_std)

    rotation_rule = f"{log_file_rotation_MB} MB"
    logger.add(log_file, level=log_level_file, rotation=rotation_rule)

    mtphandler_logger_configured = True
@model_validator(mode="before")
@classmethod
def give_name_to_plate(cls, data: Any) -> Any:
    """Fill in the default name "MTP assay" when the input dict has no usable name.

    Non-dict input is passed through untouched.
    """
    if not isinstance(data, dict):
        return data

    # A missing key and an explicit None are treated the same way.
    if data.get("name") is None:
        data["name"] = "MTP assay"

    return data
def add_molecule(
    self,
    molecule: Molecule,
    constant: bool | None = None,
) -> None:
    """Register a molecule, optionally overriding its `constant` flag.

    Args:
        molecule (Molecule): Molecule to add to (or replace in) the list of molecules.
        constant (bool | None, optional): When not None, a copy of the molecule with
            this `constant` value is registered instead of the passed object. Defaults to None.
    """
    # Copy only when an override was requested; the caller's object is never mutated.
    to_register = (
        molecule
        if constant is None
        else molecule.model_copy(update={"constant": constant})
    )

    self._update_molecule(to_register)
def assign_init_conditions(
    self,
    species: Molecule | Protein | str,
    init_conc: float | list[float],
    conc_unit: UnitDefinition,
    to: ASSIGN_CASE,
    ids: Optional[str | int | list[str] | list[int]] = None,
    contributes_to_signal: Optional[bool] = None,
    silent: bool = False,
):
    """
    Assigns a `Molecule` or `Protein` to specific wells on the plate based on the provided criteria.
    In this way the initial concentration of the species can be set for the respective wells in a row,
    column, all wells or all wells except for the specified. During the assignment, either an array of
    initial concentrations or a single initial concentration can be provided. If a single initial
    concentration is provided, it is assigned to all wells of e.g., a row or column.
    If an array of initial concentrations is provided, the length of the array must match the number of
    wells in the row or column.

    Tip:
        For complex assignment scenarios, consider using the `assign_init_conditions_from_spreadsheet` function.

    Args:
        species (Molecule | Protein | str): The species to assign to the wells, or the id of a
            species already defined on this manager.
        init_conc (float | list[float]): The initial concentration(s) of the species.
            For 'all' and 'all except' exactly one value is accepted.
        conc_unit (UnitDefinition): The unit of concentration.
        to (ASSIGN_CASE): The target location(s) for assigning the species. It should be one of the allowed cases.
        ids (str | int | list[str] | list[int], optional): The ID(s) of the target wells, rows, or columns.
            Required for every case except 'all'. Defaults to None.
        contributes_to_signal (bool, optional): Indicates if the assigned species contributes to the signal.
            Defaults to None.
        silent (bool, optional): If True, no output is printed. Defaults to False.

    Raises:
        AttributeError: If the species does not exist in the list of molecules or proteins.
        AttributeError: If the 'to' argument is not a valid `ASSIGN_CASE`.
        AssertionError: If `init_conc` or `ids` do not fit the selected case.

    Returns:
        None
    """

    # Handle species: an id is looked up, species objects pass through.
    if isinstance(species, str):
        species = self.get_species(species)
    elif not isinstance(species, (Molecule, Protein)):
        raise AttributeError(
            """Argument 'species' must reference an `id` of a molecule or protein from the list of molecules or proteins of the `MTPHandler`."""
        )

    if to not in ASSIGN_CASE_VALUES:
        raise AttributeError(f"Argument 'to' must be one of {ASSIGN_CASE_VALUES}.")

    # Normalise scalars to one-element lists so every branch sees lists.
    if not isinstance(init_conc, list):
        init_conc = [init_conc]

    if isinstance(ids, (int, str)):
        ids = [ids]  # type: ignore

    # NOTE: the checks below raise AssertionError explicitly instead of using
    # `assert`, so they are not stripped under `python -O` while callers that
    # already catch AssertionError keep working.

    if to == "columns":
        if not (isinstance(ids, list) and all(isinstance(i, int) for i in ids)):
            raise AssertionError(
                "Argument 'ids' must be a list of integers when 'to' is set to 'columns'."
            )

        self._assign_to_columns(
            column_ids=ids,
            species=species,
            init_concs=init_conc,
            conc_unit=conc_unit,
            contributes_to_signal=contributes_to_signal,
            silent=silent,
        )
        return

    if to == "rows":
        if not (isinstance(ids, list) and all(isinstance(i, str) for i in ids)):
            raise AssertionError(
                "Argument 'ids' must be a list of strings when 'to' is set to 'rows'."
            )

        self._assign_species_to_rows(
            row_ids=ids,
            species=species,
            init_concs=init_conc,
            conc_unit=conc_unit,
            contributes_to_signal=contributes_to_signal,
            silent=silent,
        )
        return

    # 'all' and 'all except' both take exactly one concentration. Integers are
    # accepted in both cases (previously 'all except' rejected them).
    if len(init_conc) != 1 or not isinstance(init_conc[0], (float, int)):
        raise AssertionError(
            f"Argument 'init_conc' must be a single float or integer when 'to' is set to '{to}'."
        )
    single_conc = float(init_conc[0])

    if to == "all":
        self._assign_to_all(
            species=species,
            init_conc=single_conc,
            conc_unit=conc_unit,
            contributes_to_signal=contributes_to_signal,
            silent=silent,
        )
        return

    # to == "all except": the excluded wells must be named, otherwise the
    # helper would fail later with an opaque TypeError on `None`.
    if not isinstance(ids, list):
        raise AssertionError(
            "Argument 'ids' must be a well id or a list of well ids when 'to' is set to 'all except'."
        )

    self._assign_species_to_all_except(
        well_ids=ids,
        species=species,
        init_conc=single_conc,
        conc_unit=conc_unit,
        contributes_to_signal=contributes_to_signal,
        silent=silent,
    )
def get_calibrator(
    self,
    molecule: Molecule,
    cutoff: float | None = None,
    wavelength: float | None = None,
):
    """Build a calibrator for `molecule` from this plate.

    Args:
        molecule (Molecule): Molecule to calibrate.
        cutoff (float | None, optional): Forwarded unchanged to `initialize_calibrator`. Defaults to None.
        wavelength (float | None, optional): Wavelength to use. When None, the plate's
            wavelength is taken from `_handle_wavelength`. Defaults to None.

    Returns:
        Whatever `initialize_calibrator` returns for the given arguments.
    """
    # Imported lazily, inside the method, exactly as before.
    from mtphandler.ioutils.calipytion import initialize_calibrator

    selected_wavelength = (
        self._handle_wavelength() if wavelength is None else wavelength
    )
    known_protein_ids = [protein.id for protein in self.proteins]

    return initialize_calibrator(
        plate=self.plate,
        wavelength=selected_wavelength,
        molecule=molecule,
        protein_ids=known_protein_ids,
        cutoff=cutoff,
    )
def _assign_species_to_rows(
    self,
    row_ids: list[str],
    species: Molecule | Protein,
    init_concs: list[float],
    conc_unit: UnitDefinition,
    contributes_to_signal: bool | None,
    silent: bool,
):
    """Assign a species with its initial concentration(s) to whole rows.

    Args:
        row_ids (list[str]): Row identifiers; a well belongs to a row when the
            identifier occurs in the well id.
        species (Molecule | Protein): Species to assign.
        init_concs (list[float]): Either one concentration, which is applied to
            every well of each row, or one concentration per well of a row
            (ordered by `x_pos`).
        conc_unit (UnitDefinition): Unit of the concentrations.
        contributes_to_signal (bool | None): Forwarded to `handle_blank_status`.
        silent (bool): If True, no output is printed.

    Raises:
        AttributeError: If `row_ids` contains a non-string entry.
        AssertionError: If the number of concentrations does not match the
            number of wells in a row.
    """
    # Handle row_ids
    if isinstance(row_ids, str):
        row_ids = [row_ids]

    if not all(isinstance(row_id, str) for row_id in row_ids):
        raise AttributeError("Argument 'row_ids' must be a list of strings.")

    # Collect the wells of every requested row, ordered by x position.
    rows = [
        sorted(
            (well for well in self.plate.wells if row_id in well.id),
            key=lambda well: well.x_pos,
        )
        for row_id in row_ids
    ]

    # Validate every row before touching any well, so a mismatch in a later
    # row cannot leave earlier rows half-assigned.
    assignments = []
    for wells in rows:
        row_concs = init_concs
        # Broadcast a single concentration over the row, mirroring what the
        # column assignment does and what the public docstring promises.
        if len(init_concs) == 1 and wells:
            row_concs = init_concs * len(wells)

        if len(row_concs) != len(wells):
            raise AssertionError(
                f"Number of initial concentrations ({len(init_concs)}) does not "
                f"match number of wells ({len(wells)}) in rows ({row_ids})."
            )
        assignments.append((wells, row_concs))

    for wells, row_concs in assignments:
        for well, init_conc in zip(wells, row_concs):
            well.add_to_init_conditions(
                species_id=species.id,
                init_conc=init_conc,
                conc_unit=conc_unit,
            )

            handle_blank_status(well, species.id, init_conc, contributes_to_signal)

    if not silent:
        print(
            f"Assigned [bold magenta]{species.name}[/] ({species.id}) with"
            f" {init_concs} {conc_unit} to rows {row_ids}."
        )
def assign_init_conditions_from_spreadsheet(
    self,
    conc_unit: UnitDefinition,
    path: str,
    header: int = 0,
    index: int = 0,
    silent: bool = False,
):
    """Assign initial concentrations from an Excel spreadsheet to the wells on the plate.

    Note:
        This function goes through the sheets in an excel spreadsheet. If the sheet name
        matches the id of a protein or molecule defined for the plate (compared
        case-insensitively), the initial concentration from the plate map in the excel
        spreadsheet is assigned to the respective well.

        The excel spreadsheet must have the following structure:

        - The first row must contain the column numbers from 1 to 12.
        - The first column must contain the row letters from A to H.
        - If a cell is left empty for a species, the species is not assigned to the well.
        - If the initial concentration is `0`, the species is added to the well. This is useful for
            specifying a product which is not present in the initial reaction mixture, but is formed
            during the reaction.

    Args:
        conc_unit (UnitDefinition): The unit of concentration.
        path (str): Path to the Excel spreadsheet.
        header (int, optional): Row to use as the column names. Defaults to 0.
        index (int, optional): Column to use as the row labels. Defaults to 0.
        silent (bool, optional): If True, no output is printed. Defaults to False.
    """
    count = 0
    species_ids = [protein.id for protein in self.proteins]
    species_ids += [molecule.id for molecule in self.molecules]

    # Open the workbook once and close it deterministically; every sheet is
    # parsed from this single handle.
    with pd.ExcelFile(path) as workbook:
        sheet_names = list(workbook.sheet_names)
        sheets_by_lower = {str(name).lower(): name for name in sheet_names}

        # species id -> actual sheet name. The sheet is resolved to its real
        # spelling because the match is case-insensitive while reading a sheet
        # by name is not. A dict keeps definition order and drops duplicates.
        species_matches: dict[str, str] = {}
        for species_id in species_ids:
            if species_id in sheet_names:
                species_matches[species_id] = species_id
            elif species_id.lower() in sheets_by_lower:
                species_matches[species_id] = sheets_by_lower[species_id.lower()]

        for species_id, sheet_name in species_matches.items():
            df = workbook.parse(
                sheet_name=sheet_name, header=header, index_col=index
            )
            for well in self.plate.wells:
                init_conc = df.iloc[well.y_pos, well.x_pos]

                # Empty cell: the species is not present in this well.
                # `pd.isna` also copes with non-float cells.
                if pd.isna(init_conc):
                    continue

                init_conc = float(init_conc)

                well.add_to_init_conditions(
                    species_id=species_id,
                    init_conc=init_conc,
                    conc_unit=conc_unit,
                )
                count += 1

                handle_blank_status(
                    well, species_id, init_conc, contributes_to_signal=None
                )

    if not silent:
        print(
            f"📍 Assigned {count} initial concentrations coditions for [bold magenta]{list(species_matches)}[/]"
            f" from {path} to the plate."
        )
def get_well(self, id: str) -> Well:
    """Look up a well on the plate by id, ignoring letter case.

    Args:
        id (str): The id of the well.

    Raises:
        ValueError: If the well with the given id is not found.

    Returns:
        Well: The first well whose id matches.
    """
    # First match wins; the comparison lower-cases both sides.
    found = next(
        (
            candidate
            for candidate in self.plate.wells
            if candidate.id.lower() == id.lower()
        ),
        None,
    )

    if found is None:
        raise ValueError(f"Well {id} not found")

    return found
def _find_blanking_wells(
    self,
    target: Molecule | Protein,
    wavelength: float,
) -> list[Well]:
    """Collect the wells that can serve as blank controls for ``target``.

    A well qualifies when it contains ``target`` at a concentration above
    zero and, for a measurement at ``wavelength``,
    ``measurement_is_blanked_for`` returns True for the target. If the target
    is one of the registered molecules, wells that also contain any
    registered protein above zero concentration are excluded.

    Args:
        target (Molecule | Protein): The species to find control wells for.
        wavelength (float): The wavelength of the measurements to inspect.

    Returns:
        list[Well]: The qualifying wells. A well is appended once per matching
            measurement at ``wavelength``.

    Raises:
        AssertionError: If a measurement at ``wavelength`` in a well containing
            the target has no blank state for the target.
    """
    # The `wavelength` argument is used as given. It used to be overwritten by
    # `self._handle_wavelength()`, which discarded the caller's choice and
    # raised on every plate measured at more than one wavelength.
    protein_ids = [protein.id for protein in self.proteins]
    target_is_molecule = target.id in {molecule.id for molecule in self.molecules}

    wells = []
    for well in self.plate.wells:
        # only wells that contain the target with a concentration above zero
        if not well_contains_species(well, target.id, conc_above_zero=True):
            continue

        # Molecule controls can not include proteins
        if target_is_molecule and any(
            well_contains_species(well, protein_id, conc_above_zero=True)
            for protein_id in protein_ids
        ):
            continue

        for measurement in well.measurements:
            if measurement.wavelength != wavelength:
                continue

            # sanity check, species should be present in blank states
            assert target.id in [
                state.species_id for state in measurement.blank_states
            ], f"Species {target.id} not found in well {well.id}."

            # The decision is delegated to the helper from mtphandler.tools.
            if measurement_is_blanked_for(measurement, target.id):
                wells.append(well)

    return wells
def blank_species(
    self,
    species: Molecule | Protein,
    wavelength: float | None = None,
    silent: bool = False,
):
    """Blank the signal contribution of a species at a given wavelength.

    Control wells of that species must be present on the plate. If none are
    found, a message is printed (unless ``silent``) and the plate is left
    unchanged.

    Args:
        species (Molecule | Protein): The species to blank.
        wavelength (float | None, optional): The wavelength at which to blank the species.
            If None, the single wavelength measured on the plate is used. Defaults to None.
        silent (bool, optional): If True, the "no wells found" message of this method is
            not printed. Defaults to False.

    Raises:
        ValueError: If ``wavelength`` is None and multiple wavelengths were measured.
    """
    # Only fall back to the plate's wavelength when the caller gave none.
    # Resolving it unconditionally discarded the argument and made blanking
    # impossible on plates measured at several wavelengths.
    if wavelength is None:
        wavelength = self._handle_wavelength()

    blanking_wells = self._find_blanking_wells(
        target=species, wavelength=wavelength
    )
    if not blanking_wells:
        if not silent:
            print(
                "No wells found to calculate the absorption contribution of the species."
            )
        return

    # get mapping of concentration to mean blank absorption
    conc_blank_mapping = self._get_conc_blank_mapping(
        wells=blanking_wells, species=species, wavelength=wavelength
    )

    self._apply_blank(
        species=species,
        conc_blank_mapping=conc_blank_mapping,
        wavelength=wavelength,
    )
+ wavelength (float): The wavelength at which to blank the species. + """ + well_blanked_count = 0 + + for well_id, well in enumerate(self.plate.wells): + for meas_id, measurement in enumerate(well.measurements): + if measurement.wavelength != wavelength: + continue + + try: + init_condition = get_species_condition(well, species.id) + except ValueError: + continue + + for state_id, blank_state in enumerate(measurement.blank_states): + if blank_state.species_id != species.id: + continue + + if blank_state.contributes_to_signal: + self.plate.wells[well_id].measurements[meas_id].absorption = [ + absorption - conc_blank_mapping[init_condition.init_conc] + for absorption in measurement.absorption + ] + + self.plate.wells[well_id].measurements[meas_id].blank_states[ + state_id + ].contributes_to_signal = False + + well_blanked_count += 1 + + print(f"Blanked {well_blanked_count} wells containing {species.name}.\n") + + def to_enzymeml( + self, + detected_molecule: Molecule, + well_ids: list[str] | None = None, + wells_with_protein_only: bool = True, + name: str | None = None, + to_concentration: bool = False, + extrapolate: bool = False, + wavelength: float | None = None, + silent: bool = False, + ) -> EnzymeMLDocument: + """Convert the plate to an EnzymeML document. + + + Args: + name (str | None, optional): Name of the EnzymeML document. Defaults to the name of the plate. + detected_molecule (Molecule): The molecule that was detected in the wells. + well_ids (list[str] | None, optional): List of well ids to include in the EnzymeML document. + If not provided, all wells are included. Defaults to None. + to_concentration (bool, optional): If True, the signal is converted to concentration. Therefore, + a calibrator must be defined for the respective molecule. Defaults to False. + extrapolate (bool, optional): If True, and `to_concentration` is True, measured absorption values + that are outside the range of the calibrator are extrapolated. Defaults to False. 
+ wells_with_protein_only (bool, optional): If True, only wells with protein are included in the + EnzymeML document. This assumes that wells with a protein are catalyzed wells. Defaults to True. + wavelength (float | None, optional): If multiple wavelengths were measured, the wavelength for + which to convert the signal to concentration needs to be specified. Defaults to None. + silent (bool, optional): If True, no output is printed. Defaults to False. + + Returns: + EnzymeMLDocument: [`pyenzyme`](https://github.com/EnzymeML/PyEnzyme) `EnzymeMLDocument` object. + """ + from mtphandler.ioutils.pyenzyme import Plate_to_EnzymeMLDocument + + if name is None: + name = self.name + + converter = Plate_to_EnzymeMLDocument( + name=name, + plate=self.plate, + well_ids=well_ids, + molecules=self.molecules, + detected_molecule=detected_molecule, + proteins=self.proteins, + to_concentration=to_concentration, + extrapolate=extrapolate, + wells_with_protein_only=wells_with_protein_only, + wavelength=wavelength, + silent=silent, + ) + + return converter.convert() + + def _well_id_exists(self, well_id: str) -> bool: + """Check if a well with the given id exists in the plate.""" + return any([well_id in well.id for well in self.plate.wells]) + + def _get_conc_blank_mapping( + self, + wells: list[Well], + species: Protein | Molecule, + wavelength: float, + ) -> dict[float, float]: + """Calculate the mean absorption of a species at different concentrations. + + Args: + wells (list[Well]): List of wells to calculate the mean absorption for. + species (Protein | Molecule): The species for which to calculate the mean absorption. + wavelength (float): The wavelength at which to calculate the mean absorption. + + Returns: + dict[float, float]: Mapping of concentration to mean absorption. 
+ """ + conc_to_absorptions = defaultdict(list) + + # Collect all absorption data per concentration + for well in wells: + condition = get_species_condition(well, species.id) + absorption = get_measurement(well, wavelength).absorption + conc_to_absorptions[condition.init_conc].append(absorption) + + conc_mean_blank_mapping = {} + + # Calculate mean absorption and standard deviation + for conc, absorptions in conc_to_absorptions.items(): + mean_absorption = np.nanmean(absorptions) + std_absorption = np.nanstd(absorptions) + + # Handle case where mean_absorption is zero to avoid division by zero + if mean_absorption != 0: + std_perc = float(abs(std_absorption / mean_absorption) * 100) + else: + std_perc = 0.0 + + # Print formatted information + print( + f"Mean absorption of [bold magenta]{species.name}[/] ({species.id}) at" + f" {conc} {condition.conc_unit.name}: {mean_absorption:.4f} ±" + f" {std_perc:.0f}% calculated based on wells" + f" {[well.id for well in wells]}." + ) + + conc_mean_blank_mapping[conc] = mean_absorption.tolist() + + return conc_mean_blank_mapping + + @staticmethod + def _species_contibutes( + measurement: PhotometricMeasurement, species_id: str + ) -> bool: + species_contributes = [ + state.contributes_to_signal + for state in measurement.blank_states + if state.species_id == species_id + ][0] + + return species_contributes + + @staticmethod + def _get_blank_state( + measurement: PhotometricMeasurement, species_id: str + ) -> BlankState: + for state in measurement.blank_states: + if state.species_id == species_id: + return state + + raise ValueError(f"Species {species_id} is not present in this well.") + + @classmethod + def read_spectra_max_190( + cls, + path: str, + ph: float | None = None, + name: str | None = None, + ) -> PlateManager: + """Read a `*.txt` file exported from a SpectraMax 190 software and create a PlateManager object. + + Args: + path (str): Path to the SpectraMax 190 file. 
@classmethod
def read_multiskan_spectrum_1500(
    cls,
    path: str,
    time: list[float],
    time_unit: UnitDefinition,
    temperature: float,
    temperature_unit: UnitDefinition = C,
    ph: float | None = None,
    name: str | None = None,
) -> PlateManager:
    """Read a `*.txt` file exported from a Multiskan Spectrum 1500 and create a PlateManager object.

    Args:
        path (str): Path to the Multiskan Spectrum 1500 file.
        time (list[float]): List of time points.
        time_unit (UnitDefinition): Unit of time.
        temperature (float): Temperature of the measurements.
        temperature_unit (UnitDefinition, optional): Unit of temperature. Defaults to C.
        ph (float | None, optional): The pH value of the measurements. Defaults to None.
        name (str | None, optional): Name of the plate. If None, no name is passed to the
            constructor. Defaults to None.

    Returns:
        PlateManager: PlateManager object.
    """
    # Imported here, not at module level, as in the other reader constructors.
    from mtphandler.readers import read_multiskan_spectrum_1500 as reader

    plate = reader(
        path=path,
        time=time,
        time_unit=time_unit,
        temperature=temperature,
        temperature_unit=temperature_unit,
        ph=ph,
    )

    # Only forward `name` when the caller supplied one.
    if name is None:
        return cls(plate=plate)

    return cls(plate=plate, name=name)
@classmethod
def read_multiskan_sky(
    cls,
    path: str,
    ph: float | None = None,
    name: str | None = None,
) -> PlateManager:
    """Read a `*.xlsx` file exported from a Multiskan Sky and create a PlateManager object.

    Args:
        path (str): Path to the Multiskan Sky file.
        ph (float | None, optional): The pH value of the measurements. Defaults to None.
        name (str | None, optional): Name of the plate. If None, no name is passed to the
            constructor. Defaults to None.

    Returns:
        PlateManager: PlateManager object.
    """
    # Imported here, not at module level, as in the other reader constructors.
    from mtphandler.readers import read_multiskan_sky as reader

    plate = reader(path, ph)

    # Only forward `name` when the caller supplied one.
    if name is None:
        return cls(plate=plate)

    return cls(plate=plate, name=name)
class BlankResult(BaseModel):
    """Record of one blank calculation for a species at one wavelength.

    Holds the species id, the wavelength, the ids of the control wells used,
    the mean and standard deviation of the species' contribution to the
    signal, and the ids of the wells the blank was applied to.
    """

    # NOTE(review): `species_id`, `wavelength`, `mean_contribution` and
    # `std_contribution` are annotated as non-optional but default to None.
    # Confirm whether they are meant to be optional or required.
    species_id: str = Field(
        description="The id of the species for which the blank was calculated.",
        default=None,
    )
    wavelength: float = Field(
        description="The wavelength at which the blank was calculated.",
        default=None,
    )
    # NOTE(review): list default given as a literal `[]` — presumably relies on
    # the model library copying defaults per instance; verify.
    control_well_ids: list[str] = Field(
        description="The ids of the wells used to calculate the blank.",
        default=[],
    )
    mean_contribution: float = Field(
        description="The mean contribution of the species to the signal.",
        default=None,
    )
    std_contribution: float = Field(
        description="The standard deviation of the contribution of the species to the signal.",
        default=None,
    )
    applied_to_well_ids: list[str] = Field(
        description="The ids of the wells to which the blank was applied.",
        default=[],
    )
def parse_measurement_interval(s: str) -> float:
    """Extract the measurement interval from a text cell and return it in minutes.

    The text must contain the word "Interval" (any case) followed by
    whitespace and a time written as ``h:m:s``.

    Args:
        s (str): Text to search.

    Raises:
        ValueError: If no interval is found in ``s``.

    Returns:
        float: The interval in minutes.
    """
    found = re.search(
        r"Interval\s+(\d{1,4}):(\d{1,2}):(\d{1,2})", s, re.IGNORECASE
    )
    if found is None:
        raise ValueError("Measurement interval not found.")

    hours, minutes, seconds = (int(part) for part in found.groups())
    duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)

    # seconds -> minutes
    return duration.total_seconds() / 60
+ data_block_starts = [ + index for index, value in enumerate(row_index_int_map) if value + ] + + wavelengths_cell = str(df.iloc[19, 1]) + wavelengths = extract_integers(wavelengths_cell) + + measurement_int_cell = str(df.iloc[15, 1]) + measurement_interval = parse_measurement_interval(measurement_int_cell) + + plate = Plate( + date_measured=timestamp, + time_unit=second, + temperature_unit=C, + ) + + for row_index, (block_start, wavelength) in enumerate( + zip(data_block_starts, wavelengths) + ): + try: + block = df.iloc[block_start + 2 : data_block_starts[row_index + 1], :] + except IndexError: + block = df.iloc[block_start + 2 :, :] + + block = block.drop("Unnamed: 0", axis=1).reset_index(drop=True) + + all_nan_rows = block.isna().all(axis=1) + first_all_nan_index = all_nan_rows.idxmax() if all_nan_rows.any() else None + + # drop rows if any of the values are NaN + block.iloc[:first_all_nan_index, :] + block = block.dropna(how="any", axis=0) + column_names = block.iloc[0, :].tolist() + block.columns = column_names + block = block[1:].reset_index(drop=True) + + # Temperature + temperature = block.pop(column_names[1]).values + + for column_name in column_names[2:]: + x, y = id_to_xy(column_name) + + try: + well = get_well(plate, column_name) + except ValueError: + well = plate.add_to_wells(id=column_name, x_pos=x, y_pos=y, ph=ph) + + data = block[column_name].values.tolist() + time = np.arange( + 0, len(data) * measurement_interval, measurement_interval + ).tolist() + + well.add_to_measurements( + wavelength=wavelength, + absorption=data, + time=time, + time_unit=second, + ) + + plate.temperatures = temperature.tolist() + plate.times = time + + # assert that all plate -> well -> measurement -> absorption have the same length + for well in plate.wells: + for measurement in well.measurements: + assert len(measurement.absorption) == len(measurement.time), ( + f"Absorption and time data for well {well.id} and wavelength " + f"{measurement.wavelength} do not have the 
def read_multiskan_sky(
    path: str,
    ph: float | None,
) -> Plate:
    """Read a Multiskan Sky `*.xlsx` export and return it as a plate.

    The workbook must contain one sheet whose name includes "Raw data" and one
    whose name includes "General info". Measurements and the timestamp come
    from the former, the temperature from the latter.

    Args:
        path (str): Path to the Multiskan Sky file.
        ph (float | None): The pH value assigned to every well.

    Raises:
        ValueError: If either expected sheet is missing.

    Returns:
        Plate: The plate holding the wells read from the file.
    """
    # Close the workbook handle once the sheet names are known.
    with pd.ExcelFile(path) as workbook:
        sheetnames = workbook.sheet_names

    # `None` must be the *default argument of next()*. Written as
    # `next(...), None` it builds a tuple, so the guard below never fires and
    # a missing sheet surfaces as a bare StopIteration.
    raw_data_sheet = next(
        (sheet for sheet in sheetnames if "Raw data" in sheet), None
    )
    general_info_sheet = next(
        (sheet for sheet in sheetnames if "General info" in sheet), None
    )

    if raw_data_sheet is None or general_info_sheet is None:
        raise ValueError(
            "The provided Excel file does not contain the expected sheets."
        )

    df, timestamp = raw_data_to_df(raw_data_sheet, path)
    temperature = get_temperature(general_info_sheet, path)

    wells = df_to_wells(df, ph)

    return Plate(
        wells=wells,
        date_measured=timestamp,
        temperatures=[temperature],
        temperature_unit=C,
    )
def read_multiskan_spectrum_1500(
    path: str,
    time: list[float],
    time_unit: UnitDefinition,
    temperature_unit: UnitDefinition,
    ph: float | None = None,
    temperature: float | None = None,
) -> Plate:
    """Read a Multiskan Spectrum 1500 text export and return it as a plate.

    If ``temperature`` or ``ph`` is None, the value is taken from the path:
    the temperature from a ``<digits>deg`` token, the pH from a
    ``pH<digits>.<digits>`` token.

    Args:
        path (str): Path to the exported file.
        time (list[float]): Time points of the measurements. A numpy array is
            converted to a list.
        time_unit (UnitDefinition): Unit of ``time``.
        temperature_unit (UnitDefinition): Unit of the temperature. A falsy value
            is replaced by ``C``.
        ph (float | None, optional): pH of the measurements. Defaults to None.
        temperature (float | None, optional): Temperature of the measurements.
            Defaults to None.

    Raises:
        ValueError: If the temperature or pH is not given and cannot be found
            in ``path``, or if the number of time points in the file differs
            from ``len(time)``.

    Returns:
        Plate: The plate with one well per position and one measurement per
            well and wavelength.
    """
    # Extract temperature from path. `is None` keeps an explicit 0.0 valid.
    if temperature is None:
        temperature_match = re.search(r"(\d{1,3})deg", path)
        if temperature_match is None:
            # Raise the intended error; indexing an empty findall() result
            # used to fail first with an IndexError.
            raise ValueError(
                "Could not find temperature in path. Please specify 'temperature'."
            )
        # Store a number, not the matched text.
        temperature = float(temperature_match.group(1))

    if not temperature_unit:
        temperature_unit = C

    if isinstance(time, np.ndarray):
        time = time.tolist()

    # Extract pH from path
    if ph is None:
        ph_match = re.search(r"pH(\d+\.\d+)", path)
        if ph_match is None:
            raise ValueError("Could not find pH in path. Please specify pH.")
        ph = float(ph_match.group(1))

    # Read file
    data_dict = extract_data(pd.read_csv(path).reset_index())

    # Each wavelength maps to an array of shape (rows, columns, timepoints).
    _n_rows, _n_columns, n_timepoints = next(iter(data_dict.values())).shape

    if len(time) != n_timepoints:
        raise ValueError(
            f"Number of timepoints in data set ({n_timepoints}) does not match "
            f"number of timpoints in provided 'time' array ({len(time)})."
        )

    # Create plate
    plate = Plate(
        temperatures=[temperature],
        temperature_unit=temperature_unit,
        times=time,
        time_unit=time_unit,
    )

    # Add wells to plate
    for wavelength, data in data_dict.items():
        for row_id, row in enumerate(data):
            for column_id, column in enumerate(row):
                well_id = _coordinates_to_id(column_id, row_id)
                # A well is created on the first wavelength that reaches it
                # and reused for every further wavelength.
                if well_id not in [well.id for well in plate.wells]:
                    plate.add_to_wells(
                        id=well_id,
                        x_pos=column_id,
                        y_pos=row_id,
                        ph=ph,
                    )

                well = [well for well in plate.wells if well.id == well_id][0]
                well.add_to_measurements(
                    wavelength=wavelength,
                    wavelength_unit=nm,
                    absorption=column,
                    time=time,
                    time_unit=time_unit,
                )

    return plate
pd.Series): + value = row.values[1] + return [float(x) for x in value.split("\t") if len(x) != 0] + + +def _segment_dataframe(df: pd.DataFrame, key: str): + def filter_function(row: pd.Series): + row_id, value = row.values + if key in value: + return row_id + + slice_ids = df.apply(filter_function, axis=1).dropna().astype(int).values.tolist() + + list_of_slices = _list_to_slices(slice_ids) + + return (df.loc[slc] for slc in list_of_slices) + + +def _list_to_slices(_list: list[int]) -> list[slice]: + slices = [] + for idx, element in enumerate(_list): + try: + start_data_slice = element + 1 + end_data_slice = _list[idx + 1] - 1 + slices.append(slice(start_data_slice, end_data_slice)) + + except IndexError: + start_data_slice = element + 1 + end_data_slice = None + slices.append(slice(start_data_slice, end_data_slice)) + + return slices + + +def _get_wavelengths(row: pd.Series): + value = row.values[1] + + if "Wavelength" in value: + return re.findall(r"Wavelength:(.*)", value)[0] + + +def _coordinates_to_id(x: int, y: int) -> str: + return f"{chr(y + 65)}{x+1}" + + +def id_to_xy(well_id: str): + return ord(well_id[0].upper()) - 65, int(well_id[1:]) - 1 + + +if __name__ == "__main__": + import numpy as np + from devtools import pprint + + from mtphandler.model import Plate + from mtphandler.units import C, min + + path = "docs/examples/data/multiskan_spectrum_1500.txt" + + ph = 7.0 + wavelength = 450.0 + + time = np.arange(0, 15.5, 0.5).tolist() + print(f"the thime is {time}") + + pprint( + read_multiskan_spectrum_1500( + path=path, + ph=ph, + time=time, + time_unit=min, + temperature=37.0, + temperature_unit=C, + ) + ) diff --git a/mtphandler/readers/spectra_max_190.py b/mtphandler/readers/spectra_max_190.py new file mode 100644 index 0000000..f14f972 --- /dev/null +++ b/mtphandler/readers/spectra_max_190.py @@ -0,0 +1,267 @@ +import re +from io import StringIO + +import numpy as np +import pandas as pd +from loguru import logger + +from mtphandler.model import 
class WrongParserError(Exception):
    """Exception raised when the wrong parser is used to read a file.

    Attributes:
        parser_name: Name of the parser that rejected the file.
        expected_format: Name of the file format that parser expects.
    """

    def __init__(self, parser_name: str, expected_format: str):
        self.parser_name = parser_name
        self.expected_format = expected_format
        super().__init__(self._generate_message())

    def _generate_message(self) -> str:
        """Build the error text from the parser name and the expected format."""
        # The message used to stop after a dangling ", "; finish the sentence.
        return (
            f"Error in {self.parser_name}: Expected format '{self.expected_format}', "
            "but the file does not match this format."
        )
+ ) + plate = map_to_plate(data_matrix, times, temperatures, ph, wavelength) + + return plate + + except IndexError: + for line_id, line in enumerate(lines): + if line.startswith("Time"): + start_id = line_id + if line.startswith("\n"): + end_id = line_id + break + + data = lines[start_id:end_id] + # make pandas df from öist of strings + data_str = "\n".join(data) + + # Use StringIO to simulate a file object + data_io = StringIO(data_str) + + # Use pd.read_csv with sep='\t' to read the data into a DataFrame + df = pd.read_csv(data_io, sep="\t") + # drop unnamed columns + df = df.loc[:, ~df.columns.str.contains("^Unnamed")] + print(df.index) + time = df.pop("Time") + time = [time_to_min_float(t) for t in time] + temperatures = df.pop("Temperature(¡C)").values.tolist() + print(df) + + # iterate over the columns and create wells + wells = [] + for column in df.columns: + x, y = id_to_xy(column) + well = Well( + id=column, + x_pos=x, + y_pos=y, + ph=ph, + ) + well.add_to_measurements( + wavelength=wavelength, + absorption=df[column].values.tolist(), + time=time, + time_unit=second, + ) + wells.append(well) + + plate = Plate( + time_unit=second, + temperatures=temperatures, + temperature_unit=C, + wells=wells, + ) + + return plate + + +def map_to_plate( + data_matrix: np.ndarray, + times: list[float], + temperatures: list[float], + ph: float | None, + wavelength: float, +): + """ + Maps a data matrix to a Plate object. + + Args: + data_matrix (np.ndarray): The data matrix containing the measurements. + times (list[float]): The list of time values. + temperatures (list[float]): The list of temperature values. + ph (float | None): The pH value or None if not applicable. + wavelength (float): The wavelength value. + + Returns: + Plate: The Plate object containing the mapped data. 
def identify_blocks(lines):
    """Split the file's lines into one block of lines per time point.

    A block starts at every line that begins with a timestamp (``m:ss``,
    ``mm:ss`` or ``hh:mm:ss``). The distance between the first two timestamp
    lines is taken as the block stride, and the last line of each stride is
    dropped. Blocks whose first line splits into exactly two parts on ``,``
    are skipped.

    Args:
        lines: Lines of the export file.

    Returns:
        list[list[str]]: The raw lines of each retained block.

    Raises:
        IndexError: If fewer than two timestamp lines are found, or if a block
            turns out empty. ``read_spectra_max_190`` catches this to switch
            to its alternative parsing path.
    """
    timestamp = re.compile(r"^\d{1,2}:\d{2}(?::\d{2})?(?:\s|$)")

    # Indices of all lines that open a new time point.
    starts = np.array(
        [index for index, text in enumerate(lines) if timestamp.match(text)]
    )

    # Stride between blocks, taken from the first two timestamps.
    stride = starts[1] - starts[0]
    if not np.all(np.diff(starts) == stride):
        logger.debug("Inconsistent time pattern found in file.")

    blocks = []
    for begin in starts:
        block = list(lines[begin : begin + stride - 1])
        if len(block[0].split(",")) == 2:
            continue
        blocks.append(block)

    return blocks
def time_to_min_float(time_str: str):
    """Convert a colon-separated time string to minutes as a float.

    Two fields are read as ``minutes:seconds``; three fields are read as
    ``hours:minutes:seconds``.

    Args:
        time_str: Time string such as ``"02:30"`` or ``"1:02:30"``.

    Returns:
        float: Elapsed time in minutes.

    Raises:
        ValueError: If the string does not have two or three fields, or a
            field is not a number.
    """
    fields = time_str.split(":")
    field_count = len(fields)

    if field_count == 2:
        minutes, seconds = fields
        return float(minutes) + float(seconds) / 60

    if field_count == 3:
        hours, minutes, seconds = fields
        return float(hours) * 60 + float(minutes) + float(seconds) / 60

    raise ValueError(f"Unexpected time format: '{time_str}'")
wavelengths = df.iloc[0, 0][-6] + wavelengths = [ + float(wavelength) for wavelength in wavelengths.split(" ") if wavelength != "" + ] + + # data + datas = df.iloc[2:-9]["~End"].tolist() + time_pattern = r"\d{2}:\d{2}:\d{2}" + times = [] + temperatures = [] + block_start_ids = [] + for data in datas: + if re.match(time_pattern, data[0]): + times.append(to_time(data[0], time_unit)) + temperatures.append(float(data[1])) + block_start_ids.append(datas.index(data)) + + time_blocks = [] + for index, start_id in enumerate(block_start_ids): + try: + time_block = datas[start_id : block_start_ids[index + 1]] + except IndexError: + time_block = datas[start_id:] + + time_block[0] = time_block[0][2:] # remove time and temperature from block + for row_id, row in enumerate(time_block): + if "" not in row: + continue + + wavelength_entry = [] + wavelength_entries = [] + for item in row: + if item == "": + if wavelength_entry: + wavelength_entries.append(wavelength_entry) + wavelength_entry = [] # reset + + else: + wavelength_entry.append(item) + + if item != "": + wavelength_entries.append(wavelength_entry) + + time_block[row_id] = wavelength_entries + + time_blocks.append(time_block) + + # Swap dimensions: rows, columns, wavelengths, timecourse + data = np.array(time_blocks).astype(float) + data = data.swapaxes(0, 3) + data = data.swapaxes(0, 1) + + # create wells + wells = [] + for row_id, row in enumerate(data): + for column_id, column in enumerate(row): + well = Well( + id=_coordinates_to_id(column_id, row_id), + ph=ph, + x_pos=column_id, + y_pos=row_id, + ) + for wavelength_id, wavelength in enumerate(column): + well.add_to_measurements( + wavelength=wavelengths[wavelength_id], + wavelength_unit=nm, + absorption=wavelength.tolist(), + time=times, + ) + wells.append(well) + + # Create plate + plate = Plate( + date_measured=created, + time_unit=time_unit, + temperatures=temperatures, + temperature_unit=C, + wells=wells, + ) + + return plate + + +def _coordinates_to_id(x: int, 
def read_tekan_magellan(
    path: str,
    wavelength: float,
    ph: float | None,
) -> Plate:
    """Read a Tecan Magellan Excel export into a ``Plate``.

    The sheet is read without a header row. Leading rows whose first cell is
    a string of the form ``<date>/<time>/<temperature>`` each describe one
    time point. Rows whose first cell contains a well ID
    (``WELL_ID_PATTERN``) carry the data: every string cell starts a new well
    ID and every following non-NaN number is appended to that well's
    absorption list.

    Args:
        path: Path to the Excel file.
        wavelength: Wavelength assigned to every measurement.
        ph: pH assigned to every well, or None.

    Returns:
        Plate: Plate with one well per well ID found, each holding a single
        measurement.

    Raises:
        IndexError: If no time point row precedes the data (the first cell of
            the sheet is not a string).
        ValueError: If a time point cell cannot be parsed, or a number occurs
            in a data row before any well ID.
    """
    df = pd.read_excel(path, header=None)

    # Format of the timestamp in front of the first "/" of a time point cell.
    date_format = "%A, %B %d, %Y: %H:%M:%S"

    temperatures = []
    times = []
    dates = []
    for _, row in df.iterrows():
        header_cell = row.values[0]
        # The time point section ends at the first non-string cell.
        if not isinstance(header_cell, str):
            break

        date_str, time_str, temperature_str = header_cell.split("/")
        temp_value, _temp_unit = temperature_str.strip().split("°")
        temperatures.append(float(temp_value))

        # Drop one character on each side, leaving "<value> <unit>".
        # NOTE(review): the unit token is discarded and the times are stored
        # with `second` below - confirm the export always uses that unit.
        time_value, _time_unit = time_str[1:-1].split(" ")
        times.append(float(time_value))
        dates.append(datetime.strptime(date_str.strip(), date_format))

    created = dates[0]

    df = df.dropna(how="all")

    data = defaultdict(list)
    for _, row in df.iterrows():
        first_cell = str(row.values[0])
        if not re.findall(WELL_ID_PATTERN, first_cell):
            continue

        key = None
        for element in row.values:
            if isinstance(element, str):
                key = element
            elif math.isnan(element):
                continue
            else:
                data[key].append(element)

    plate = Plate(
        date_measured=str(created),
        temperature_unit=C,
        temperatures=temperatures,
        time_unit=second,
        times=times,
    )

    for well_id, abso_list in data.items():
        # A None key means a number appeared before any well ID in its row.
        if well_id is None:
            raise ValueError("Well ID not found in the data.")
        x_pos, y_pos = id_to_xy(well_id)

        well = plate.add_to_wells(
            ph=ph,
            id=well_id,
            x_pos=x_pos,
            y_pos=y_pos,
        )
        well.add_to_measurements(
            wavelength=wavelength,
            absorption=abso_list,
            time_unit=second,
            time=times,
        )

    return plate
def id_to_xy(well_id: str) -> tuple[int, int]:
    """Convert a well ID such as ``"B3"`` to zero-based ``(x, y)`` coordinates.

    The leading letter gives the row (``y``, ``A`` -> 0, case-insensitive) and
    the trailing number gives the column (``x``, ``1`` -> 0). This is the
    inverse of ``xy_to_id``.

    Args:
        well_id: Well ID made of one row letter followed by a column number.

    Returns:
        tuple[int, int]: ``(x, y)`` = (column index, row index).

    Raises:
        ValueError: If the part after the first character is not an integer.
    """
    # An unreachable nested `get_well` definition that followed the return
    # statement was removed; it could never execute.
    column = int(well_id[1:]) - 1
    row = ord(well_id[0].upper()) - ord("A")
    return column, row
def get_measurement(well: Well, wavelength: float) -> PhotometricMeasurement:
    """Return the first measurement of ``well`` taken at ``wavelength``.

    Args:
        well (Well): Well whose measurements are searched.
        wavelength (float): Wavelength to look for (compared with ``==``).

    Returns:
        PhotometricMeasurement: The first measurement with that wavelength.

    Raises:
        ValueError: If the well has no measurement at that wavelength.
    """
    found = next(
        (
            candidate
            for candidate in well.measurements
            if candidate.wavelength == wavelength
        ),
        None,
    )
    if found is None:
        raise ValueError(
            f"No measurement found for well {well.id} at wavelength {wavelength}."
        )

    return found
def measurement_is_blanked_for(
    measurement: PhotometricMeasurement, target_id: str
) -> bool:
    """Tell whether only the target species contributes to the signal.

    Args:
        measurement: Measurement whose blank states are inspected.
        target_id: ID of the species of interest.

    Returns:
        The target's ``contributes_to_signal`` value if it is falsy; otherwise
        True exactly when no other species contributes to the signal.

    Raises:
        ValueError: If no blank state for ``target_id`` carries a
            ``contributes_to_signal`` value.
    """
    states = list(measurement.blank_states)

    own_flags = [
        state.contributes_to_signal
        for state in states
        if state.species_id == target_id
    ]
    foreign_flags = [
        state.contributes_to_signal
        for state in states
        if not state.species_id == target_id
    ]

    # With several entries for the target, the last one decides.
    target_contributes = own_flags[-1] if own_flags else None
    if target_contributes is None:
        raise ValueError(f"Species {target_id} not found in blank states")

    return target_contributes and not any(foreign_flags)
def get_well(plate: Plate, well_id: str) -> Well:
    """Return the first well of ``plate`` whose ID matches ``well_id``.

    The comparison ignores case.

    Args:
        plate: Plate whose wells are searched.
        well_id: ID of the requested well, e.g. ``"A1"``.

    Returns:
        The first matching well.

    Raises:
        ValueError: If no well with that ID exists on the plate.
    """
    match = next(
        (
            candidate
            for candidate in plate.wells
            if candidate.id.lower() == well_id.lower()
        ),
        None,
    )
    if match is None:
        raise ValueError(f"Well {well_id} not found")

    return match
###### Single Prefixes ######

k = Prefix.k
m = Prefix.m
u = Prefix.u
n = Prefix.n

##### Predefined units #####
# Every name below is re-exported through `mtphandler.units`. Each unit is
# built from fresh `Unit.*()` base units so that no two definitions share a
# base-unit object.

# Dimensionless
dimensionless = UnitDefinition(base_units=[Unit.dimensionless()])

# Molarity
M = Unit.mol() / Unit.litre()
mM = m * Unit.mol() / Unit.litre()
uM = u * Unit.mol() / Unit.litre()
nM = n * Unit.mol() / Unit.litre()

## Ontology
M.ld_id = ONTOMAPS["molarity"]["M"]
mM.ld_id = ONTOMAPS["molarity"]["mM"]
uM.ld_id = ONTOMAPS["molarity"]["uM"]
nM.ld_id = ONTOMAPS["molarity"]["nM"]

# Substance
mol = UnitDefinition(base_units=[Unit.mol()])._get_name()
mmol = UnitDefinition(base_units=[m * Unit.mol()])._get_name()
umol = UnitDefinition(base_units=[u * Unit.mol()])._get_name()
nmol = UnitDefinition(base_units=[n * Unit.mol()])._get_name()

## Ontology
mol.ld_id = ONTOMAPS["substance"]["mol"]
mmol.ld_id = ONTOMAPS["substance"]["mmol"]
umol.ld_id = ONTOMAPS["substance"]["umol"]
nmol.ld_id = ONTOMAPS["substance"]["nmol"]

# Mass
gram = UnitDefinition(base_units=[Unit.gram()])._get_name()
g = UnitDefinition(base_units=[Unit.gram()])._get_name()
mg = UnitDefinition(base_units=[m * Unit.gram()])._get_name()
ug = UnitDefinition(base_units=[u * Unit.gram()])._get_name()
ng = UnitDefinition(base_units=[n * Unit.gram()])._get_name()
kg = UnitDefinition(base_units=[k * Unit.gram()])._get_name()

## Ontology
g.ld_id = ONTOMAPS["mass"]["g"]
gram.ld_id = ONTOMAPS["mass"]["g"]
mg.ld_id = ONTOMAPS["mass"]["mg"]
ug.ld_id = ONTOMAPS["mass"]["ug"]
ng.ld_id = ONTOMAPS["mass"]["ng"]
# `kg` has an entry in ontomaps.toml but was never linked to it.
kg.ld_id = ONTOMAPS["mass"]["kg"]

# Volume
litre = UnitDefinition(base_units=[Unit.litre()])._get_name()
l = UnitDefinition(base_units=[Unit.litre()])._get_name()  # noqa: E741
ml = UnitDefinition(base_units=[m * Unit.litre()])._get_name()
ul = UnitDefinition(base_units=[u * Unit.litre()])._get_name()
nl = UnitDefinition(base_units=[n * Unit.litre()])._get_name()

## Ontology

l.ld_id = ONTOMAPS["volume"]["litre"]
litre.ld_id = ONTOMAPS["volume"]["litre"]
ml.ld_id = ONTOMAPS["volume"]["ml"]
ul.ld_id = ONTOMAPS["volume"]["ul"]
nl.ld_id = ONTOMAPS["volume"]["nl"]

# Time
s = UnitDefinition(base_units=[Unit.second()])._get_name()
# FIXME(review): `second` is bound to a *minute* definition. It looks like a
# rename of `min` that went too far. The binding is kept unchanged because the
# spectra_max_190 and tekan_spark readers convert their time axes to minutes
# and then label them with `second`; correcting it here alone would mislabel
# their data. Fix together with those readers. Use `s` for real seconds.
second = UnitDefinition(base_units=[Unit.minute()])._get_name()
# `min` is imported elsewhere in the package (`from mtphandler.units import C,
# min`) but was never defined. The name shadows the builtin on purpose.
min = UnitDefinition(base_units=[Unit.minute()])._get_name()
minute = UnitDefinition(base_units=[Unit.minute()])._get_name()
hour = UnitDefinition(base_units=[Unit.hour()])._get_name()
h = UnitDefinition(base_units=[Unit.hour()])._get_name()
day = UnitDefinition(base_units=[Unit.day()])._get_name()
d = UnitDefinition(base_units=[Unit.day()])._get_name()

## Ontology
s.ld_id = ONTOMAPS["time"]["s"]
second.ld_id = ONTOMAPS["time"]["min"]  # see FIXME above
min.ld_id = ONTOMAPS["time"]["min"]
minute.ld_id = ONTOMAPS["time"]["min"]
hour.ld_id = ONTOMAPS["time"]["hour"]
h.ld_id = ONTOMAPS["time"]["hour"]
day.ld_id = ONTOMAPS["time"]["day"]
d.ld_id = ONTOMAPS["time"]["day"]

# Temperature

kelvin = UnitDefinition(base_units=[Unit.kelvin()])._get_name()
K = UnitDefinition(base_units=[Unit.kelvin()])._get_name()
celsius = UnitDefinition(base_units=[Unit.celsius()])._get_name()
C = UnitDefinition(base_units=[Unit.celsius()])._get_name()

## Ontology

K.ld_id = ONTOMAPS["temperature"]["K"]
kelvin.ld_id = ONTOMAPS["temperature"]["K"]
# The Celsius units have an entry in ontomaps.toml but were never linked.
C.ld_id = ONTOMAPS["temperature"]["C"]
celsius.ld_id = ONTOMAPS["temperature"]["C"]

# Length

metre = UnitDefinition(base_units=[Unit.metre()])._get_name()
nm = UnitDefinition(base_units=[n * Unit.metre()])._get_name()

## Ontology

metre.ld_id = ONTOMAPS["length"]["metre"]
nm.ld_id = ONTOMAPS["length"]["nm"]
class UnitDefinition(_UnitDefinition):
    """Extended UnitDefinition class with additional operations.

    The arithmetic operators work in place: they modify ``self`` (and, for
    division, the base units of the other operand) and return ``self``
    instead of building a new object.
    """

    @model_validator(mode="after")
    def set_name_and_type(self):
        """Set ``name`` from the base units and ``ld_type`` after validation."""
        self._get_name()
        self.ld_type = [UNIT_OF_MEAS_TYPE]
        return self

    def __rtruediv__(self, other: object) -> "UnitDefinition":
        """Handle ``other / self``.

        All base units of ``self`` get a negative exponent (``-abs``, so an
        already negative exponent stays negative). If ``other`` is a
        UnitDefinition its base units are then appended; if it is a base unit
        it is appended itself. Any other operand (e.g. the ``1`` in
        ``1 / unit``) adds nothing, so the result is just the inverted unit.

        Args:
            other (object): The numerator in the division.

        Returns:
            UnitDefinition: ``self``, modified in place.
        """
        # Move everything currently in this unit into the denominator.
        for base in self.base_units:
            base.exponent = -abs(base.exponent)

        if isinstance(other, UnitDefinition):
            self.base_units.extend(other.base_units)
        elif isinstance(other, _BaseUnit):
            self.base_units.append(other)

        self._get_name()

        return self

    def __truediv__(self, other: object) -> "UnitDefinition":
        """Handle ``self / other``.

        The base units of ``other`` (or ``other`` itself, if it is a base
        unit) get a negative exponent in place and are appended to this unit.
        Operands of any other type are ignored; no error is raised.

        Args:
            other (object): The denominator in the division.

        Returns:
            UnitDefinition: ``self``, modified in place.
        """

        if isinstance(other, UnitDefinition):
            # NOTE: this mutates the base units owned by `other` as well.
            for base in other.base_units:
                base.exponent = -abs(base.exponent)
            self.base_units.extend(other.base_units)
        elif isinstance(other, _BaseUnit):
            other.exponent = -abs(other.exponent)
            self.base_units.append(other)

        self._get_name()

        return self

    def __mul__(self, other: object) -> "UnitDefinition":
        """Multiply the unit by a number.

        The multiplier of every base unit is multiplied by ``other``, or set
        to ``other`` if it was unset or zero.

        Args:
            other (object): The multiplier; must be an int or float.

        Returns:
            UnitDefinition: ``self``, modified in place.

        Raises:
            TypeError: If ``other`` is not an int or float.
        """
        if isinstance(other, (int, float)):
            for base in self.base_units:
                if base.multiplier:
                    base.multiplier *= other
                else:
                    base.multiplier = other

            # The name is rebuilt here, but __str__ does not include the
            # multiplier, so the name itself stays the same.
            self._get_name()

            return self

        raise TypeError(
            f"unsupported operand type(s) for *: 'UnitDefinition' and '{type(other)}'"
        )

    def _get_name(self):
        """Set ``name`` to the string form of the unit and return ``self``."""
        self.name = str(self)

        return self

    def __str__(self) -> str:
        """String representation of the UnitDefinition.

        Base units with a positive exponent form the numerator, those with a
        negative exponent the denominator; each is rendered as prefix + name +
        exponent and joined by spaces. Base units with exponent 0 are left out.

        Returns:
            str: ``"<num> / <den>"``, ``"<num>"`` or ``"1 / <den>"``.

        Raises:
            ValueError: If no base unit with a non-zero exponent exists.
        """

        numerator = [
            self._map_prefix(base.scale)
            + self._map_name(base.kind)
            + self._exponent(base.exponent)
            for base in self.base_units
            if base.exponent > 0
        ]
        denominator = [
            self._map_prefix(base.scale)
            + self._map_name(base.kind)
            + self._exponent(base.exponent)
            for base in self.base_units
            if base.exponent < 0
        ]

        numerator_str = " ".join(numerator) if numerator else ""
        denominator_str = " ".join(denominator) if denominator else ""

        if numerator_str and denominator_str:
            return f"{numerator_str} / {denominator_str}"
        elif numerator_str:
            return numerator_str
        elif denominator_str:
            return f"1 / {denominator_str}"

        raise ValueError("No base units found")

    @staticmethod
    def _map_prefix(scale: int | None) -> str:
        """Map a scale to its corresponding prefix.

        Args:
            scale (int | None): The scale value to map.

        Returns:
            str: The prefix for 3, -3, -6 or -9; an empty string for ``None``
            or any other scale.
        """

        if scale is None:
            return ""

        mapping = {
            3: "k",
            -3: "m",
            -6: "u",
            -9: "n",
        }

        return mapping.get(scale, "")

    @staticmethod
    def _map_name(kind: str) -> str:
        """Map a unit kind to its display name.

        Looks the kind up in ``NAME_MAPS``. Kinds without an entry fall back
        to the capitalized string, or to the capitalized member name when the
        kind is not a plain string.

        Args:
            kind: The unit kind, as a string or an enum member.

        Returns:
            str: The display name.
        """
        if isinstance(kind, str):  # TODO: find issue of incorrect enum usage
            return NAME_MAPS.get(kind, kind.capitalize())
        return NAME_MAPS.get(kind, kind.name.capitalize())

    @staticmethod
    def _exponent(exponent: int) -> str:
        """Format the exponent for display.

        Args:
            exponent (int): The exponent value to format.

        Returns:
            str: An empty string for exponents of magnitude 1, otherwise
            ``^`` followed by the absolute value.
        """
        if abs(exponent) == 1:
            return ""

        return f"^{abs(exponent)}"
+ """ + if isinstance(other, UnitDefinition): + self.exponent = -self.exponent + other.base_units.append(self) + + other._get_name() + + return other + elif isinstance(other, (int, float)): + self.exponent = -self.exponent + return self + + raise TypeError( + f"unsupported operand type(s) for /: 'BaseUnit' and '{type(other)}'" + ) + + def __truediv__(self, other: object) -> "UnitDefinition": + """Division operation to handle unit division. + + Args: + other (object): The denominator in the division. + + Returns: + UnitDefinition: The resulting unit after division. + + Raises: + TypeError: If the other operand type is unsupported. + """ + if isinstance(other, BaseUnit): + other.exponent = -other.exponent + return UnitDefinition(base_units=[self, other])._get_name() + elif isinstance(other, UnitDefinition): + for base_unit in other.base_units: + base_unit.exponent = -base_unit.exponent + other.base_units.append(self) + other._get_name() + + return other + + raise TypeError( + f"unsupported operand type(s) for /: 'BaseUnit' and '{type(other)}'" + ) + + def __pow__(self, other: int) -> "_BaseUnit": + """Exponentiation operation to handle unit exponentiation. + + Args: + other (int): The exponent value. + + Returns: + _BaseUnit: The resulting unit after exponentiation. + + Raises: + TypeError: If the exponent is not an integer. + """ + if isinstance(other, int): + self.exponent = other + return self + + raise TypeError( + f"unsupported operand type(s) for **: 'BaseUnit' and '{type(other)}'" + ) + + def __mul__(self, other: object) -> object: + """Multiplication operation to handle unit multiplication. + + Args: + other (object): The multiplier in the multiplication. + + Returns: + object: The resulting unit after multiplication. + + Raises: + TypeError: If the other operand type is unsupported. 
def visualize_plate(
    plate: Plate,
    name: str,
    zoom: bool = False,
    wavelengths: "float | list[float] | None" = None,
    static: bool = False,
    darkmode: bool = False,
):
    """Visualize a plate with all its wells and measurements.

    Every well is drawn in its own subplot of an 8 x 12 grid. Each selected
    measurement becomes a line of absorption over time.

    Args:
        plate (Plate): Plate whose wells and measurements are plotted.
        name (str): Title of the figure.
        zoom (bool): If True, each subplot gets its own y-axis; otherwise
            the y-axes are shared.
        wavelengths: Wavelength(s) to plot. A single value or a
            list/tuple/set of values is accepted. If omitted or empty, the
            wavelength of the first measurement found on the plate is used.
        static (bool): If True, the figure is shown with the ``"png"``
            renderer instead of the default renderer.
        darkmode (bool): If True, the dark theme and colors are used.
    """
    if darkmode:
        theme, background, font_color = "plotly_dark", "#1e1e1e", "#e5e5e5"
    else:
        theme, background, font_color = "plotly_white", "white", "#000000"
    # The grid lines use the background color, so they are not visible.
    gridcolor = background

    if not wavelengths:
        # Fall back to the first measurement on the plate. A plate without
        # any measurement yields an empty selection (and an empty grid)
        # instead of an IndexError.
        first = next(
            (
                measurement.wavelength
                for well in plate.wells
                for measurement in well.measurements
            ),
            None,
        )
        selected = [] if first is None else [first]
    elif isinstance(wavelengths, (list, tuple, set, frozenset)):
        selected = list(wavelengths)
    else:
        selected = [wavelengths]

    fig = make_subplots(
        rows=8,
        cols=12,
        shared_xaxes=True,
        subplot_titles=_generate_possible_well_ids(),
        shared_yaxes=not zoom,
    )
    palette = px.colors.qualitative.Plotly

    for well in plate.wells:
        # Cycle the palette: a plain zip would silently drop every
        # measurement beyond the number of palette colors.
        for measurement, color in zip(well.measurements, it.cycle(palette)):
            if measurement.wavelength not in selected:
                continue

            fig.add_trace(
                go.Scatter(
                    x=measurement.time,
                    y=measurement.absorption,
                    name=f"{measurement.wavelength} nm",
                    mode="lines",
                    showlegend=False,
                    line=dict(color=color),
                    # NOTE(review): the trailing "<br>" is reconstructed from
                    # a garbled source line - confirm against the original.
                    hovertemplate="%{y:.2f}<br>",
                ),
                col=well.x_pos + 1,
                row=well.y_pos + 1,
            )

    # Update x and y axes for dark mode or light mode
    axis_style = dict(
        showticklabels=False, gridcolor=gridcolor, zeroline=False, showline=False
    )
    fig.update_xaxes(**axis_style)
    fig.update_yaxes(**axis_style)

    # Update subplot backgrounds and layout
    fig.update_layout(
        plot_bgcolor=background,
        paper_bgcolor=background,
        font=dict(color=font_color),
        hovermode="x",
        title=dict(
            text=name,
            font=dict(color=font_color),
        ),
        margin=dict(l=20, r=20, t=100, b=20),
        template=theme,
    )

    # Show the figure exactly once; the static branch must not fall through.
    if static:
        fig.show("png")
    else:
        fig.show()


def _generate_possible_well_ids() -> list[str]:
    """Return the 96 well IDs of an 8 x 12 plate, row by row.

    Returns:
        list[str]: ``["A1", ..., "A12", "B1", ..., "H12"]``.
    """
    return [f"{row}{column}" for row in "ABCDEFGH" for column in range(1, 13)]