From be847a6846dafea1ae5cc3f5f34cdcea0022a489 Mon Sep 17 00:00:00 2001 From: haeussma <83341109+haeussma@users.noreply.github.com> Date: Sat, 21 Sep 2024 10:06:01 +0200 Subject: [PATCH] Refactor temp_handler package and remove unused modules --- temp_handler/__init__.py | 8 - temp_handler/ioutils/__init__.py | 1 - temp_handler/ioutils/calipytion.py | 146 -- temp_handler/ioutils/pyenzyme.py | 396 ------ temp_handler/model.py | 947 ------------- temp_handler/molecule.py | 166 --- temp_handler/mtp_logging.py | 27 - temp_handler/plate_manager.py | 1258 ----------------- temp_handler/readers/__init__.py | 7 - temp_handler/readers/biotek.py | 139 -- temp_handler/readers/multiskan_sky.py | 107 -- .../readers/multiskan_spectrum_parser.py | 198 --- temp_handler/readers/spectra_max_190.py | 267 ---- temp_handler/readers/spectramax_parser.py | 127 -- temp_handler/readers/tekan_magellan.py | 96 -- temp_handler/readers/tekan_spark.py | 82 -- temp_handler/readers/utils.py | 28 - temp_handler/tools.py | 158 --- temp_handler/units/__init__.py | 1 - temp_handler/units/ontomaps.toml | 45 - temp_handler/units/predefined.py | 166 --- temp_handler/units/units.py | 374 ----- temp_handler/visualize.py | 110 -- 23 files changed, 4854 deletions(-) delete mode 100644 temp_handler/__init__.py delete mode 100644 temp_handler/ioutils/__init__.py delete mode 100644 temp_handler/ioutils/calipytion.py delete mode 100644 temp_handler/ioutils/pyenzyme.py delete mode 100644 temp_handler/model.py delete mode 100644 temp_handler/molecule.py delete mode 100644 temp_handler/mtp_logging.py delete mode 100644 temp_handler/plate_manager.py delete mode 100644 temp_handler/readers/__init__.py delete mode 100644 temp_handler/readers/biotek.py delete mode 100644 temp_handler/readers/multiskan_sky.py delete mode 100644 temp_handler/readers/multiskan_spectrum_parser.py delete mode 100644 temp_handler/readers/spectra_max_190.py delete mode 100644 temp_handler/readers/spectramax_parser.py delete mode 100644 temp_handler/readers/tekan_magellan.py delete mode 100644 temp_handler/readers/tekan_spark.py delete mode 100644 temp_handler/readers/utils.py delete mode 100644 temp_handler/tools.py delete mode 100644 temp_handler/units/__init__.py delete mode 100644 temp_handler/units/ontomaps.toml delete mode 100644 temp_handler/units/predefined.py delete mode 100644 temp_handler/units/units.py delete mode 100644 temp_handler/visualize.py diff --git a/temp_handler/__init__.py b/temp_handler/__init__.py deleted file mode 100644 index 1f468a1..0000000 --- a/temp_handler/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -import json # noqa -import os # noqa - -from .mtp_logging import configure_logger -from .plate_manager import PlateManager # noqa - - -configure_logger() diff --git a/temp_handler/ioutils/__init__.py b/temp_handler/ioutils/__init__.py deleted file mode 100644 index 8b13789..0000000 --- a/temp_handler/ioutils/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/temp_handler/ioutils/calipytion.py b/temp_handler/ioutils/calipytion.py deleted file mode 100644 index 673a899..0000000 --- a/temp_handler/ioutils/calipytion.py +++ /dev/null @@ -1,146 +0,0 @@ -from __future__ import annotations - -import numpy as np -from calipytion import Calibrator -from calipytion.model import Sample, Standard, UnitDefinition - -from mtphandler.model import Plate, Well -from mtphandler.molecule import Molecule -from mtphandler.tools import ( - get_measurement, - get_species_condition, - measurement_is_blanked_for, - well_contains_species, -) - - -def _get_standard_wells( - plate: Plate, - protein_ids: list[str], - molecule: Molecule, - wavelength: float, -) -> list[Well]: - # Subset of wells, that contain specified species, do not contain a protein, and are blanked - - # get wells with only one component, that does not contribute to the signal - buffer_blank_wells = [] - standard_wells = [] - for well in plate.wells: - measurement = get_measurement(well, wavelength) - - # get all wells with one init condition that has a concentration grater than 0 - int_concs_creater_than_zero = [ - condition for condition in well.init_conditions if condition.init_conc > 0 - ] - - if len(int_concs_creater_than_zero) == 1: - buffer_blank_wells.append(well) - print("found buffer blank well", well.id) - - if not well_contains_species(well, molecule.id, conc_above_zero=True): - continue - - if any( - [ - well_contains_species(well, catalyst_id, conc_above_zero=True) - for catalyst_id in protein_ids - ] - ): - continue - - if measurement_is_blanked_for(measurement, molecule.id): - standard_wells.append(well) - - # Add wells with zero concentration to standard wells - if all( - [ - blank_state.contributes_to_signal is False - for blank_state in measurement.blank_states - ] - ): - standard_wells.append(well) - - print("found standard wells", len(standard_wells)) - - return standard_wells + buffer_blank_wells - - -def map_to_standard( - plate: Plate, - molecule: Molecule, - protein_ids: list[str], - wavelength: float, -) -> Standard: - standard_wells = _get_standard_wells( - plate=plate, - protein_ids=protein_ids, - molecule=molecule, - wavelength=wavelength, - ) - - # Map wells to samples of a standard - samples = [] - phs = [] - for well in standard_wells: - condition = get_species_condition(well, molecule.id) - measurement = get_measurement(well, wavelength) - - samples.append( - Sample( - # id=well.id, - concentration=condition.init_conc, - conc_unit=UnitDefinition(**condition.conc_unit.model_dump()), - signal=float(np.nanmean(measurement.absorption)), - ) - ) - phs.append(well.ph) - - # Check if all samples have the same pH - if not all([ph == phs[0] for ph in phs]): - raise ValueError( - f"Samples of standard {molecule.name} have different pH values: {phs}" - ) - ph = phs[0] - - temp_unit = UnitDefinition(**plate.temperature_unit.model_dump()) - - # Create standard - return Standard( - molecule_id=molecule.id, - molecule_symbol=molecule.id, - pubchem_cid=molecule.pubchem_cid, - molecule_name=molecule.name, - wavelength=wavelength, - samples=samples, - ph=ph, - temperature=plate.temperatures[0], - temp_unit=temp_unit, - ) - - -def initialize_calibrator( - plate: Plate, - wavelength: float, - molecule: Molecule, - protein_ids: list[str], - cutoff: float | None = None, -) -> Calibrator: - """ - Initialize a calibrator for a given species. - - Args: - plate (Plate): Plate with the wells. - wavelength (float): Wavelength of the measurements. - molecule (Molecule): Molecule to calibrate. - protein_ids (list[str]): IDs of the proteins that catalyze the reaction. - cutoff (float | None): Cutoff for the calibration. Calibration samples with - a signal above the cutoff are ignored. - """ - standard = map_to_standard( - plate=plate, - protein_ids=protein_ids, - molecule=molecule, - wavelength=wavelength, - ) - - return Calibrator.from_standard(standard, cutoff=cutoff) diff --git a/temp_handler/ioutils/pyenzyme.py b/temp_handler/ioutils/pyenzyme.py deleted file mode 100644 index 0e47f4e..0000000 --- a/temp_handler/ioutils/pyenzyme.py +++ /dev/null @@ -1,396 +0,0 @@ -import numpy as np -import pyenzyme as pe -from calipytion.tools.calibrator import Calibrator -from loguru import logger -from pyenzyme.model import DataTypes -from pyenzyme.model import UnitDefinition as EnzML_UnitDef - -from mtphandler.model import InitCondition, PhotometricMeasurement, Plate, Well -from mtphandler.molecule import Molecule, Protein - - -class Plate_to_EnzymeMLDocument: - """Converts a Plate object along with associated molecules and proteins to an EnzymeMLDocument. - If `to_concentration=True`, the absorption data is converted to concentration data using the calibrator. - - - Raises: - ValueError: If the pH of a well is not defined. - ValueError: If no measurements were added to EnzymeML. - - Returns: - EnzymeMLDocument: The EnzymeMLDocument containing the converted data. - """ - - def __init__( - self, - name: str, - plate: Plate, - well_ids: list[str] | None, - molecules: list[Molecule], - detected_molecule: Molecule, - proteins: list[Protein], - wavelength: float | None, - wells_with_protein_only: bool, - to_concentration: bool, - extrapolate: bool, - silent: bool, - ) -> None: - self.name = name - self.plate = plate - self.well_ids = well_ids - self.molecules = molecules - self.detected_molecule = detected_molecule - self.proteins = proteins - self.wavelength = wavelength - self.calibrator_dict: dict[str, Calibrator] = {} - self.wells_with_protein_only = wells_with_protein_only - self.to_concentration = to_concentration - self.extrapolate = extrapolate - self.silent = silent - - # Initialize calibrators if concentration data is requested - if self.to_concentration: - self._init_calibrators() - - # Check if a wavelength was specified, otherwise set it to the only wavelength measured - self._handle_wavelength() - - def convert(self): - """ - Converts proteins, small molecules, and measurements to an EnzymeMLDocument. - - Returns: - EnzymeMLDocument: The EnzymeMLDocument containing the converted data. - """ - enzml_doc = pe.EnzymeMLDocument(name=self.name) - logger.debug(f"Initialized EnzymeMLDocument with name {self.name}") - - # Add proteins to EnzymeML document - enzml_doc.proteins = [self.map_protein(protein) for protein in self.proteins] - logger.debug(f"Added {len(self.proteins)} proteins to EnzymeMLDocument") - - # Add small molecules to EnzymeML document - enzml_doc.small_molecules = [ - self.map_small_molecule(molecule) for molecule in self.molecules - ] - logger.debug(f"Added {len(self.molecules)} small molecules to EnzymeMLDocument") - - # Add measurements to EnzymeML document - enzml_doc.measurements = self.wells_to_enzml_measurements() - - return enzml_doc - - def get_well_subset(self) -> list[Well]: - """ - Returns a subset of wells from the plate based on the well ids. - - Returns: - list[Well]: List of Well objects. - """ - if self.well_ids is None: - return self.plate.wells - - if isinstance(self.well_ids, str): - self.well_ids = [self.well_ids] - - self.well_ids = [well_id.upper() for well_id in self.well_ids] - subset = [well for well in self.plate.wells if well.id.upper() in self.well_ids] - - if len(subset) == 0: - raise ValueError("No wells found with the specified well ids.") - - return subset - - def wells_to_enzml_measurements(self) -> list[pe.Measurement]: - """Converts wells to EnzymeML measurements. - - Raises: - ValueError: If the pH of a well is not defined. - ValueError: If no measurements were added to EnzymeML. - - Returns: - list[pe.Measurement]: List of EnzymeML `Measurement` objects. - """ - meas_counter = 0 - measurements = [] - - for well in self.get_well_subset(): - photo_measurement = next( - ( - meas - for meas in well.measurements - if meas.wavelength == self.wavelength - ), - None, - ) - - # Skip wells without a measurement at the specified wavelength - if not photo_measurement: - continue - - # Skip wells without a protein if the flag is set - if self.wells_with_protein_only: - if not self.is_catalyzed( - well, photo_measurement, {p.id for p in self.proteins} - ): - continue - - # Ensure that the pH of the well is defined - if well.ph is None: - raise ValueError(f"pH of well {well.id} is not defined.") - - # Create EnzymeML measurement - enzml_meas = pe.Measurement( - id=well.id, - name="photometric measurement", - ph=well.ph, - temperature=self.temperature, - temperature_unit=EnzML_UnitDef( - **self.plate.temperature_unit.model_dump() - ), - ) - - logger.debug( - f"Contributing species in well {well.id}: {[(state.species_id ,state.contributes_to_signal) for state in photo_measurement.blank_states]}" - ) - - # Check if only one species contributes to the signal - measured_species = self.get_only_contributing_species( - photo_measurement, well.id, self.detected_molecule.id - ) - if not measured_species: - continue - - logger.debug( - f"Adding measurement from well {well.id} with species {measured_species}" - ) - - # Add species data to the measurement based on the initial conditions of the well - self.add_to_species_data(enzml_meas, well.init_conditions) - - # Add absorption data to the species data of the measurement - self.add_absorption_data( - measurement=enzml_meas, - photo_measurement=photo_measurement, - species_id=measured_species, - ) - - measurements.append(enzml_meas) - - meas_counter += 1 - - if meas_counter == 0: - raise ValueError("No measurements were added to EnzymeML.") - - if not self.silent: - mode = "concentration" if self.to_concentration else "absorbance" - print( - f"✅ Added measurements from {meas_counter} wells with {mode} values to EnzymeMLDocument" - ) - - return measurements - - def add_absorption_data( - self, - measurement: pe.Measurement, - photo_measurement: PhotometricMeasurement, - species_id: str, - ) -> None: - """Adds absorption data to the species data of the measurement. - Based in the `to_concentration` flag, the absorption data is converted to concentration data using the calibrator. - - Args: - measurement (pe.Measurement): EnzymeML `Measurement` object. - photo_measurement (PhotometricMeasurement): PhotometricMeasurement object. - species_id (str): Species ID. - - Raises: - ValueError: If the calibrator for the species is not defined. - """ - - species_data = next( - ( - data - for data in measurement.species_data - if data.species_id == species_id - ), - None, - ) - - assert ( - species_data is not None - ), f"Species {species_id} not found in measurement {measurement.id}." - - if self.to_concentration: - data_type = pe.DataTypes.CONCENTRATION - if species_id not in self.calibrator_dict: - raise ValueError( - f"Calibrator for species {species_id} is not defined. Set `to_concentration=False`, or define a standard for species {species_id}." - ) - - data = self.calibrator_dict[species_id].calculate_concentrations( - model=self.calibrator_dict[species_id].models[0], - signals=photo_measurement.absorption, - extrapolate=self.extrapolate, - ) - else: - data_type = pe.DataTypes.ABSORBANCE - data = photo_measurement.absorption - - species_data.data_type = data_type - species_data.data = data - species_data.time = photo_measurement.time - - if species_data.prepared is None: - species_data.prepared = species_data.initial - - species_data.initial = species_data.data[0] - - @property - def temperature(self) -> float: - return np.mean(self.plate.temperatures).tolist() - - @staticmethod - def get_only_contributing_species( - photo_measurement: PhotometricMeasurement, - well_id: str, - detected_molecule_id: str, - ) -> str | None: - # check that only one species contributes to the signal - contributing_species = set() - for state in photo_measurement.blank_states: - if state.species_id == detected_molecule_id: - contributing_species.add(state.species_id) - if state.contributes_to_signal: - contributing_species.add(state.species_id) - - if len(contributing_species) > 1: - raise ValueError( - f""" - Multiple species ({contributing_species}) contribute to the signal in well {well_id}. Only one species is allowed." - Either the plate was not blanked, or control measurements for determining the blank are missing. - Species can manually be specified not to contribute to the signal by setting the `contributes_to_signal=False` during - the assignment of well conditions. - """ - ) - - if len(contributing_species) == 0: - return None - - return contributing_species.pop() - - @staticmethod - def is_catalyzed( - well: Well, - photo_measurement: PhotometricMeasurement, - protein_ids: set[str], - ) -> bool: - """ - Checks if a well contains a catalyst and another species. - - Args: - well (Well): `Well` object - protein_ids (list[str]): List of protein ids - - Returns: - bool: True if the well contains a catalyst and another species, False otherwise - """ - - contains_protein = False - for condition in well.init_conditions: - if condition.species_id in protein_ids and condition.init_conc > 0: - contains_protein = True - - protein_contributes = False - for state in photo_measurement.blank_states: - if state.species_id in protein_ids: - if state.contributes_to_signal: - protein_contributes = True - - if contains_protein and not protein_contributes: - logger.debug(f"Well {well.id} contains a catalyst.") - return True - - return False - - @staticmethod - def add_to_species_data( - measurement: pe.Measurement, init_conditions: list[InitCondition] - ): - for condition in init_conditions: - measurement.add_to_species_data( - species_id=condition.species_id, - initial=condition.init_conc, - prepared=condition.init_conc, - data_unit=EnzML_UnitDef(**condition.conc_unit.model_dump()), - data_type=DataTypes.CONCENTRATION, - ) - - @staticmethod - def map_protein(protein: Protein) -> pe.Protein: - if protein.ld_id_url: - return pe.Protein( - id=protein.id, - ld_id=protein.ld_id_url, - name=protein.name, - constant=protein.constant, - sequence=protein.sequence, - ) - else: - return pe.Protein( - id=protein.id, - name=protein.name, - constant=protein.constant, - sequence=protein.sequence, - ) - - @staticmethod - def map_small_molecule(molecule: Molecule) -> pe.SmallMolecule: - if molecule.ld_id_url: - return pe.SmallMolecule( - id=molecule.id, - ld_id=molecule.ld_id_url, - name=molecule.name, - constant=molecule.constant, - ) - else: - return pe.SmallMolecule( - id=molecule.id, name=molecule.name, constant=molecule.constant - ) - - def _handle_wavelength(self): - """ - Checks if a wavelength was specified and if not, sets it to the only wavelength measured. - If multiple wavelengths were measured, an error is raised. - """ - if isinstance(self.wavelength, float): - return - - # check that all measurements in the wells have only one wavelength - wavelengths = set() - for well in self.plate.wells: - for meas in well.measurements: - wavelengths.add(meas.wavelength) - - if len(wavelengths) > 1: - raise ValueError("Multiple wavelengths were measured. Please specify one.") - - self.wavelength = wavelengths.pop() - - def _init_calibrators(self): - """Initializes calibrators for all molecules with a standard.""" - - for molecule in self.molecules: - assert ( - molecule.id not in self.calibrator_dict - ), f"Calibrator for molecule {molecule.id} already exists in calibrator_dict." - - if not molecule.standard: - continue - - calibrator = Calibrator.from_standard(molecule.standard) - - self.calibrator_dict[molecule.id] = calibrator - - logger.debug(f"Initialized calibrator for molecule {molecule.id}") diff --git a/temp_handler/model.py b/temp_handler/model.py deleted file mode 100644 index b5e2f4c..0000000 --- a/temp_handler/model.py +++ /dev/null @@ -1,947 +0,0 @@ -## This is a generated file. Do not modify it manually! - -from __future__ import annotations - -from enum import Enum -from typing import Generic, Optional, TypeVar -from uuid import uuid4 - -from pydantic import BaseModel, ConfigDict, Field - -# Filter Wrapper definition used to filter a list of objects -# based on their attributes -Cls = TypeVar("Cls") - - -class FilterWrapper(Generic[Cls]): - """Wrapper class to filter a list of objects based on their attributes""" - - def __init__(self, collection: list[Cls], **kwargs): - self.collection = collection - self.kwargs = kwargs - - def filter(self) -> list[Cls]: - for key, value in self.kwargs.items(): - self.collection = [ - item for item in self.collection if self._fetch_attr(key, item) == value - ] - return self.collection - - def _fetch_attr(self, name: str, item: Cls): - try: - return getattr(item, name) - except AttributeError: - raise AttributeError(f"{item} does not have attribute {name}") - - -# JSON-LD Helper Functions -def add_namespace(obj, prefix: str | None, iri: str | None): - """Adds a namespace to the JSON-LD context - - Args: - prefix (str): The prefix to add - iri (str): The IRI to add - """ - if prefix is None and iri is None: - return - elif prefix and iri is None: - raise ValueError("If prefix is provided, iri must also be provided") - elif iri and prefix is None: - raise ValueError("If iri is provided, prefix must also be provided") - - obj.ld_context[prefix] = iri # type: ignore - - -def validate_prefix(term: str | dict, prefix: str): - """Validates that a term is prefixed with a given prefix - - Args: - term (str): The term to validate - prefix (str): The prefix to validate against - - Returns: - bool: True if the term is prefixed with the prefix, False otherwise - """ - - if isinstance(term, dict) and not term["@id"].startswith(prefix + ":"): - raise ValueError(f"Term {term} is not prefixed with {prefix}") - elif isinstance(term, str) and not term.startswith(prefix + ":"): - raise ValueError(f"Term {term} is not prefixed with {prefix}") - - -# Model Definitions - - -class Plate(BaseModel): - model_config: ConfigDict = ConfigDict( # type: ignore - validate_assigment=True, - ) # type: ignore - - temperature_unit: UnitDefinition - id: Optional[str] = Field(default=None) - name: Optional[str] = Field(default=None) - wells: list[Well] = Field(default_factory=list) - date_measured: Optional[str] = Field(default=None) - temperatures: list[float] = Field(default_factory=list) - times: list[float] = Field(default_factory=list) - time_unit: Optional[UnitDefinition] = Field(default=None) - - # JSON-LD fields - ld_id: str = Field( - serialization_alias="@id", default_factory=lambda: "md:Plate/" + str(uuid4()) - ) - ld_type: list[str] = Field( - serialization_alias="@type", - default_factory=lambda: [ - "md:Plate", - ], - ) - ld_context: dict[str, str | dict] = Field( - serialization_alias="@context", - default_factory=lambda: { - "md": "https://github.com/FAIRChemistry/MTPHandler", - }, - ) - - def filter_wells(self, **kwargs) -> list[Well]: - """Filters the wells attribute based on the given kwargs - - Args: - **kwargs: The attributes to filter by. - - Returns: - list[Well]: The filtered list of Well objects - """ - - return FilterWrapper[Well](self.wells, **kwargs).filter() - - def set_attr_term( - self, - attr: str, - term: str | dict, - prefix: str | None = None, - iri: str | None = None, - ): - """Sets the term for a given attribute in the JSON-LD object - - Example: - # Using an IRI term - >> obj.set_attr_term("name", "http://schema.org/givenName") - - # Using a prefix and term - >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org") - - # Usinng a dictionary term - >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"}) - - Args: - attr (str): The attribute to set the term for - term (str | dict): The term to set for the attribute - - Raises: - AssertionError: If the attribute is not found in the model - """ - - assert ( - attr in self.model_fields - ), f"Attribute {attr} not found in {self.__class__.__name__}" - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_context[attr] = term - - def add_type_term( - self, term: str, prefix: str | None = None, iri: str | None = None - ): - """Adds a term to the @type field of the JSON-LD object - - Example: - # Using a term - >> obj.add_type_term("https://schema.org/Person") - - # Using a prefixed term - >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person") - - Args: - term (str): The term to add to the @type field - prefix (str, optional): The prefix to use for the term. Defaults to None. - iri (str, optional): The IRI to use for the term prefix. Defaults to None. - - Raises: - ValueError: If prefix is provided but iri is not - ValueError: If iri is provided but prefix is not - """ - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_type.append(term) - - def add_to_wells( - self, - id: str, - x_pos: int, - y_pos: int, - ph: Optional[float] = None, - init_conditions: list[InitCondition] = [], - measurements: list[PhotometricMeasurement] = [], - volume: Optional[float] = None, - volume_unit: Optional[UnitDefinition] = None, - **kwargs, - ): - params = { - "id": id, - "x_pos": x_pos, - "y_pos": y_pos, - "ph": ph, - "init_conditions": init_conditions, - "measurements": measurements, - "volume": volume, - "volume_unit": volume_unit, - } - - if "id" in kwargs: - params["id"] = kwargs["id"] - - self.wells.append(Well(**params)) - - return self.wells[-1] - - -class Well(BaseModel): - model_config: ConfigDict = ConfigDict( # type: ignore - validate_assigment=True, - ) # type: ignore - - id: str - x_pos: int - y_pos: int - ph: Optional[float] = Field(default=None) - init_conditions: list[InitCondition] = Field(default_factory=list) - measurements: list[PhotometricMeasurement] = Field(default_factory=list) - volume: Optional[float] = Field(default=None) - volume_unit: Optional[UnitDefinition] = Field(default=None) - - # JSON-LD fields - ld_id: str = Field( - serialization_alias="@id", default_factory=lambda: "md:Well/" + str(uuid4()) - ) - ld_type: list[str] = Field( - serialization_alias="@type", - default_factory=lambda: [ - "md:Well", - ], - ) - ld_context: dict[str, str | dict] = Field( - serialization_alias="@context", - default_factory=lambda: { - "md": "https://github.com/FAIRChemistry/MTPHandler", - }, - ) - - def filter_init_conditions(self, **kwargs) -> list[InitCondition]: - """Filters the init_conditions attribute based on the given kwargs - - Args: - **kwargs: The attributes to filter by. - - Returns: - list[InitCondition]: The filtered list of InitCondition objects - """ - - return FilterWrapper[InitCondition](self.init_conditions, **kwargs).filter() - - def filter_measurements(self, **kwargs) -> list[PhotometricMeasurement]: - """Filters the measurements attribute based on the given kwargs - - Args: - **kwargs: The attributes to filter by. - - Returns: - list[PhotometricMeasurement]: The filtered list of PhotometricMeasurement objects - """ - - return FilterWrapper[PhotometricMeasurement]( - self.measurements, **kwargs - ).filter() - - def set_attr_term( - self, - attr: str, - term: str | dict, - prefix: str | None = None, - iri: str | None = None, - ): - """Sets the term for a given attribute in the JSON-LD object - - Example: - # Using an IRI term - >> obj.set_attr_term("name", "http://schema.org/givenName") - - # Using a prefix and term - >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org") - - # Usinng a dictionary term - >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"}) - - Args: - attr (str): The attribute to set the term for - term (str | dict): The term to set for the attribute - - Raises: - AssertionError: If the attribute is not found in the model - """ - - assert ( - attr in self.model_fields - ), f"Attribute {attr} not found in {self.__class__.__name__}" - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_context[attr] = term - - def add_type_term( - self, term: str, prefix: str | None = None, iri: str | None = None - ): - """Adds a term to the @type field of the JSON-LD object - - Example: - # Using a term - >> obj.add_type_term("https://schema.org/Person") - - # Using a prefixed term - >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person") - - Args: - term (str): The term to add to the @type field - prefix (str, optional): The prefix to use for the term. Defaults to None. - iri (str, optional): The IRI to use for the term prefix. Defaults to None. - - Raises: - ValueError: If prefix is provided but iri is not - ValueError: If iri is provided but prefix is not - """ - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_type.append(term) - - def add_to_init_conditions( - self, - species_id: str, - init_conc: float, - conc_unit: UnitDefinition, - **kwargs, - ): - params = { - "species_id": species_id, - "init_conc": init_conc, - "conc_unit": conc_unit, - } - - if "id" in kwargs: - params["id"] = kwargs["id"] - - self.init_conditions.append(InitCondition(**params)) - - return self.init_conditions[-1] - - def add_to_measurements( - self, - wavelength: float, - time_unit: UnitDefinition, - absorption: list[float] = [], - time: list[float] = [], - blank_states: list[BlankState] = [], - **kwargs, - ): - params = { - "wavelength": wavelength, - "time_unit": time_unit, - "absorption": absorption, - "time": time, - "blank_states": blank_states, - } - - if "id" in kwargs: - params["id"] = kwargs["id"] - - self.measurements.append(PhotometricMeasurement(**params)) - - return self.measurements[-1] - - -class PhotometricMeasurement(BaseModel): - model_config: ConfigDict = ConfigDict( # type: ignore - validate_assigment=True, - ) # type: ignore - - wavelength: float - time_unit: UnitDefinition - absorption: list[float] = Field(default_factory=list) - time: list[float] = Field(default_factory=list) - blank_states: list[BlankState] = Field(default_factory=list) - - # JSON-LD fields - ld_id: str = Field( - serialization_alias="@id", - default_factory=lambda: "md:PhotometricMeasurement/" + str(uuid4()), - ) - ld_type: list[str] = Field( - serialization_alias="@type", - default_factory=lambda: [ - "md:PhotometricMeasurement", - ], - ) - ld_context: dict[str, str | dict] = Field( - serialization_alias="@context", - default_factory=lambda: { - "md": "https://github.com/FAIRChemistry/MTPHandler", - }, - ) - - def filter_blank_states(self, **kwargs) -> list[BlankState]: - """Filters the blank_states attribute based on the given kwargs - - Args: - **kwargs: The attributes to filter by. - - Returns: - list[BlankState]: The filtered list of BlankState objects - """ - - return FilterWrapper[BlankState](self.blank_states, **kwargs).filter() - - def set_attr_term( - self, - attr: str, - term: str | dict, - prefix: str | None = None, - iri: str | None = None, - ): - """Sets the term for a given attribute in the JSON-LD object - - Example: - # Using an IRI term - >> obj.set_attr_term("name", "http://schema.org/givenName") - - # Using a prefix and term - >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org") - - # Usinng a dictionary term - >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"}) - - Args: - attr (str): The attribute to set the term for - term (str | dict): The term to set for the attribute - - Raises: - AssertionError: If the attribute is not found in the model - """ - - assert ( - attr in self.model_fields - ), f"Attribute {attr} not found in {self.__class__.__name__}" - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_context[attr] = term - - def add_type_term( - self, term: str, prefix: str | None = None, iri: str | None = None - ): - """Adds a term to the @type field of the JSON-LD object - - Example: - # Using a term - >> obj.add_type_term("https://schema.org/Person") - - # Using a prefixed term - >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person") - - Args: - term (str): The term to add to the @type field - prefix (str, optional): The prefix to use for the term. Defaults to None. - iri (str, optional): The IRI to use for the term prefix. Defaults to None. - - Raises: - ValueError: If prefix is provided but iri is not - ValueError: If iri is provided but prefix is not - """ - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_type.append(term) - - def add_to_blank_states( - self, - species_id: str, - contributes_to_signal: bool = True, - **kwargs, - ): - params = { - "species_id": species_id, - "contributes_to_signal": contributes_to_signal, - } - - if "id" in kwargs: - params["id"] = kwargs["id"] - - self.blank_states.append(BlankState(**params)) - - return self.blank_states[-1] - - -class InitCondition(BaseModel): - model_config: ConfigDict = ConfigDict( # type: ignore - validate_assigment=True, - ) # type: ignore - - species_id: str - init_conc: float - conc_unit: UnitDefinition - - # JSON-LD fields - ld_id: str = Field( - serialization_alias="@id", - default_factory=lambda: "md:InitCondition/" + str(uuid4()), - ) - ld_type: list[str] = Field( - serialization_alias="@type", - default_factory=lambda: [ - "md:InitCondition", - ], - ) - ld_context: dict[str, str | dict] = Field( - serialization_alias="@context", - default_factory=lambda: { - "md": "https://github.com/FAIRChemistry/MTPHandler", - }, - ) - - def set_attr_term( - self, - attr: str, - term: str | dict, - prefix: str | None = None, - iri: str | None = None, - ): - """Sets the term for a given attribute in the JSON-LD object - - Example: - # Using an IRI term - >> obj.set_attr_term("name", "http://schema.org/givenName") - - # Using a prefix and term - >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org") - - # Usinng a dictionary term - >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"}) - - Args: - attr (str): The attribute to set the term for - term (str | dict): The term to set for the attribute - - Raises: - AssertionError: If the attribute is not found in the model - """ - - assert ( - attr in self.model_fields - ), f"Attribute {attr} not found in {self.__class__.__name__}" - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_context[attr] = term - - def add_type_term( - self, term: str, prefix: str | None = None, iri: str | None = None - ): - """Adds a term to the @type field of the JSON-LD object - - Example: - # Using a term - >> obj.add_type_term("https://schema.org/Person") - - # Using a prefixed term - >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person") - - Args: - term (str): The term to add to the @type field - prefix (str, optional): The prefix to use for the term. Defaults to None. - iri (str, optional): The IRI to use for the term prefix. Defaults to None. - - Raises: - ValueError: If prefix is provided but iri is not - ValueError: If iri is provided but prefix is not - """ - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_type.append(term) - - -class BlankState(BaseModel): - model_config: ConfigDict = ConfigDict( # type: ignore - validate_assigment=True, - ) # type: ignore - - species_id: str - contributes_to_signal: bool = True - - # JSON-LD fields - ld_id: str = Field( - serialization_alias="@id", - default_factory=lambda: "md:BlankState/" + str(uuid4()), - ) - ld_type: list[str] = Field( - serialization_alias="@type", - default_factory=lambda: [ - "md:BlankState", - ], - ) - ld_context: dict[str, str | dict] = Field( - serialization_alias="@context", - default_factory=lambda: { - "md": "https://github.com/FAIRChemistry/MTPHandler", - }, - ) - - def set_attr_term( - self, - attr: str, - term: str | dict, - prefix: str | None = None, - iri: str | None = None, - ): - """Sets the term for a given attribute in the JSON-LD object - - Example: - # Using an IRI term - >> obj.set_attr_term("name", "http://schema.org/givenName") - - # Using a prefix and term - >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org") - - # Usinng a dictionary term - >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"}) - - Args: - attr (str): The attribute to set the term for - term (str | dict): The term to set for the attribute - - Raises: - AssertionError: If the attribute is not found in the model - """ - - assert ( - attr in self.model_fields - ), f"Attribute {attr} not found in {self.__class__.__name__}" - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_context[attr] = term - - def add_type_term( - self, term: str, prefix: str | None = None, iri: str | None = None - ): - """Adds a term to the @type field of the JSON-LD object - - Example: - # Using a term - >> obj.add_type_term("https://schema.org/Person") - - # Using a prefixed term - >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person") - - Args: - term (str): The term to add to the @type field - prefix (str, optional): The prefix to use for the term. Defaults to None. - iri (str, optional): The IRI to use for the term prefix. Defaults to None. - - Raises: - ValueError: If prefix is provided but iri is not - ValueError: If iri is provided but prefix is not - """ - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_type.append(term) - - -class UnitDefinition(BaseModel): - model_config: ConfigDict = ConfigDict( # type: ignore - validate_assigment=True, - use_enum_values=True, - ) # type: ignore - - id: Optional[str] = Field(default=None) - name: Optional[str] = Field(default=None) - base_units: list[BaseUnit] = Field(default_factory=list) - - # JSON-LD fields - ld_id: str = Field( - serialization_alias="@id", - default_factory=lambda: "md:UnitDefinition/" + str(uuid4()), - ) - ld_type: list[str] = Field( - serialization_alias="@type", - default_factory=lambda: [ - "md:UnitDefinition", - ], - ) - ld_context: dict[str, str | dict] = Field( - serialization_alias="@context", - default_factory=lambda: { - "md": "https://github.com/FAIRChemistry/MTPHandler", - }, - ) - - def filter_base_units(self, **kwargs) -> list[BaseUnit]: - """Filters the base_units attribute based on the given kwargs - - Args: - **kwargs: The attributes to filter by. - - Returns: - list[BaseUnit]: The filtered list of BaseUnit objects - """ - - return FilterWrapper[BaseUnit](self.base_units, **kwargs).filter() - - def set_attr_term( - self, - attr: str, - term: str | dict, - prefix: str | None = None, - iri: str | None = None, - ): - """Sets the term for a given attribute in the JSON-LD object - - Example: - # Using an IRI term - >> obj.set_attr_term("name", "http://schema.org/givenName") - - # Using a prefix and term - >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org") - - # Usinng a dictionary term - >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"}) - - Args: - attr (str): The attribute to set the term for - term (str | dict): The term to set for the attribute - - Raises: - AssertionError: If the attribute is not found in the model - """ - - assert ( - attr in self.model_fields - ), f"Attribute {attr} not found in {self.__class__.__name__}" - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_context[attr] = term - - def add_type_term( - self, term: str, prefix: str | None = None, iri: str | None = None - ): - """Adds a term to the @type field of the JSON-LD object - - Example: - # Using a term - >> obj.add_type_term("https://schema.org/Person") - - # Using a prefixed term - >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person") - - Args: - term (str): The term to add to the @type field - prefix (str, optional): The prefix to use for the term. Defaults to None. - iri (str, optional): The IRI to use for the term prefix. Defaults to None. - - Raises: - ValueError: If prefix is provided but iri is not - ValueError: If iri is provided but prefix is not - """ - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_type.append(term) - - def add_to_base_units( - self, - kind: UnitType, - exponent: int, - multiplier: Optional[float] = None, - scale: Optional[float] = None, - **kwargs, - ): - params = { - "kind": kind, - "exponent": exponent, - "multiplier": multiplier, - "scale": scale, - } - - if "id" in kwargs: - params["id"] = kwargs["id"] - - self.base_units.append(BaseUnit(**params)) - - return self.base_units[-1] - - -class BaseUnit(BaseModel): - model_config: ConfigDict = ConfigDict( # type: ignore - validate_assigment=True, - use_enum_values=True, - ) # type: ignore - - kind: UnitType - exponent: int - multiplier: Optional[float] = Field(default=None) - scale: Optional[float] = Field(default=None) - - # JSON-LD fields - ld_id: str = Field( - serialization_alias="@id", default_factory=lambda: "md:BaseUnit/" + str(uuid4()) - ) - ld_type: list[str] = Field( - serialization_alias="@type", - default_factory=lambda: [ - "md:BaseUnit", - ], - ) - ld_context: dict[str, str | dict] = Field( - serialization_alias="@context", - default_factory=lambda: { - "md": "https://github.com/FAIRChemistry/MTPHandler", - }, - ) - - def set_attr_term( - self, - attr: str, - term: str | dict, - prefix: str | None = None, - iri: str | None = None, - ): - """Sets the term for a given attribute in the JSON-LD object - - Example: - # Using an IRI term - >> obj.set_attr_term("name", "http://schema.org/givenName") - - # Using a prefix and term - >> obj.set_attr_term("name", "schema:givenName", "schema", "http://schema.org") - - # Usinng a dictionary term - >> obj.set_attr_term("name", {"@id": "http://schema.org/givenName", "@type": "@id"}) - - Args: - attr (str): The attribute to set the term for - term (str | dict): The term to set for the attribute - - Raises: - AssertionError: If the attribute is not found in the model - """ - - assert ( - attr in self.model_fields - ), f"Attribute {attr} not found in {self.__class__.__name__}" - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_context[attr] = term - - def add_type_term( - self, term: str, prefix: str | None = None, iri: str | None = None - ): - """Adds a term to the @type field of the JSON-LD object - - Example: - # Using a term - >> obj.add_type_term("https://schema.org/Person") - - # Using a prefixed term - >> obj.add_type_term("schema:Person", "schema", "https://schema.org/Person") - - Args: - term (str): The term to add to the @type field - prefix (str, optional): The prefix to use for the term. Defaults to None. - iri (str, optional): The IRI to use for the term prefix. Defaults to None. - - Raises: - ValueError: If prefix is provided but iri is not - ValueError: If iri is provided but prefix is not - """ - - if prefix: - validate_prefix(term, prefix) - - add_namespace(self, prefix, iri) - self.ld_type.append(term) - - -class UnitType(Enum): - AMPERE = "ampere" - AVOGADRO = "avogadro" - BECQUEREL = "becquerel" - CANDELA = "candela" - CELSIUS = "celsius" - COULOMB = "coulomb" - DIMENSIONLESS = "dimensionless" - FARAD = "farad" - GRAM = "gram" - GRAY = "gray" - HENRY = "henry" - HERTZ = "hertz" - ITEM = "item" - JOULE = "joule" - KATAL = "katal" - KELVIN = "kelvin" - KILOGRAM = "kilogram" - LITRE = "litre" - LUMEN = "lumen" - LUX = "lux" - METRE = "metre" - MOLE = "mole" - NEWTON = "newton" - OHM = "ohm" - PASCAL = "pascal" - RADIAN = "radian" - SECOND = "second" - SIEMENS = "siemens" - SIEVERT = "sievert" - STERADIAN = "steradian" - TESLA = "tesla" - VOLT = "volt" - WATT = "watt" - WEBER = "weber" diff --git a/temp_handler/molecule.py b/temp_handler/molecule.py deleted file mode 100644 index e14026a..0000000 --- a/temp_handler/molecule.py +++ /dev/null @@ -1,166 +0,0 @@ -import re - -from calipytion.model import Standard -from calipytion.model import UnitDefinition as CalUnit -from calipytion.tools.calibrator import Calibrator -from calipytion.units import C -from pydantic import BaseModel, ConfigDict, Field - -from mtphandler.model import UnitDefinition - - -class Molecule(BaseModel): - model_config: ConfigDict = ConfigDict( # type: ignore - validate_assigment=True, - use_enum_values=True, - ) # type: ignore - - id: str = Field( - description="ID of the molecule", - ) - pubchem_cid: int = Field( - description="PubChem CID of the molecule", - ) - name: str = Field( - description="Name of the molecule", - ) - standard: Standard | None = Field( - description="Standard instance associated with the molecule", default=None - ) - constant: bool = Field( - description="Boolean indicating whether the molecule concentration is constant throughout the experiment", - default=False, - ) - - # @model_validator(mode="before") - # @classmethod - # def get_molecule_name(cls, data: Any) -> Any: - # """Retrieves the molecule name from the PubChem database based on the PubChem CID.""" - - # if "name" not in data: - # data["molecule_name"] = pubchem_request_molecule_name(data["pubchem_cid"]) - # return data - - # # validator that if a standard is provided, the retention time must be defined and vice versa - - # @model_validator(mode="before") - # @classmethod - # def validate_standard_and_retention_time(cls, data: Any) -> Any: - # if data.get("standard") and data.get("retention_time"): - # assert data["standard"].retention_time == data["retention_time"], """ - # The retention time of the standard and the molecule must be the same. - # """ - - @property - def ld_id_url(self) -> str | None: - """Returns the URL of the PubChem page of the molecule based on the PubChem CID - - Returns: - str | None: URL of the PubChem page of the molecule if the PubChem CID is defined, None otherwise. - """ - - if self.pubchem_cid == -1: - return None - - return f"https://pubchem.ncbi.nlm.nih.gov/compound/{self.pubchem_cid}" - - @classmethod - def from_standard( - cls, standard: Standard, init_conc: float, conc_unit: UnitDefinition - ): - """Creates a Molecule instance from a Standard instance.""" - - assert standard.retention_time, """ - The retention time of the standard needs to be defined. - Specify the `retention_time` attribute of the standard. - """ - - return cls( - id=standard.molecule_id, - pubchem_cid=standard.pubchem_cid, - name=standard.molecule_name, - standard=standard, - ) - - def create_standard( - self, - areas: list[float], - concs: list[float], - conc_unit: UnitDefinition, - ph: float, - temperature: float, - temp_unit: CalUnit = C, - visualize: bool = True, - ) -> Standard: - """Creates a linear standard from the molecule's calibration data.""" - - calibrator = Calibrator( - molecule_id=self.id, - pubchem_cid=self.pubchem_cid, - molecule_name=self.name, - concentrations=concs, - conc_unit=CalUnit(**conc_unit.model_dump()), - signals=areas, - ) - calibrator.models = [] - model = calibrator.add_model( - name="linear", - signal_law=f"{self.id} * a", - ) - - calibrator.fit_models() - model.calibration_range.conc_lower = 0.0 - model.calibration_range.signal_lower = 0.0 - - if visualize: - calibrator.visualize() - - standard = calibrator.create_standard( - model=model, - ph=ph, - temperature=temperature, - temp_unit=CalUnit(**temp_unit.model_dump()), - ) - - self.standard = standard - - return standard - - -class Protein(BaseModel): - model_config: ConfigDict = ConfigDict( # type: ignore - validate_assigment=True, - use_enum_values=True, - ) # type: ignore - - id: str = Field( - description="ID of the Protein", - ) - name: str = Field( - description="Name of the protein", - ) - sequence: str | None = Field( - description="Amino acid sequence of the protein", - default=None, - ) - constant: bool = Field( - description="Boolean indicating whether the protein concentration is constant", - default=True, - ) - - @property - def ld_id_url(self) -> str | None: - """Returns the URL of the UniProt page of the protein based on the protein ID - - Returns: - str | None: URL of the UniProt page of the protein if the protein ID is defined, None otherwise. - """ - - uniprot_pattern = ( - r"[OPQ][0-9][A-Z0-9]{3}[0-9]|[A-NR-Z][0-9]([A-Z][A-Z0-9]{2}[0-9]){1,2}" - ) - - if re.fullmatch(uniprot_pattern, self.id) is None: - return None - else: - return f"https://www.uniprot.org/uniprotkb/{self.id}/entry" diff --git a/temp_handler/mtp_logging.py b/temp_handler/mtp_logging.py deleted file mode 100644 index 3e81585..0000000 --- a/temp_handler/mtp_logging.py +++ /dev/null @@ -1,27 +0,0 @@ -import sys - -from loguru import logger - -# Flag to check if the logger has already been configured -mtphandler_logger_configured = False - - -def configure_logger( - log_level_std: str = "INFO", - log_level_file: str = "DEBUG", - log_file: str = "mtp_handler.log", - log_file_rotation_MB: int = 1, -): - """Configures the logger severity level for the package.""" - global mtphandler_logger_configured - - if not mtphandler_logger_configured: - logger.remove() - - logger.add(sys.stdout, level=log_level_std) - logger.add( - log_file, - level=log_level_file, - rotation=f"{log_file_rotation_MB} MB", - ) - mtphandler_logger_configured = True diff --git a/temp_handler/plate_manager.py b/temp_handler/plate_manager.py deleted file mode 100644 index 64c954f..0000000 --- a/temp_handler/plate_manager.py +++ /dev/null @@ -1,1258 +0,0 @@ -from __future__ import annotations - -from collections import defaultdict -from typing import Any, Literal, Optional, Tuple, get_args - -import numpy as np -import pandas as pd -from loguru import logger -from pydantic import BaseModel, Field, model_validator -from pyenzyme import EnzymeMLDocument -from rich import print - -from mtphandler.model import ( - BlankState, - PhotometricMeasurement, - Plate, - UnitDefinition, - Well, -) -from mtphandler.molecule import Molecule, Protein -from mtphandler.tools import ( - get_measurement, - get_species_condition, - handle_blank_status, - measurement_is_blanked_for, - pubchem_request_molecule_name, - well_contains_species, -) -from mtphandler.units import C -from mtphandler.visualize import visualize_plate - -ASSIGN_CASE = Literal["rows", "columns", "all", "all except"] -ASSIGN_CASE_VALUES: Tuple[ASSIGN_CASE, ...] = get_args(ASSIGN_CASE) - - -class PlateManager(BaseModel): - name: str = Field( - ..., - description="Name of the plate", - ) - plate: Plate = Field( - ..., - description="Plate object", - ) - molecules: list[Molecule] = Field( - default=[], - description="List of molecules", - ) - proteins: list[Protein] = Field( - default=[], - description="List of proteins", - ) - - @model_validator(mode="before") - @classmethod - def give_name_to_plate(cls, data: Any) -> Any: - if isinstance(data, dict): - if "name" not in data or data["name"] is None: - data["name"] = "MTP assay" - return data - - def define_molecule( - self, - id: str, - pubchem_cid: int, - name: str | None = None, - constant: bool = False, - ) -> Molecule: - """Defines a molecule which can be used to assign to wells on the plate. - If no name is provided, the molecule name is retrieved from the PubChem database. - If the molecule is not known in the PubChem database, please specify `pubchem_cid=-1`. - - Args: - id (str): Internal identifier of the molecule such as `s0` or `ABTS`. - pubchem_cid (int): PubChem CID of the molecule. - name (str | None, optional): Name of the molecule. Defaults to None. - constant (bool, optional): Indicates whether the molecule concentration is constant throughout the experiment. Defaults to False. - - Raises: - ValueError: If the PubChem CID is not an integer. - ValueError: If the name is not provided and the PubChem CID is not available. - - Returns: - Molecule: Molecule object. - """ - - logger.debug(f"Defining molecule {id} with PubChem CID {pubchem_cid}") - - if not isinstance(pubchem_cid, int): - raise ValueError("PubChem CID must be an integer.") - - if name is None: - if pubchem_cid != -1: - name = pubchem_request_molecule_name(pubchem_cid) - else: - raise ValueError( - "Name must be provided if PubChem CID is not available." - ) - - molecule = Molecule( - id=id, - pubchem_cid=pubchem_cid, - name=name, - constant=constant, - ) - - self._update_molecule(molecule) - - return molecule - - # Adders for species, molecules and proteins - def add_molecule( - self, - molecule: Molecule, - constant: bool | None = None, - ) -> None: - """Adds a molecule to the list of molecules. Allows to update the `constant` attribute of the molecule. - - Args: - molecule (Molecule): Molecule object to add to the list of molecules. - constant (bool | None, optional): Indicates whether the `constant` attribute of the molecule should be updated. Defaults to None. - """ - if constant is not None: - molecule = molecule.model_copy(update={"constant": constant}) - - self._update_molecule(molecule) - - def _update_molecule(self, molecule) -> None: - """Updates the molecule if it already exists in the list of molecules. - Otherwise, the molecule is added to the list of species.""" - for idx, mol in enumerate(self.molecules): - if mol.id == molecule.id: - self.molecules[idx] = molecule - assert self.molecules[idx] is molecule - return - - self.molecules.append(molecule) - - def define_protein( - self, - id: str, - name: str, - sequence: str | None = None, - constant: bool = True, - ) -> Protein: - """Defines a protein which can be used to assign to wells on the plate. - - Args: - id (str): Internal identifier of the protein such as `p0`, `MAT_K78M` or `GFP`. - name (str): Name of the protein. - sequence (str | None, optional): Amino acid sequence of the protein. Defaults to None. - constant (bool, optional): Indicates whether the protein concentration is constant throughout the experiment. Defaults to True. - - Returns: - Protein: Protein object. - """ - protein = Protein( - id=id, - name=name, - sequence=sequence, - constant=constant, - ) - - self._update_protein(protein) - - return protein - - def add_protein( - self, - protein: Protein, - constant: bool | None = None, - ) -> None: - """Adds a protein to the list of proteins. Allows to update the `constant` attribute of the protein. - - Args: - protein (Protein): Protein object to add to the list of proteins. - constant (bool | None, optional): Indicates whether the `constant` attribute of the protein should be updated. Defaults to None. - """ - if constant is not None: - protein = protein.model_copy(update={"constant": constant}) - - self._update_protein(protein) - - def _update_protein(self, protein) -> None: - """Updates the protein if it already exists in the list of proteins.""" - for idx, prot in enumerate(self.proteins): - if prot.id == protein.id: - self.proteins[idx] = protein - assert self.proteins[idx] is protein - return - - self.proteins.append(protein) - - # Assign species and conditions to wells - def assign_init_conditions( - self, - species: Molecule | Protein, - init_conc: float | list[float], - conc_unit: UnitDefinition, - to: ASSIGN_CASE, - ids: Optional[str | int | list[str] | list[int]] = None, - contributes_to_signal: Optional[bool] = None, - silent: bool = False, - ): - """ - Assigns a `Molecule` or `Protein` to specific wells on the plate based on the provided criteria. - In this way the initial concentration of the species can be set for the respective wells in a row, - column, all wells or all wells except for the specified. During the assignment, either an array of - initial concentrations or a single initial concentration can be provided. If a single initial - concentration is provided, it is assigned to all wells of e.g., a row or column. - If an array of initial concentrations is provided, the length of the array must match the number of - wells in the row or column. - - Tip: - For complex assignment scenarios, consider using the `assign_init_conditions_from_spreadsheet` function. - - Args: - species (Molecule | Protein): The species to assign to the wells. - init_conc (float | list[float]): The initial concentration(s) of the species. - conc_unit (UnitDefinition): The unit of concentration. - to (ASSIGN_CASE): The target location(s) for assigning the species. It should be one of the allowed cases. - ids (str | int | list[str] | list[int], optional): The ID(s) of the target wells, rows, or columns. Defaults to None. - contributes_to_signal (bool, optional): Indicates if the assigned species contributes to the signal. - Defaults to None. - silent (bool, optional): If True, no output is printed. Defaults to False. - - Raises: - AttributeError: If the species does not exist in the list of molecules or proteins. - AttributeError: If the 'to' argument is not a valid `ASSIGN_CASE`. - - Returns: - None - """ - - # Handle species - if isinstance(species, str): - species = self.get_species(species) - elif isinstance(species, (Molecule, Protein)): - pass - else: - raise AttributeError( - """Argument 'species' must reference an `id` of a molecule or protein from the list of molecules or proteins of the `MTPHandler`.""" - ) - - if to not in ASSIGN_CASE_VALUES: - raise AttributeError(f"Argument 'to' must be one of {ASSIGN_CASE_VALUES}.") - - if not isinstance(init_conc, list): - init_conc = [init_conc] - - if not isinstance(ids, list) and isinstance(ids, (int, str)): - ids = [ids] # type: ignore - - if to == "all": - if isinstance(init_conc, list): - if len(init_conc) == 1: - init_conc = init_conc[0] - assert isinstance( - init_conc, (float, int) - ), "Argument 'init_conc' must be a float or an integer." - - self._assign_to_all( - species=species, - init_conc=float(init_conc), - conc_unit=conc_unit, - contributes_to_signal=contributes_to_signal, - silent=silent, - ) - - elif to == "columns": - assert ( - isinstance(ids, list) and all(isinstance(i, int) for i in ids) - ), "Argument 'ids' must be a list of integers when 'to' is set to 'columns'." - - self._assign_to_columns( - column_ids=ids, - species=species, - init_concs=init_conc, - conc_unit=conc_unit, - contributes_to_signal=contributes_to_signal, - silent=silent, - ) - - elif to == "rows": - assert isinstance(ids, list) and all( - isinstance(i, str) for i in ids - ), "Argument 'ids' must be a list of strings when 'to' is set to 'rows'." - - self._assign_species_to_rows( - row_ids=ids, - species=species, - init_concs=init_conc, - conc_unit=conc_unit, - contributes_to_signal=contributes_to_signal, - silent=silent, - ) - - else: - if isinstance(init_conc, list): - if len(init_conc) == 1: - init_conc = init_conc[0] - - assert isinstance( - init_conc, float - ), "Argument 'init_conc' must be a float when 'to' is set to 'all_except'." - - self._assign_species_to_all_except( - well_ids=ids, - species=species, - init_conc=init_conc, - conc_unit=conc_unit, - contributes_to_signal=contributes_to_signal, - silent=silent, - ) - - def _assign_to_all( - self, - species: Molecule | Protein, - init_conc: float, - conc_unit: UnitDefinition, - contributes_to_signal: bool | None, - silent: bool, - ): - for well in self.plate.wells: - well.add_to_init_conditions( - species_id=species.id, - init_conc=init_conc, - conc_unit=conc_unit, - ) - - handle_blank_status(well, species.id, init_conc, contributes_to_signal) - - if not silent: - print( - f"Assigned [bold magenta]{species.name}[/] ({species.id}) with" - f" {init_conc} {conc_unit} to all wells." - ) - - def get_calibrator( - self, - molecule: Molecule, - cutoff: float | None = None, - wavelength: float | None = None, - ): - from mtphandler.ioutils.calipytion import initialize_calibrator - - if wavelength is None: - wavelength = self._handle_wavelength() - - return initialize_calibrator( - plate=self.plate, - wavelength=wavelength, - molecule=molecule, - protein_ids=[protein.id for protein in self.proteins], - cutoff=cutoff, - ) - - def _assign_to_columns( - self, - column_ids: list[int], - species: Molecule | Protein, - init_concs: list[float], - conc_unit: UnitDefinition, - contributes_to_signal: bool | None, - silent: bool, - ): - # Handle column_ids - if not all([isinstance(column_id, int) for column_id in column_ids]): - raise AttributeError("Argument 'column_ids' must be a list of integers.") - - columns = [] - for column_id in column_ids: - wells = [well for well in self.plate.wells if well.x_pos + 1 == column_id] - wells = sorted(wells, key=lambda x: x.y_pos) - columns.append(wells) - - # assert thal all columns are the same size - assert all([len(column) == len(columns[0]) for column in columns]), ( - "All columns must be the same size. " "" - ) - - # Handle init_concs - if len(init_concs) == 1: - init_concs = init_concs * len(columns[0]) - - for wells in columns: - assert len(init_concs) == len(wells), f""" - Number of initial concentrations ({len(init_concs)}) does not match number - of wells ({len(wells)}) in columns ({column_ids}). - """ - - for well, init_conc in zip(wells, init_concs): - well.add_to_init_conditions( - species_id=species.id, - init_conc=init_conc, - conc_unit=conc_unit, - ) - - handle_blank_status(well, species.id, init_conc, contributes_to_signal) - - if not silent: - print( - f"Assigned [bold magenta]{species.name}[/] ({species.id}) with" - f" concentrations of {init_concs} {conc_unit} to columns {column_ids}." - ) - - def _assign_species_to_rows( - self, - row_ids: list[str], - species: Molecule | Protein, - init_concs: list[float], - conc_unit: UnitDefinition, - contributes_to_signal: bool | None, - silent: bool, - ): - # Handle row_ids - - if isinstance(row_ids, str): - row_ids = [row_ids] - - if not all([isinstance(row_id, str) for row_id in row_ids]): - raise AttributeError("Argument 'row_ids' must be a list of strings.") - - rows = [] - for row_id in row_ids: - wells = [well for well in self.plate.wells if row_id in well.id] - wells = sorted(wells, key=lambda x: x.x_pos) - rows.append(wells) - - for wells in rows: - assert len(init_concs) == len(wells), f""" - Number of initial concentrations ({len(init_concs)}) does not match number - of wells ({len(wells)}) in rows ({row_ids}). - """ - - for well, init_conc in zip(wells, init_concs): - well.add_to_init_conditions( - species_id=species.id, - init_conc=init_conc, - conc_unit=conc_unit, - ) - - handle_blank_status(well, species.id, init_conc, contributes_to_signal) - - if not silent: - print( - f"Assigned [bold magenta]{species.name}[/] ({species.id}) with" - f" {init_concs} {conc_unit} to rows {row_ids}." - ) - - def _assign_species_to_all_except( - self, - well_ids: list[str], - species: Molecule | Protein, - init_conc: float, - conc_unit: UnitDefinition, - contributes_to_signal: bool | None, - silent: bool, - ): - # validate all well_id exist - for well_id in well_ids: - if not self._well_id_exists(well_id): - raise AttributeError(f"Well ID '{well_id}' not found on the plate.") - - wells = (well for well in self.plate.wells if well.id not in well_ids) - for well in wells: - well.add_to_init_conditions( - species_id=species.id, - init_conc=init_conc, - conc_unit=conc_unit, - ) - - handle_blank_status(well, species.id, init_conc, contributes_to_signal) - - if not silent: - print( - f"Assigned [bold magenta]{species.name}[/] ({species.id}) with" - f" {init_conc} {conc_unit} to all wells except {well_ids}." - ) - - def assign_init_conditions_from_spreadsheet( - self, - conc_unit: UnitDefinition, - path: str, - header: int = 0, - index: int = 0, - silent: bool = False, - ): - """Assign initial concentrations from an Excel spreadsheet to the wells on the plate. - - Note: - This function goes through the sheets in an excel spreadsheet. If the sheet name - matches the id of a protein or molecule defined for the plate, the initial concentration - form the plate map in the excel spreadsheet is assigned to the respective well. - - The excel spreadsheet must have the following structure: - - - The first row must contain the column numbers from 1 to 12. - - The first column must contain the row letters from A to H. - - If a cell is left empty for a species, the species is not assigned to the well. - - If the initial concentration is `0`, the species is added to the well. This is useful for - specifying a product which is not present in the initial reaction mixture, but is formed - during the reaction. - - Args: - conc_unit (UnitDefinition): The unit of concentration. - path (str): Path to the Excel spreadsheet. - header (int, optional): Row to use as the column names. Defaults to 0. - index (int, optional): Column to use as the row labels. Defaults to 0. - silent (bool, optional): If True, no output is printed. Defaults to False. - """ - # get excel sheet names - count = 0 - sheet_names = pd.ExcelFile(path).sheet_names - - species_matches: set[str] = set() - for protein in self.proteins: - if protein.id.lower() in [sheet.lower() for sheet in sheet_names]: - species_matches.add(protein.id) - for molecule in self.molecules: - if molecule.id.lower() in [sheet.lower() for sheet in sheet_names]: - species_matches.add(molecule.id) - - for species_id in species_matches: - df = pd.read_excel( - io=path, header=header, index_col=index, sheet_name=species_id - ) - for well in self.plate.wells: - init_conc = df.iloc[well.y_pos, well.x_pos] - - if np.isnan(init_conc): - continue - - well.add_to_init_conditions( - species_id=species_id, - init_conc=init_conc, - conc_unit=conc_unit, - ) - count += 1 - - handle_blank_status( - well, species_id, init_conc, contributes_to_signal=None - ) - - if not silent: - print( - f"📍 Assigned {count} initial concentrations coditions for [bold magenta]{list(species_matches)}[/]" - f" from {path} to the plate." - ) - - def set_species_contribututes_to_signal( - self, - species: Molecule | Protein, - contributes_to_signal: bool, - wavelength: float | None = None, - silent: bool = False, - ): - """Set the contribution of a species to the signal in all wells. - - Args: - species (Molecule | Protein): The species for which to set the contribution to the signal. - contributes_to_signal (bool): If True, the species contributes to the signal. If False, the species does not contribute to the signal. - wavelength (float | None, optional): The wavelength at which to set the contribution to the signal. Defaults to None. - silent (bool, optional): If True, no output is printed. Defaults to False. - """ - if wavelength is None: - try: - wavelength = self._handle_wavelength() - except ValueError: - raise ValueError( - "Multiple wavelengths were measured. Please specify one." - ) - - for well in self.plate.wells: - if not well_contains_species(well, species.id): - print("Species not found in well.") - continue - - for measurement in well.measurements: - if measurement.wavelength != wavelength: - continue - - for state in measurement.blank_states: - if state.species_id == species.id: - state.contributes_to_signal = contributes_to_signal - - if not silent: - print( - f"Set contribution to signal of [bold magenta]{species.name}[/] ({species.id}) at" - f" {wavelength} nm to {contributes_to_signal}." - ) - - def get_well(self, id: str) -> Well: - """Get a well from the plate by its id. - - Args: - id (str): The id of the well. - - Raises: - ValueError: If the well with the given id is not found. - - Returns: - Well: The well object. - """ - - for well in self.plate.wells: - if well.id.lower() == id.lower(): - return well - - raise ValueError(f"Well {id} not found") - - def get_species(self, id: str) -> Protein | Molecule: - """Get a species from the list of molecules and proteins by its id. - - Args: - id (str): The id of the species. - - Raises: - ValueError: If the species with the given id is not found. - - Returns: - Protein | Molecule: The species object. - """ - for protein in self.proteins: - if protein.id == id: - return protein - for molecule in self.molecules: - if molecule.id == id: - return molecule - - raise ValueError(f"Species {id} not found") - - def visualize( - self, - zoom: bool = False, - wavelengths: list[float] = [], - darkmode: bool = False, - ): - """Visualize the plate. - - Args: - zoom (bool, optional): If False, the scaling of the signal (y-axis) is the same for all wells. - If True, the scaling is adjusted for each well. Defaults to False. - wavelengths (list[float], optional): Only visualize the signal at the specified wavelengths. - If not specified, all wavelengths are visualized. Defaults to []. - darkmode (bool, optional): If True, the plot is displayed in dark mode. Defaults to False. - """ - - visualize_plate( - self.plate, - zoom=zoom, - wavelengths=wavelengths, - darkmode=darkmode, - name=self.name, - ) - - def _handle_wavelength(self) -> float: - """ - If only one wavelength was measured, the wavelength is returned. - If multiple wavelengths were measured, an error is raised. - """ - - # check that all measurements in the wells have only one wavelength - wavelengths = set() - for well in self.plate.wells: - for meas in well.measurements: - wavelengths.add(meas.wavelength) - - if len(wavelengths) > 1: - raise ValueError("Multiple wavelengths were measured. Please specify one.") - - return wavelengths.pop() - - def _find_blanking_wells( - self, - target: Molecule | Protein, - wavelength: float, - ) -> list[Well]: - wells = [] - wavelength = self._handle_wavelength() - - protein_ids = [protein.id for protein in self.proteins] - molecules_ids = [molecule.id for molecule in self.molecules] - - # find wells that contain the target species with a concentration above zero - for well in self.plate.wells: - if not well_contains_species(well, target.id, conc_above_zero=True): - continue - - # Molecule controls can not include proteins - if target.id in molecules_ids and any( - [ - well_contains_species(well, protein_id, conc_above_zero=True) - for protein_id in protein_ids - ] - ): - continue - - for measurement in well.measurements: - if measurement.wavelength != wavelength: - continue - - # sanity check, species should be present in blank states - assert target.id in [ - state.species_id for state in measurement.blank_states - ], f"Species {target.id} not found in well {well.id}." - - # check is species contributes to signal (== is already blanked) - if measurement_is_blanked_for(measurement, target.id): - wells.append(well) - - return wells - - def slice_data( - self, - start: float, - end: float, - ): - """Slices the time and absorption data of all wells in the plate - that only contains the data between the start and end time. - - Args: - start (float): Start time of the slice. - end (float): End time of the slice. - """ - - for well in self.plate.wells: - for meas in well.measurements: - # find the index of the start and end time - start_idx = np.where(np.array(meas.time) >= start)[0][0] - end_idx = np.where(np.array(meas.time) <= end)[0][-1] - - # slice the time and absorption data - meas.time = meas.time[start_idx:end_idx] - meas.absorption = meas.absorption[start_idx:end_idx] - - def blank_species( - self, - species: Molecule | Protein, - wavelength: float | None = None, - silent: bool = False, - ): - """Blank the signal contribution of a species at a given wavelength. - Therefore, control wells of that species must be present on the plate. - - Args: - species (Molecule | Protein): The species to blank. - wavelength (float): The wavelength at which to blank the species. - silent (bool, optional): If True, no output is printed. Defaults to False. - - Raises: - ValueError: If no wells are found to calculate the absorption contribution of the species. - """ - - wavelength = self._handle_wavelength() - - blanking_wells = self._find_blanking_wells( - target=species, wavelength=wavelength - ) - if not blanking_wells: - print( - "No wells found to calculate the absorption contribution of the species." - ) - return - - # get mapping of concentration to blank wells - conc_blank_mapping = self._get_conc_blank_mapping( - wells=blanking_wells, species=species, wavelength=wavelength - ) - - self._apply_blank( - species=species, - conc_blank_mapping=conc_blank_mapping, - wavelength=wavelength, - ) - - def _apply_blank( - self, - species: Molecule | Protein, - conc_blank_mapping: dict[float, float], - wavelength: float, - ): - """Apply the blanking to the absorption data of a well. - - Args: - species (Molecule | Protein): The species to blank. - conc_blank_mapping (dict[float, float]): Mapping of init concentration of the species to mean absorption. - wavelength (float): The wavelength at which to blank the species. - """ - well_blanked_count = 0 - - for well_id, well in enumerate(self.plate.wells): - for meas_id, measurement in enumerate(well.measurements): - if measurement.wavelength != wavelength: - continue - - try: - init_condition = get_species_condition(well, species.id) - except ValueError: - continue - - for state_id, blank_state in enumerate(measurement.blank_states): - if blank_state.species_id != species.id: - continue - - if blank_state.contributes_to_signal: - self.plate.wells[well_id].measurements[meas_id].absorption = [ - absorption - conc_blank_mapping[init_condition.init_conc] - for absorption in measurement.absorption - ] - - self.plate.wells[well_id].measurements[meas_id].blank_states[ - state_id - ].contributes_to_signal = False - - well_blanked_count += 1 - - print(f"Blanked {well_blanked_count} wells containing {species.name}.\n") - - def to_enzymeml( - self, - detected_molecule: Molecule, - well_ids: list[str] | None = None, - wells_with_protein_only: bool = True, - name: str | None = None, - to_concentration: bool = False, - extrapolate: bool = False, - wavelength: float | None = None, - silent: bool = False, - ) -> EnzymeMLDocument: - """Convert the plate to an EnzymeML document. - - - Args: - name (str | None, optional): Name of the EnzymeML document. Defaults to the name of the plate. - detected_molecule (Molecule): The molecule that was detected in the wells. - well_ids (list[str] | None, optional): List of well ids to include in the EnzymeML document. - If not provided, all wells are included. Defaults to None. - to_concentration (bool, optional): If True, the signal is converted to concentration. Therefore, - a calibrator must be defined for the respective molecule. Defaults to False. - extrapolate (bool, optional): If True, and `to_concentration` is True, measured absorption values - that are outside the range of the calibrator are extrapolated. Defaults to False. - wells_with_protein_only (bool, optional): If True, only wells with protein are included in the - EnzymeML document. This assumes that wells with a protein are catalyzed wells. Defaults to True. - wavelength (float | None, optional): If multiple wavelengths were measured, the wavelength for - which to convert the signal to concentration needs to be specified. Defaults to None. - silent (bool, optional): If True, no output is printed. Defaults to False. - - Returns: - EnzymeMLDocument: [`pyenzyme`](https://github.com/EnzymeML/PyEnzyme) `EnzymeMLDocument` object. - """ - from mtphandler.ioutils.pyenzyme import Plate_to_EnzymeMLDocument - - if name is None: - name = self.name - - converter = Plate_to_EnzymeMLDocument( - name=name, - plate=self.plate, - well_ids=well_ids, - molecules=self.molecules, - detected_molecule=detected_molecule, - proteins=self.proteins, - to_concentration=to_concentration, - extrapolate=extrapolate, - wells_with_protein_only=wells_with_protein_only, - wavelength=wavelength, - silent=silent, - ) - - return converter.convert() - - def _well_id_exists(self, well_id: str) -> bool: - """Check if a well with the given id exists in the plate.""" - return any([well_id in well.id for well in self.plate.wells]) - - def _get_conc_blank_mapping( - self, - wells: list[Well], - species: Protein | Molecule, - wavelength: float, - ) -> dict[float, float]: - """Calculate the mean absorption of a species at different concentrations. - - Args: - wells (list[Well]): List of wells to calculate the mean absorption for. - species (Protein | Molecule): The species for which to calculate the mean absorption. - wavelength (float): The wavelength at which to calculate the mean absorption. - - Returns: - dict[float, float]: Mapping of concentration to mean absorption. - """ - conc_to_absorptions = defaultdict(list) - - # Collect all absorption data per concentration - for well in wells: - condition = get_species_condition(well, species.id) - absorption = get_measurement(well, wavelength).absorption - conc_to_absorptions[condition.init_conc].append(absorption) - - conc_mean_blank_mapping = {} - - # Calculate mean absorption and standard deviation - for conc, absorptions in conc_to_absorptions.items(): - mean_absorption = np.nanmean(absorptions) - std_absorption = np.nanstd(absorptions) - - # Handle case where mean_absorption is zero to avoid division by zero - if mean_absorption != 0: - std_perc = float(abs(std_absorption / mean_absorption) * 100) - else: - std_perc = 0.0 - - # Print formatted information - print( - f"Mean absorption of [bold magenta]{species.name}[/] ({species.id}) at" - f" {conc} {condition.conc_unit.name}: {mean_absorption:.4f} ±" - f" {std_perc:.0f}% calculated based on wells" - f" {[well.id for well in wells]}." - ) - - conc_mean_blank_mapping[conc] = mean_absorption.tolist() - - return conc_mean_blank_mapping - - @staticmethod - def _species_contibutes( - measurement: PhotometricMeasurement, species_id: str - ) -> bool: - species_contributes = [ - state.contributes_to_signal - for state in measurement.blank_states - if state.species_id == species_id - ][0] - - return species_contributes - - @staticmethod - def _get_blank_state( - measurement: PhotometricMeasurement, species_id: str - ) -> BlankState: - for state in measurement.blank_states: - if state.species_id == species_id: - return state - - raise ValueError(f"Species {species_id} is not present in this well.") - - @classmethod - def read_spectra_max_190( - cls, - path: str, - ph: float | None = None, - name: str | None = None, - ) -> PlateManager: - """Read a `*.txt` file exported from a SpectraMax 190 software and create a PlateManager object. - - Args: - path (str): Path to the SpectraMax 190 file. - ph (float | None, optional): The pH value of the measurements. Defaults to None. - name (str | None, optional): Name of the plate. Defaults to None. - - Returns: - PlateManager: PlateManager object. - """ - from mtphandler.readers import read_spectra_max_190 as reader - - data: dict[str, Any] = {"plate": reader(path, ph)} - - if name is not None: - data["name"] = name - - return cls(**data) - - @classmethod - def read_multiskan_spectrum_1500( - cls, - path: str, - time: list[float], - time_unit: UnitDefinition, - temperature: float, - temperature_unit: UnitDefinition = C, - ph: float | None = None, - name: str | None = None, - ) -> PlateManager: - """Read a `*.txt` file exported from a Multiskan Spectrum 1500 and create a PlateManager object. - - Args: - name (str): Name of the plate. - path (str): Path to the Multiskan Spectrum 1500 file. - time (list[float]): List of time points. - time_unit (UnitDefinition): Unit of time. - temperature (float): Temperature of the measurements. - temperature_unit (UnitDefinition, optional): Unit of temperature. Defaults to C. - ph (float | None, optional): The pH value of the measurements. Defaults to None. - - Returns: - _type_: _description_ - """ - from mtphandler.readers import read_multiskan_spectrum_1500 as reader - - data: dict[str, Any] = { - "plate": reader( - path=path, - time=time, - time_unit=time_unit, - temperature=temperature, - temperature_unit=temperature_unit, - ph=ph, - ) - } - - if name is not None: - data["name"] = name - - return cls(**data) - - @classmethod - def read_tecan_spark( - cls, - path: str, - ph: float | None = None, - name: str | None = None, - ) -> PlateManager: - """Read a `*.xlsx` TECAN Spark file and create a PlateManager object. - - Args: - path (str): Path to the TECAN Spark file. - ph (float | None, optional): The pH value of the measurements. Defaults to None. - name (str | None, optional): Name of the plate. Defaults to None. - - Returns: - PlateManager: PlateManager object. - """ - from mtphandler.readers import read_tekan_spark as reader - - data: dict[str, Any] = {"plate": reader(path, ph)} - - if name is not None: - data["name"] = name - - return cls(**data) - - @classmethod - def read_biotek( - cls, - path: str, - ph: float | None = None, - name: str | None = None, - ) -> PlateManager: - """Read a `*.xlsx` file exported from a BioTek Epoch 2 software and create a PlateManager object. - - Args: - path (str): Path to the BioTek Epoch 2 file. - ph (float | None, optional): The pH value of the measurements. Defaults to None. - name (str | None, optional): Name of the plate. Defaults to None. - - Returns: - PlateManager: PlateManager object. - """ - from mtphandler.readers import read_biotek as reader - - data: dict[str, Any] = {"plate": reader(path, ph)} - - if name is not None: - data["name"] = name - - return cls(**data) - - @classmethod - def read_tekan_magellan( - cls, - path: str, - wavelength: float, - ph: float | None = None, - name: str | None = None, - ) -> PlateManager: - """Read a `*.xlsx` file exported from a TECAN Magellan software and create a PlateManager object. - - Args: - path (str): Path to the Magellan file. - wavelength (float): The wavelength of the measurements. - ph (Optional[float], optional): The pH value of the measurements. Defaults to None. - name (Optional[str], optional): Name of the plate. Defaults to None. - - Returns: - PlateManager: PlateManager object. - """ - from mtphandler.readers import read_tekan_magellan as reader - - data: dict[str, Any] = {"plate": reader(path, wavelength, ph)} - - if name is not None: - data["name"] = name - - return cls(**data) - - @classmethod - def read_multiskan_sky( - cls, - path: str, - ph: float | None = None, - name: str | None = None, - ) -> PlateManager: - """Read a `*.xlsx` file exported from a Multiskan Sky and create a PlateManager object. - - Args: - path (str): Path to the Multiskan Sky file. - ph (float | None, optional): The pH value of the measurements. Defaults to None. - name (str | None, optional): Name of the plate. Defaults to None. - - Returns: - PlateManager: _description_ - """ - from mtphandler.readers import read_multiskan_sky as reader - - data: dict[str, Any] = {"plate": reader(path, ph)} - - if name is not None: - data["name"] = name - - return cls(**data) - - -if __name__ == "__main__": - # path = ( - # "/Users/max/Documents/GitHub/MTPHandler/docs/examples/data/spectra_max_190.txt" - # ) - - # pm = PlateManager.read_spectra_max_190(path, ph=6.9) - # pm.visualize() - - # pm - - # h1 = pm.get_well("H1") - - # print(h1.id) - # print(h1.x_pos) - # print(h1.y_pos) - # print(h1.measurements[0].absorption[0]) - # print(h1.measurements[0].absorption[-1]) - - # path = "/Users/max/Documents/GitHub/MTPHandler/docs/examples/data/tekan_spark.xlsx" - # from mtphandler.model import Plate - - # p = PlateManager.read_tecan_spark(path, 7.4) - - # print(p.plate.temperatures) - - # p.visualize() - - # path = "/Users/max/Documents/GitHub/MTPHandler/docs/examples/data/magellan.xlsx" - - # plate = PlateManager.read_tekan_magellan(path, wavelength=600, ph=7) - - # plate.visualize() - - # path = ( - # "/Users/max/Documents/training_course/jules/Spectramax190 molecular Devices.txt" - # ) - - # from mtphandler.units import mM - - # pm = PlateManager.read_spectra_max_190(path, ph=7) - # # pm.visualize() - # testo = pm.define_molecule("testo", 60857, "testosterone") - # mpi = pm.define_molecule("mpi", 60961, "methylparaben") - - # pm.assign_species( - # species=testo, - # init_conc=0.1, - # conc_unit=mM, - # to="all", - # ) - - # path = ( - # "/Users/max/Documents/GitHub/MTPHandler/docs/examples/data/ BioTek_Epoch2.xlsx" - # ) - - # from mtphandler.units import mM - - # p = PlateManager.read_biotek(path, ph=7.4) - # p.visualize() - - # testo = p.define_molecule("testo", 60857, "testosterone") - # aldolase = p.define_protein("aldolase", "Aldolase") - - # p.assign_init_conditions( - # species=testo, - # init_conc=0.1, - # conc_unit=mM, - # to="all", - # ) - - # aldo = p.define_protein("aldolase", "Aldolase") - # p.assign_init_conditions( - # species=aldo, init_conc=0.1, conc_unit=mM, to="all", contributes_to_signal=False - # ) - - # enz = p.to_enzymeml(name="Test EnzymeML", wells_with_protein_only=False) - - # with open("enz.json", "w") as f: - # f.write(enz.model_dump_json(indent=4)) - # print(len(enz.measurements)) - - # from mtphandler.units import C, min - - # path = "docs/examples/data/multiskan_spectrum_1500.txt" - - # ph = 7.0 - # wavelength = 450.0 - - # time = np.arange(0, 15.5, 0.5).tolist() - # print(f"the thime is {time}") - - # plate = PlateManager.read_multiskan_spectrum_1500( - # path=path, - # ph=ph, - # time=time, - # time_unit=min, - # temperature=37.0, - # temperature_unit=C, - # ) - # plate.visualize() - - path = ( - "/Users/max/Documents/GitHub/MTPHandler/docs/examples/data/Multiskan Sky.xlsx" - ) - - p = PlateManager.read_multiskan_sky(path, ph=None) - p.visualize(darkmode=True) - print(p) - - -class BlankResult(BaseModel): - species_id: str = Field( - description="The id of the species for which the blank was calculated.", - default=None, - ) - wavelength: float = Field( - description="The wavelength at which the blank was calculated.", - default=None, - ) - control_well_ids: list[str] = Field( - description="The ids of the wells used to calculate the blank.", - default=[], - ) - mean_contribution: float = Field( - description="The mean contribution of the species to the signal.", - default=None, - ) - std_contribution: float = Field( - description="The standard deviation of the contribution of the species to the signal.", - default=None, - ) - applied_to_well_ids: list[str] = Field( - description="The ids of the wells to which the blank was applied.", - default=[], - ) diff --git a/temp_handler/readers/__init__.py b/temp_handler/readers/__init__.py deleted file mode 100644 index 2aecda4..0000000 --- a/temp_handler/readers/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from .biotek import read_biotek # noqa -from .multiskan_spectrum_parser import read_multiskan_spectrum_1500 # noqa -from .spectra_max_190 import read_spectra_max_190 # noqa -from .spectramax_parser import read_spectramax # noqa -from .tekan_magellan import read_tekan_magellan # noqa -from .tekan_spark import read_tekan_spark # noqa -from .multiskan_sky import read_multiskan_sky # noqa diff --git a/temp_handler/readers/biotek.py b/temp_handler/readers/biotek.py deleted file mode 100644 index 067a098..0000000 --- a/temp_handler/readers/biotek.py +++ /dev/null @@ -1,139 +0,0 @@ -from __future__ import annotations - -import re -from datetime import datetime, timedelta - -import numpy as np -import pandas as pd - -from mtphandler.model import Plate -from mtphandler.readers.utils import id_to_xy -from mtphandler.tools import get_well -from mtphandler.units import C, second - -PATTERN_WAVELENGTH = r"Wavelengths:\s+(\d{1,4})([\s,;]+\d{1,4})*" - - -def extract_integers(s: str) -> list[int]: - # Find all sequences of digits in the string - matches = re.findall(r"\d+", s) - # Convert the matched strings to integers - return [int(match) for match in matches] - - -def parse_measurement_interval(s: str) -> float: - # Flexible regex pattern to match time formats - time_pattern = r"(\d{1,4}):(\d{1,2}):(\d{1,2})" - - interval_match = re.search(r"Interval\s+" + time_pattern, s, re.IGNORECASE) - - if not interval_match: - raise ValueError("Measurement interval not found.") - - interval = timedelta( - hours=int(interval_match.group(1)), - minutes=int(interval_match.group(2)), - seconds=int(interval_match.group(3)), - ) - - # interval in minutes - return interval.total_seconds() / 60 - - -def read_biotek( - path: str, - ph: float | None, -) -> Plate: - df = pd.read_excel(path) - - date = get_row_by_value(df, "Date")[-1] - time = get_row_by_value(df, "Time")[-1] - timestamp = datetime.combine(date, time.time()).isoformat() - - row_index_int_map = df.iloc[:, 0].apply(lambda cell: isinstance(cell, int)) - data_block_starts = [ - index for index, value in enumerate(row_index_int_map) if value - ] - - wavelengths_cell = str(df.iloc[19, 1]) - wavelengths = extract_integers(wavelengths_cell) - - measurement_int_cell = str(df.iloc[15, 1]) - measurement_interval = parse_measurement_interval(measurement_int_cell) - - plate = Plate( - date_measured=timestamp, - time_unit=second, - temperature_unit=C, - ) - - for row_index, (block_start, wavelength) in enumerate( - zip(data_block_starts, wavelengths) - ): - try: - block = df.iloc[block_start + 2 : data_block_starts[row_index + 1], :] - except IndexError: - block = df.iloc[block_start + 2 :, :] - - block = block.drop("Unnamed: 0", axis=1).reset_index(drop=True) - - all_nan_rows = block.isna().all(axis=1) - first_all_nan_index = all_nan_rows.idxmax() if all_nan_rows.any() else None - - # drop rows if any of the values are NaN - block.iloc[:first_all_nan_index, :] - block = block.dropna(how="any", axis=0) - column_names = block.iloc[0, :].tolist() - block.columns = column_names - block = block[1:].reset_index(drop=True) - - # Temperature - temperature = block.pop(column_names[1]).values - - for column_name in column_names[2:]: - x, y = id_to_xy(column_name) - - try: - well = get_well(plate, column_name) - except ValueError: - well = plate.add_to_wells(id=column_name, x_pos=x, y_pos=y, ph=ph) - - data = block[column_name].values.tolist() - time = np.arange( - 0, len(data) * measurement_interval, measurement_interval - ).tolist() - - well.add_to_measurements( - wavelength=wavelength, - absorption=data, - time=time, - time_unit=second, - ) - - plate.temperatures = temperature.tolist() - plate.times = time - - # assert that all plate -> well -> measurement -> absorption have the same length - for well in plate.wells: - for measurement in well.measurements: - assert len(measurement.absorption) == len(measurement.time), ( - f"Absorption and time data for well {well.id} and wavelength " - f"{measurement.wavelength} do not have the same length." - ) - - return plate - - -def get_row_by_value(df: pd.DataFrame, value: str) -> list: - row_df = df[df.iloc[:, 0].values == value] - row_df = row_df.reset_index(drop=True) - row_df = row_df.dropna(axis=1, how="all") - return row_df.loc[0].values.tolist() - - -if __name__ == "__main__": - path = ( - "/Users/max/Documents/GitHub/MTPHandler/docs/examples/data/ BioTek_Epoch2.xlsx" - ) - - print(read_biotek(path, ph=7.4)) diff --git a/temp_handler/readers/multiskan_sky.py b/temp_handler/readers/multiskan_sky.py deleted file mode 100644 index 33ab267..0000000 --- a/temp_handler/readers/multiskan_sky.py +++ /dev/null @@ -1,107 +0,0 @@ -from __future__ import annotations - -import pandas as pd - -from mtphandler.model import Plate, Well -from mtphandler.readers.utils import id_to_xy -from mtphandler.units import C, second - - -def read_multiskan_sky( - path: str, - ph: float | None, -) -> Plate: - sheetnames = pd.ExcelFile(path).sheet_names - - RAW_DATA = next(sheet for sheet in sheetnames if "Raw data" in sheet), None - GENERAL_INFO = next(sheet for sheet in sheetnames if "General info" in sheet), None - - if RAW_DATA is None or GENERAL_INFO is None: - raise ValueError( - "The provided Excel file does not contain the expected sheets." - ) - - df, timestamp = raw_data_to_df(RAW_DATA[0], path) - temperature = get_temperature(GENERAL_INFO[0], path) - - wells = df_to_wells(df, ph) - - return Plate( - wells=wells, - date_measured=timestamp, - temperatures=[temperature], - temperature_unit=C, - ) - - -def get_temperature(sheetname: str, path: str) -> float: - df = pd.read_excel(path, sheet_name=sheetname) - - return float(df.iloc[6, 3].split(" ")[0]) - - -def raw_data_to_df(sheetname: str, path: str) -> pd.DataFrame: - # Read the sheet from the Excel file - df = pd.read_excel(path, sheet_name=sheetname) - - # Extract the timestamp from the first row - timestamp = str(df.iloc[0, 0]) - - # Identify the row number where 'Well' is located - well_row = df[df.iloc[:, 0] == "Well"].index[0] - - # Extract the data from the identified row onwards - data_df = df.iloc[well_row:, :] - - # Set the first row as column names - data_df.columns = data_df.iloc[0] - - # Drop the row with the column names as it is now set as the header - data_df = data_df.drop(data_df.index[0]) - - # Reset the index and set a MultiIndex with 'Well' and 'Wavelength(s) [nm]' - data_df.set_index(["Well", "Wavelength(s) [nm]"], inplace=True) - - return data_df, timestamp - - -def df_to_wells(df: pd.DataFrame, ph: float | None) -> pd.DataFrame: - wells = [] - existing_well_ids = set() - - for well_id in df.index.get_level_values("Well").unique(): - df_well = df.loc[well_id] - well_id = well_id.replace(" ", "").strip() - - for wavelength in df_well.index.get_level_values("Wavelength(s) [nm]").unique(): - df_wavelength = df_well.loc[wavelength] - df_wavelength = df_wavelength.sort_values(by="Measurement time(s)") - - if well_id not in existing_well_ids: - x, y = id_to_xy(well_id) - well = Well( - id=well_id, - x_pos=x, - y_pos=y, - ph=ph, - ) - existing_well_ids.add(well_id) - - well.add_to_measurements( - wavelength=wavelength, - absorption=df_wavelength["Raw absorbance"].tolist(), - time=df_wavelength["Measurement time(s)"] / 60, - time_unit=second, - ) - - wells.append(well) - - return wells - - -if __name__ == "__main__": - path = ( - "/Users/max/Documents/GitHub/MTPHandler/docs/examples/data/Multiskan Sky.xlsx" - ) - - print(read_multiskan_sky(path, ph=None)) diff --git a/temp_handler/readers/multiskan_spectrum_parser.py b/temp_handler/readers/multiskan_spectrum_parser.py deleted file mode 100644 index 1c7d844..0000000 --- a/temp_handler/readers/multiskan_spectrum_parser.py +++ /dev/null @@ -1,198 +0,0 @@ -from __future__ import annotations - -import re -from collections import defaultdict - -import numpy as np -import pandas as pd - -from mtphandler.model import Plate, UnitDefinition -from mtphandler.units import C, nm - - -def read_multiskan_spectrum_1500( - path: str, - time: list[float], - time_unit: UnitDefinition, - temperature_unit: UnitDefinition, - ph: float | None = None, - temperature: float | None = None, -) -> Plate: - # Extract temperature from path - if not temperature: - TEMP_PATTERN = r"\d{1,3}deg" - temperature = re.findall(TEMP_PATTERN, path)[0] - temperature = re.split("(\d+)", temperature)[1] - if not temperature: - raise ValueError("Could not find pH in path. Please specify 'ph'.") - - if not temperature_unit: - temperature_unit = C - - if isinstance(time, np.ndarray): - time = time.tolist() - - # Extract pH from path - if not ph: - PH_PATTERN = r"pH\d+\.\d+" - ph = re.findall(PH_PATTERN, path)[0] - ph = float(re.search(r"\d+\.\d+", ph).group()) - if not ph: - raise ValueError("Could not find pH in path. Please specify pH.") - - # Read file - data_dict = extract_data(pd.read_csv(path).reset_index()) - - # Extract plate dimensions and number of measured timepoints - n_rows, n_columns, n_timepoints = next(iter(data_dict.values())).shape - - if len(time) != n_timepoints: - raise ValueError( - f"Number of timepoints in data set ({n_timepoints}) does not match " - f"number of timpoints in provided 'time' array ({len(time)})." - ) - - # Create plate - plate = Plate( - temperatures=[temperature], - temperature_unit=temperature_unit, - times=time, - time_unit=time_unit, - ) - - # Add wells to plate - for wavelength, data in data_dict.items(): - for row_id, row in enumerate(data): - for column_id, column in enumerate(row): - id = _coordinates_to_id(column_id, row_id) - if id not in [well.id for well in plate.wells]: - plate.add_to_wells( - id=id, - x_pos=column_id, - y_pos=row_id, - ph=ph, - ) - - well = [well for well in plate.wells if well.id == id][0] - well.add_to_measurements( - wavelength=wavelength, - wavelength_unit=nm, - absorption=column, - time=time, - time_unit=time_unit, - ) - - return plate - - -def extract_data(df: pd.DataFrame) -> dict[int, list[list[float]]]: - # Get slices of the data corresponding each iteration of measurement - wavelength_df_dict = _get_plate_dfs(df) - - # Extract data from each iteration - wavelength_data_dict = defaultdict(list) - for wavelength, dfs in wavelength_df_dict.items(): - for df in dfs: - data_of_iteration = df.apply(_extract_row_data, axis=1) - data_of_iteration = np.array(data_of_iteration.to_list()) - - wavelength_data_dict[wavelength].append(data_of_iteration) - - # Convert data to numpy arrays - for wavelength, data in wavelength_data_dict.items(): - wavelength_data_dict[wavelength] = np.array(data).swapaxes(0, 2).swapaxes(0, 1) - - return wavelength_data_dict - - -def _get_plate_dfs(df: pd.DataFrame) -> dict[int, list[pd.DataFrame]]: - wavelength_slices_dict = defaultdict(list) - - # Get data by wavelength - wavelengths = df.apply(_get_wavelengths, axis=1).dropna().astype(int).tolist() - - wavelength_dfs = _segment_dataframe(df, "Wavelength") - - for wavelength, wavelength_df in zip(wavelengths, wavelength_dfs): - iteration_dfs = _segment_dataframe(wavelength_df, "Iteration") - - for iteration_df in iteration_dfs: - wavelength_slices_dict[wavelength].append(iteration_df) - - return wavelength_slices_dict - - -def _extract_row_data(row: pd.Series): - value = row.values[1] - return [float(x) for x in value.split("\t") if len(x) != 0] - - -def _segment_dataframe(df: pd.DataFrame, key: str): - def filter_function(row: pd.Series): - row_id, value = row.values - if key in value: - return row_id - - slice_ids = df.apply(filter_function, axis=1).dropna().astype(int).values.tolist() - - list_of_slices = _list_to_slices(slice_ids) - - return (df.loc[slc] for slc in list_of_slices) - - -def _list_to_slices(_list: list[int]) -> list[slice]: - slices = [] - for idx, element in enumerate(_list): - try: - start_data_slice = element + 1 - end_data_slice = _list[idx + 1] - 1 - slices.append(slice(start_data_slice, end_data_slice)) - - except IndexError: - start_data_slice = element + 1 - end_data_slice = None - slices.append(slice(start_data_slice, end_data_slice)) - - return slices - - -def _get_wavelengths(row: pd.Series): - value = row.values[1] - - if "Wavelength" in value: - return re.findall(r"Wavelength:(.*)", value)[0] - - -def _coordinates_to_id(x: int, y: int) -> str: - return f"{chr(y + 65)}{x+1}" - - -def id_to_xy(well_id: str): - return ord(well_id[0].upper()) - 65, int(well_id[1:]) - 1 - - -if __name__ == "__main__": - import numpy as np - from devtools import pprint - - from mtphandler.model import Plate - from mtphandler.units import C, min - - path = "docs/examples/data/multiskan_spectrum_1500.txt" - - ph = 7.0 - wavelength = 450.0 - - time = np.arange(0, 15.5, 0.5).tolist() - print(f"the thime is {time}") - - pprint( - read_multiskan_spectrum_1500( - path=path, - ph=ph, - time=time, - time_unit=min, - temperature=37.0, - temperature_unit=C, - ) - ) diff --git a/temp_handler/readers/spectra_max_190.py b/temp_handler/readers/spectra_max_190.py deleted file mode 100644 index f14f972..0000000 --- a/temp_handler/readers/spectra_max_190.py +++ /dev/null @@ -1,267 +0,0 @@ -import re -from io import StringIO - -import numpy as np -import pandas as pd -from loguru import logger - -from mtphandler.model import Plate, Well -from mtphandler.readers.utils import id_to_xy, xy_to_id -from mtphandler.units import C, second - - -class WrongParserError(Exception): - """Exception raised when the wrong parser is used to read a file.""" - - def __init__(self, parser_name, expected_format): - self.parser_name = parser_name - self.expected_format = expected_format - super().__init__(self._generate_message()) - - def _generate_message(self): - return ( - f"Error in {self.parser_name}: Expected format '{self.expected_format}', " - ) - - -def read_spectra_max_190(path, ph: float | None) -> Plate: - """ - Reads SpectraMax 190 data from a file and returns a plate object. - - Args: - path (str): The path to the file containing the SpectraMax 190 data. - ph (float | None, optional): The pH value. Defaults to None. - - Returns: - plate: The plate object containing the data. - - Raises: - WrongParserError: If the file format is not SpectraMax 190. - ValueError: If the wavelengths could not be extracted or the data blocks are not of equal shape. - - """ - - iso_encoding = "ISO-8859-1" - utf16_encoding = "utf-16" - - try: - lines = open_file(path, iso_encoding) - if "##BLOCKS" not in lines[0]: - raise ValueError - except ValueError: - lines = open_file(path, utf16_encoding) - if "##BLOCKS" not in lines[0]: - raise WrongParserError( - parser_name="read_spectra_max_190", - expected_format="SpectraMax 190", - ) - - wavelength_pattern = r"(?:[^\t]*\t){15}(\d+)" - - # Extract the wavelength - try: - wavelength = float(re.findall(wavelength_pattern, lines[1])[0]) - except ValueError: - raise ValueError("Wavelengths could not be extracted.") - - try: - blocks = identify_blocks(lines) - times, temperatures, blocks = sanitize_blocks(blocks) - try: - data_matrix = np.array(blocks) - data_matrix = data_matrix.swapaxes(0, 2) - except ValueError: - raise ValueError( - "Data blocks are not of equal shape, file seems corrupted." - ) - plate = map_to_plate(data_matrix, times, temperatures, ph, wavelength) - - return plate - - except IndexError: - for line_id, line in enumerate(lines): - if line.startswith("Time"): - start_id = line_id - if line.startswith("\n"): - end_id = line_id - break - - data = lines[start_id:end_id] - # make pandas df from öist of strings - data_str = "\n".join(data) - - # Use StringIO to simulate a file object - data_io = StringIO(data_str) - - # Use pd.read_csv with sep='\t' to read the data into a DataFrame - df = pd.read_csv(data_io, sep="\t") - # drop unnamed columns - df = df.loc[:, ~df.columns.str.contains("^Unnamed")] - print(df.index) - time = df.pop("Time") - time = [time_to_min_float(t) for t in time] - temperatures = df.pop("Temperature(¡C)").values.tolist() - print(df) - - # iterate over the columns and create wells - wells = [] - for column in df.columns: - x, y = id_to_xy(column) - well = Well( - id=column, - x_pos=x, - y_pos=y, - ph=ph, - ) - well.add_to_measurements( - wavelength=wavelength, - absorption=df[column].values.tolist(), - time=time, - time_unit=second, - ) - wells.append(well) - - plate = Plate( - time_unit=second, - temperatures=temperatures, - temperature_unit=C, - wells=wells, - ) - - return plate - - -def map_to_plate( - data_matrix: np.ndarray, - times: list[float], - temperatures: list[float], - ph: float | None, - wavelength: float, -): - """ - Maps a data matrix to a Plate object. - - Args: - data_matrix (np.ndarray): The data matrix containing the measurements. - times (list[float]): The list of time values. - temperatures (list[float]): The list of temperature values. - ph (float | None): The pH value or None if not applicable. - wavelength (float): The wavelength value. - - Returns: - Plate: The Plate object containing the mapped data. - """ - - wells = [] - for column_id in range(data_matrix.shape[0]): - for row_id in range(data_matrix.shape[1]): - well = Well( - id=xy_to_id(column_id, row_id), - ph=ph, - x_pos=column_id, - y_pos=row_id, - ) - assert ( - len(times) == data_matrix[column_id, row_id].size - ), "Time and data length mismatch." - well.add_to_measurements( - wavelength=wavelength, - absorption=data_matrix[column_id, row_id].tolist(), - time=times, - time_unit=second, - ) - wells.append(well) - - # Create plate - plate = Plate( - time_unit=second, - temperatures=temperatures, - temperature_unit=C, - wells=wells, - ) - - return plate - - -def identify_blocks(lines): - """Identify blocks in the file.""" - blocks = [] - current_block = [] - time_pattern = re.compile( - r"^\d{1,2}:\d{2}(?::\d{2})?(?:\s|$)" - ) # Pattern to match h:mm or hh:mm:ss format - - # get line number is which time pattern is found - section_starts = np.array( - [i for i, line in enumerate(lines) if time_pattern.match(line)] - ) - - # check if the distance between the time pattern is consistent - if not np.all(np.diff(section_starts) == section_starts[1] - section_starts[0]): - logger.debug("Inconsistent time pattern found in file.") - # get the number of lines in each section - section_length = section_starts[1] - section_starts[0] - section_ends = section_starts + section_length - 1 - - # build slices for each section for extraction into list of blocks - slices = [slice(start, end) for start, end in zip(section_starts, section_ends)] - - for s in slices: - current_block = [line for line in lines[s]] - if len(current_block[0].split(",")) == 2: - continue - blocks.append(current_block) - - return blocks - - -def is_increasing_by_one(lst): - return all(lst[i] + 1 == lst[i + 1] for i in range(len(lst) - 1)) - - -def sanitize_blocks(blocks): - times, temperatures = [], [] - - for block_id, block in enumerate(blocks): - for line_id, line in enumerate(block): - if line_id == 0: - time, temp, line = line.split("\t", 2) - times.append(time_to_min_float(time)) - temperatures.append(float(temp.replace(",", "."))) - - line = line.strip() - line = line.replace(",", ".") - line = [float(entry) for entry in line.split("\t") if entry != ""] - - blocks[block_id][line_id] = line - - return times, temperatures, blocks - - -def time_to_min_float(time_str: str): - time_parts = time_str.split(":") - # Calculate time since zero in minutes - if len(time_parts) == 3: # hh:mm:ss format - h, m, s = time_parts - return float(h) * 60 + float(m) + float(s) / 60 - elif len(time_parts) == 2: # h:mm format - m, s = time_parts - return float(m) + float(s) / 60 - else: - raise ValueError(f"Unexpected time format: '{time_str}'") - - -def open_file(path: str, encoding: str): - with open(path, "r", encoding=encoding) as file: - lines = file.readlines() - return lines - - -if __name__ == "__main__": - path = ( - "/Users/max/Documents/GitHub/MTPHandler/docs/examples/data/spectra_max_190.txt" - ) - - plate = read_spectra_max_190(path, ph=6.9) - - print(plate.wells[0].measurements[0].absorption) diff --git a/temp_handler/readers/spectramax_parser.py b/temp_handler/readers/spectramax_parser.py deleted file mode 100644 index 062979d..0000000 --- a/temp_handler/readers/spectramax_parser.py +++ /dev/null @@ -1,127 +0,0 @@ -from __future__ import annotations - -import re -from datetime import datetime - -import numpy as np -import pandas as pd -from pyenzyme.model import UnitDefinition - -from mtphandler.model import Plate, Well -from mtphandler.units import C, nm - - -def read_spectramax( - path: str, - time_unit: UnitDefinition, - ph: float | None = None, -): - df = pd.read_csv( - path, - sep="delimiter", - encoding="utf-16", - engine="python", - skiprows=15, - ) - - df = df.map(lambda x: x.split("\t")) - - # Get date of measurement - last_saved = re.findall( - r"\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} [APMapm]{2}", df.iloc[-1, 0][0] - )[0] - created = datetime.strptime(last_saved, "%Y/%m/%d %I:%M:%S %p").isoformat() - - wavelengths = df.iloc[0, 0][-6] - wavelengths = [ - float(wavelength) for wavelength in wavelengths.split(" ") if wavelength != "" - ] - - # data - datas = df.iloc[2:-9]["~End"].tolist() - time_pattern = r"\d{2}:\d{2}:\d{2}" - times = [] - temperatures = [] - block_start_ids = [] - for data in datas: - if re.match(time_pattern, data[0]): - times.append(to_time(data[0], time_unit)) - temperatures.append(float(data[1])) - block_start_ids.append(datas.index(data)) - - time_blocks = [] - for index, start_id in enumerate(block_start_ids): - try: - time_block = datas[start_id : block_start_ids[index + 1]] - except IndexError: - time_block = datas[start_id:] - - time_block[0] = time_block[0][2:] # remove time and temperature from block - for row_id, row in enumerate(time_block): - if "" not in row: - continue - - wavelength_entry = [] - wavelength_entries = [] - for item in row: - if item == "": - if wavelength_entry: - wavelength_entries.append(wavelength_entry) - wavelength_entry = [] # reset - - else: - wavelength_entry.append(item) - - if item != "": - wavelength_entries.append(wavelength_entry) - - time_block[row_id] = wavelength_entries - - time_blocks.append(time_block) - - # Swap dimensions: rows, columns, wavelengths, timecourse - data = np.array(time_blocks).astype(float) - data = data.swapaxes(0, 3) - data = data.swapaxes(0, 1) - - # create wells - wells = [] - for row_id, row in enumerate(data): - for column_id, column in enumerate(row): - well = Well( - id=_coordinates_to_id(column_id, row_id), - ph=ph, - x_pos=column_id, - y_pos=row_id, - ) - for wavelength_id, wavelength in enumerate(column): - well.add_to_measurements( - wavelength=wavelengths[wavelength_id], - wavelength_unit=nm, - absorption=wavelength.tolist(), - time=times, - ) - wells.append(well) - - # Create plate - plate = Plate( - date_measured=created, - time_unit=time_unit, - temperatures=temperatures, - temperature_unit=C, - wells=wells, - ) - - return plate - - -def _coordinates_to_id(x: int, y: int) -> str: - return f"{chr(y + 65)}{x+1}" - - -if __name__ == "__main__": - from mtphandler.units import s - - path = "tests/data/ABTS_EnzymeML_340nm_420nm_2.5x_pH3_25deg.txt" - - print(read_spectramax(path, ph=6.9, time_unit=s)) diff --git a/temp_handler/readers/tekan_magellan.py b/temp_handler/readers/tekan_magellan.py deleted file mode 100644 index 062b3f0..0000000 --- a/temp_handler/readers/tekan_magellan.py +++ /dev/null @@ -1,96 +0,0 @@ -from __future__ import annotations - -import math -import re -from collections import defaultdict -from datetime import datetime - -import pandas as pd - -from mtphandler.model import Plate -from mtphandler.readers.utils import WELL_ID_PATTERN, id_to_xy -from mtphandler.units import C, second - - -def read_tekan_magellan( - path: str, - wavelength: float, - ph: float | None, -) -> Plate: - df = pd.read_excel(path, header=None) - - # Define the format of the input datetime string - date_format = "%A, %B %d, %Y: %H:%M:%S" - - data = defaultdict(list) - temperatures = [] - times = [] - dates = [] - for row in df.iterrows(): - timecourser_data = row[1].values[0] - if not isinstance(timecourser_data, str): - break - else: - date_str, time_str, temperature_str = timecourser_data.split("/") - temp_value, _ = temperature_str.strip().split("°") - temperatures.append(float(temp_value)) - time, time_unit = time_str[1:-1].split(" ") - - times.append(float(time)) - dates.append(datetime.strptime(date_str.strip(), date_format)) - - created = dates[0] - print(times) - - df = df.dropna(how="all") - - for row in df.iterrows(): - first_cell = str(row[1].values[0]) - if not re.findall(WELL_ID_PATTERN, first_cell): - continue - - key = None - for element in row[1].values: - if isinstance(element, str): - key = element - elif math.isnan(element): - continue - else: - data[key].append(element) - - plate = Plate( - date_measured=str(created), - temperature_unit=C, - temperatures=temperatures, - time_unit=second, - times=times, - ) - - for well_id, abso_list in data.items(): - if well_id is not None: - x_pos, y_pos = id_to_xy(well_id) - else: - raise ValueError("Well ID not found in the data.") - - well = plate.add_to_wells( - ph=ph, - id=well_id, - x_pos=x_pos, - y_pos=y_pos, - ) - well.add_to_measurements( - wavelength=wavelength, - absorption=abso_list, - time_unit=second, - time=times, - ) - - return plate - - -if __name__ == "__main__": - path = "/Users/max/Documents/GitHub/MTPHandler/docs/examples/data/magellan.xlsx" - from devtools import pprint - - plate = read_tekan_magellan(path, wavelength=600, ph=7) - pprint(plate.wells[0]) diff --git a/temp_handler/readers/tekan_spark.py b/temp_handler/readers/tekan_spark.py deleted file mode 100644 index af73fb7..0000000 --- a/temp_handler/readers/tekan_spark.py +++ /dev/null @@ -1,82 +0,0 @@ -from __future__ import annotations - -from datetime import datetime - -import pandas as pd -from typing_extensions import Optional - -from mtphandler.model import Plate -from mtphandler.readers.utils import id_to_xy -from mtphandler.units import C, second - - -def read_tekan_spark( - path: str, - ph: Optional[float], -) -> Plate: - df = pd.read_excel(path) - - if not df.iloc[1, 0] == "Device: Spark": - raise ValueError("The file does not seem to be a Tekan Spark file.") - - cycle_no_row_index = df[df.iloc[:, 0].str.contains("Cycle Nr.", na=False)].index[0] - meta_df = df.iloc[:cycle_no_row_index, :] - data_df = df.iloc[cycle_no_row_index:, :].reset_index(drop=True) - - meta_df = ( - meta_df.dropna(how="all") - .dropna(axis=1, how="all") - .set_index(meta_df.columns[0]) - ) - time_measured = meta_df.loc["Start Time"].dropna(axis=1, how="all").values[0][0] - time_measured = datetime.strptime(time_measured, "%Y-%m-%d %H:%M:%S") - - wavelength = meta_df.loc["Measurement wavelength"].dropna().iloc[0] - - data_df = data_df.set_index(data_df.columns[0]) - column_names = data_df.iloc[0, :].tolist() - data_df.columns = column_names - data_df = data_df[1:].reset_index(drop=True).dropna(axis=1, how="all") - first_nan_index = data_df.isna().any(axis=1).idxmax() - data_df = data_df.iloc[:first_nan_index, :] - - time_series = data_df.pop("Time [s]") / 60 - temp_series = data_df.pop("Temp. [°C]") - - plate = Plate( - date_measured=str(time_measured), - temperatures=temp_series.values.tolist(), - temperature_unit=C, - time_unit=second, - times=time_series.values.tolist(), - ) - - for column in data_df.columns: - x, y = id_to_xy(column) - well = plate.add_to_wells( - id=column, - x_pos=x, - y_pos=y, - ph=ph, - ) - - well.add_to_measurements( - wavelength=wavelength, - wavelength_unit="nm", - absorption=data_df[column].values.tolist(), - time=time_series.values.tolist(), - time_unit=second, - ) - - return plate - - -if __name__ == "__main__": - from devtools import pprint - - path = "/Users/max/Documents/GitHub/MTPHandler/docs/examples/data/tekan_spark.xlsx" - from mtphandler.model import Plate - - p = read_tekan_spark(path, 7.4) - - pprint(p.wells[0]) diff --git a/temp_handler/readers/utils.py b/temp_handler/readers/utils.py deleted file mode 100644 index 889e7ac..0000000 --- a/temp_handler/readers/utils.py +++ /dev/null @@ -1,28 +0,0 @@ -from mtphandler.model import Well - -# regex patterns -WELL_ID_PATTERN = r"[A-H][0-9]{1,2}" - - -def xy_to_id(x: int, y: int) -> str: - """Well coordinates to well ID""" - return f"{chr(y + 65)}{x+1}" - - -def id_to_xy(well_id: str) -> tuple[int, int]: - """Well ID to well coordinates""" - return int(well_id[1:]) - 1, ord(well_id[0].upper()) - 65 - - def get_well(self, id: str) -> Well: - for well in self.plate.wells: - if well.id.lower() == id.lower(): - return well - - raise ValueError(f"Well {id} not found") - - -if __name__ == "__main__": - print(xy_to_id(0, 0)) - print(id_to_xy("A1")) - print(id_to_xy("H12")) - print(id_to_xy("C3")) diff --git a/temp_handler/tools.py b/temp_handler/tools.py deleted file mode 100644 index 75cea22..0000000 --- a/temp_handler/tools.py +++ /dev/null @@ -1,158 +0,0 @@ -import importlib.resources as pkg_resources - -import httpx -import toml - -from mtphandler.model import InitCondition, PhotometricMeasurement, Plate, Well - - -def read_static_file(path, filename: str): - """Reads a static file from the specified library path. - - Args: - path (Module): Import path of the library. - filename (str): The name of the file to read. - - Returns: - dict: The contents of the file as a dictionary. - """ - - source = pkg_resources.files(path).joinpath(filename) - with pkg_resources.as_file(source) as file: - return toml.load(file) - - -def get_measurement(well: Well, wavelength: float) -> PhotometricMeasurement: - """ - Get the measurement object for a given well and wavelength. - - Args: - well (Well): The well object. - wavelength (float): The wavelength of the measurement. - - Returns: - PhotometricMeasurement: The measurement object. - - Raises: - ValueError: If no measurement is found for the given well and wavelength. - """ - - for measurement in well.measurements: - if measurement.wavelength == wavelength: - return measurement - - raise ValueError( - f"No measurement found for well {well.id} at wavelength {wavelength}." - ) - - -def well_contains_species( - well: Well, species_id: str, conc_above_zero: bool = False -) -> bool: - """Check if a well contains a species with the given ID, and optionally, if its concentration is above zero. - - Args: - well (Well): The well to check. - species_id (str): The ID of the species. - conc_above_zero (bool): If True, checks if the species' concentration is above zero. - - Returns: - bool: True if the species is present in the well (and has a concentration above zero if conc_above_zero is True), otherwise False. - """ - for condition in well.init_conditions: - if condition.species_id == species_id: - # If conc_above_zero is True, check if concentration is > 0 - if conc_above_zero: - return condition.init_conc > 0 - # Otherwise, just return True if species is present - return True - - return False - - -def handle_blank_status( - well: Well, - species_id: str, - init_conc: float, - contributes_to_signal: bool | None, -): - """Add blank status to the measurements of a well. - If the concentration is 0, the species does not contribute to the signal. - If the concentration is not 0, the species contributes to the signal unless - overwriten by the `contributes_to_signal` argument. - - Args: - well (Well): Well for which to add blank status. - species_id (str): ID of the species. - init_conc (float): Initial concentration of the species. - contributes_to_signal (bool | None): Whether the species contributes to the signal. - """ - if contributes_to_signal is None: - if init_conc == 0: - contributes = False - else: - contributes = True - else: - contributes = contributes_to_signal - - for measurement in well.measurements: - measurement.add_to_blank_states( - species_id=species_id, - contributes_to_signal=contributes, - ) - - -def measurement_is_blanked_for( - measurement: PhotometricMeasurement, target_id: str -) -> bool: - """Checks if a the measurement is blanked for a given species target.""" - - target_contributes = None - others_contribute = [] - - for state in measurement.blank_states: - if state.species_id == target_id: - target_contributes = state.contributes_to_signal - - else: - others_contribute.append(state.contributes_to_signal) - - if target_contributes is None: - raise ValueError(f"Species {target_id} not found in blank states") - - return target_contributes and not any(others_contribute) - - -def get_species_condition(well: Well, species_id: str) -> InitCondition: - for condition in well.init_conditions: - if condition.species_id == species_id: - return condition - - raise ValueError(f"Species {species_id} not found in well {well.id}") - - -def pubchem_request_molecule_name(pubchem_cid: int) -> str: - """Retrieves molecule name from PubChem database based on CID.""" - - url = f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/cid/{pubchem_cid}/property/Title/JSON" - response = httpx.get(url) - - if response.status_code == 200: - res_dict = response.json() - try: - molecule_name = res_dict["PropertyTable"]["Properties"][0]["Title"] - return molecule_name - except (KeyError, IndexError): - raise ValueError( - "Unexpected response structure while retrieving molecule name from PubChem" - ) - else: - raise ValueError("Failed to retrieve molecule name from PubChem") - - -def get_well(plate: Plate, well_id: str) -> Well: - for well in plate.wells: - if well.id.lower() == well_id.lower(): - return well - - raise ValueError(f"Well {well_id} not found") diff --git a/temp_handler/units/__init__.py b/temp_handler/units/__init__.py deleted file mode 100644 index 2590388..0000000 --- a/temp_handler/units/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .predefined import * # noqa: F403 diff --git a/temp_handler/units/ontomaps.toml b/temp_handler/units/ontomaps.toml deleted file mode 100644 index 12dffaf..0000000 --- a/temp_handler/units/ontomaps.toml +++ /dev/null @@ -1,45 +0,0 @@ -[substance] - -mol = "OBO:UO_0000013" -mmol = "OBO:UO_0000040" -umol = "OBO:UO_0000039" -nmol = "OBO:UO_0000041" - -[molarity] - -M = "OBO:UO_0000062" -mM = "OBO:UO_0000063" -uM = "OBO:UO_0000064" -nM = "OBO:UO_0000025" - -[time] - -s = "OBO:UO_0000010" -min = "OBO:UO_0000031" -hour = "OBO:UO_0000032" -day = "OBO:UO_0000033" - -[temperature] - -K = "OBO:UO_0000012" -C = "OBO:UO_0000027" - -[mass] - -kg = "OBO:UO_0000009" -g = "OBO:UO_0000021" -mg = "OBO:UO_0000022" -ug = "OBO:UO_0000023" -ng = "OBO:UO_0000024" - -[volume] - -litre = "OBO:UO_0000099" -ml = "OBO:UO_0000098" -ul = "OBO:UO_0000101" -nl = "OBO:UO_0000102" - -[length] - -metre = "OBO:UO_0000008" -nm = "OBO:UO_0000018" diff --git a/temp_handler/units/predefined.py b/temp_handler/units/predefined.py deleted file mode 100644 index 3da7786..0000000 --- a/temp_handler/units/predefined.py +++ /dev/null @@ -1,166 +0,0 @@ -from mtphandler.model import UnitType -from mtphandler.tools import read_static_file - -from .units import BaseUnit, Prefix, UnitDefinition - -BaseUnit.model_rebuild() -UnitDefinition.model_rebuild() - -ONTOMAPS = read_static_file("mtphandler.units", "ontomaps.toml") - - -class Unit: - @staticmethod - def mol(): - return BaseUnit(kind=UnitType.MOLE, exponent=1, scale=1) - - @staticmethod - def litre(): - return BaseUnit(kind=UnitType.LITRE, exponent=1, scale=1) - - @staticmethod - def second(): - return BaseUnit(kind=UnitType.SECOND, exponent=1, scale=1) - - @staticmethod - def minute(): - return BaseUnit(kind=UnitType.SECOND, exponent=1, scale=1, multiplier=60) - - @staticmethod - def hour(): - hour = 60 * 60 - return BaseUnit(kind=UnitType.SECOND, exponent=1, scale=1, multiplier=hour) - - @staticmethod - def day(): - day = 60**2 * 24 - return BaseUnit(kind=UnitType.SECOND, exponent=1, scale=1, multiplier=day) - - @staticmethod - def gram(): - return BaseUnit(kind=UnitType.GRAM, exponent=1, scale=1) - - @staticmethod - def kelvin(): - return BaseUnit(kind=UnitType.KELVIN, exponent=1, scale=1) - - @staticmethod - def celsius(): - return BaseUnit(kind=UnitType.CELSIUS, exponent=1, scale=1) - - @staticmethod - def dimensionless(): - return BaseUnit(kind=UnitType.DIMENSIONLESS, exponent=1, scale=1) - - @staticmethod - def metre(): - return BaseUnit(kind=UnitType.METRE, exponent=1, scale=1) - - -###### Single Prefixes ###### - -k = Prefix.k -m = Prefix.m -u = Prefix.u -n = Prefix.n - -##### Predefined units ##### - -# Dimensionless -dimensionless = UnitDefinition(base_units=[Unit.dimensionless()]) - -# Molarity -M = Unit.mol() / Unit.litre() -mM = m * Unit.mol() / Unit.litre() -uM = u * Unit.mol() / Unit.litre() -nM = n * Unit.mol() / Unit.litre() - -## Ontology -M.ld_id = ONTOMAPS["molarity"]["M"] -mM.ld_id = ONTOMAPS["molarity"]["mM"] -uM.ld_id = ONTOMAPS["molarity"]["uM"] -nM.ld_id = ONTOMAPS["molarity"]["nM"] - -# Substance -mol = UnitDefinition(base_units=[Unit.mol()])._get_name() -mmol = UnitDefinition(base_units=[m * Unit.mol()])._get_name() -umol = UnitDefinition(base_units=[u * Unit.mol()])._get_name() -nmol = UnitDefinition(base_units=[n * Unit.mol()])._get_name() - -## Ontology -mol.ld_id = ONTOMAPS["substance"]["mol"] -mmol.ld_id = ONTOMAPS["substance"]["mmol"] -umol.ld_id = ONTOMAPS["substance"]["umol"] -nmol.ld_id = ONTOMAPS["substance"]["nmol"] - -# Mass -gram = UnitDefinition(base_units=[Unit.gram()])._get_name() -g = UnitDefinition(base_units=[Unit.gram()])._get_name() -mg = UnitDefinition(base_units=[m * Unit.gram()])._get_name() -ug = UnitDefinition(base_units=[u * Unit.gram()])._get_name() -ng = UnitDefinition(base_units=[n * Unit.gram()])._get_name() -kg = UnitDefinition(base_units=[k * Unit.gram()])._get_name() - -## Ontology -g.ld_id = ONTOMAPS["mass"]["g"] -gram.ld_id = ONTOMAPS["mass"]["g"] -mg.ld_id = ONTOMAPS["mass"]["mg"] -ug.ld_id = ONTOMAPS["mass"]["ug"] -ng.ld_id = ONTOMAPS["mass"]["ng"] - -# Volume -litre = UnitDefinition(base_units=[Unit.litre()])._get_name() -l = UnitDefinition(base_units=[Unit.litre()])._get_name() # noqa: E741 -ml = UnitDefinition(base_units=[m * Unit.litre()])._get_name() -ul = UnitDefinition(base_units=[u * Unit.litre()])._get_name() -nl = UnitDefinition(base_units=[n * Unit.litre()])._get_name() - -## Ontology - -l.ld_id = ONTOMAPS["volume"]["litre"] -litre.ld_id = ONTOMAPS["volume"]["litre"] -ml.ld_id = ONTOMAPS["volume"]["ml"] -ul.ld_id = ONTOMAPS["volume"]["ul"] -nl.ld_id = ONTOMAPS["volume"]["nl"] - -# Time -second = UnitDefinition(base_units=[Unit.second()])._get_name() -s = UnitDefinition(base_units=[Unit.second()])._get_name() -second = UnitDefinition(base_units=[Unit.minute()])._get_name() -minute = UnitDefinition(base_units=[Unit.minute()])._get_name() -hour = UnitDefinition(base_units=[Unit.hour()])._get_name() -h = UnitDefinition(base_units=[Unit.hour()])._get_name() -day = UnitDefinition(base_units=[Unit.day()])._get_name() -d = UnitDefinition(base_units=[Unit.day()])._get_name() - -## Ontology -s.ld_id = ONTOMAPS["time"]["s"] -second.ld_id = ONTOMAPS["time"]["s"] -second.ld_id = ONTOMAPS["time"]["min"] -minute.ld_id = ONTOMAPS["time"]["min"] -hour.ld_id = ONTOMAPS["time"]["hour"] -h.ld_id = ONTOMAPS["time"]["hour"] -day.ld_id = ONTOMAPS["time"]["day"] -d.ld_id = ONTOMAPS["time"]["day"] - -# Temperature - -kelvin = UnitDefinition(base_units=[Unit.kelvin()])._get_name() -K = UnitDefinition(base_units=[Unit.kelvin()])._get_name() -celsius = UnitDefinition(base_units=[Unit.celsius()])._get_name() -C = UnitDefinition(base_units=[Unit.celsius()])._get_name() - -## Ontology - -K.ld_id = ONTOMAPS["temperature"]["K"] -kelvin.ld_id = ONTOMAPS["temperature"]["K"] - -# Length - -metre = UnitDefinition(base_units=[Unit.metre()])._get_name() -nm = UnitDefinition(base_units=[n * Unit.metre()])._get_name() - -## Ontology - -metre.ld_id = ONTOMAPS["length"]["metre"] -nm.ld_id = ONTOMAPS["length"]["nm"] diff --git a/temp_handler/units/units.py b/temp_handler/units/units.py deleted file mode 100644 index f7f0541..0000000 --- a/temp_handler/units/units.py +++ /dev/null @@ -1,374 +0,0 @@ -from enum import Enum -from functools import partial - -from pydantic import model_validator - -from mtphandler.model import ( - BaseUnit as _BaseUnit, -) -from mtphandler.model import ( - UnitDefinition as _UnitDefinition, -) -from mtphandler.model import ( - UnitType, -) - -UNIT_OF_MEAS_TYPE = "OBO:UO_0000000" -NAME_MAPS = { - UnitType.LITRE.value: "l", - UnitType.MOLE.value: "mol", - UnitType.SECOND.value: "s", - UnitType.GRAM.value: "g", - UnitType.KELVIN.value: "K", -} - - -def _is_unit(other: object) -> bool: - """Check if the given object is an instance of 'unit'. - - Args: - other (object): The object to check. - - Returns: - bool: True if the object is an instance of 'unit', False otherwise. - """ - return other.__class__.__name__ == "unit" - - -def set_scale(unit: _BaseUnit, scale: int) -> _BaseUnit: - """Set the scale of a unit. - - Args: - unit (_BaseUnit): The unit to set the scale for. - scale (int): The scale value to set. - - Returns: - _BaseUnit: The unit with the updated scale. - """ - unit.scale = scale - return unit - - -class Prefix(Enum): - """Enumeration for unit prefixes with corresponding scales.""" - - k = partial(set_scale, scale=3) - m = partial(set_scale, scale=-3) - u = partial(set_scale, scale=-6) - n = partial(set_scale, scale=-9) - - def __mul__(self, other: _BaseUnit) -> _BaseUnit: - """Multiply prefix with a BaseUnit. - - When multiplying a prefix with a BaseUnit, the scale of the BaseUnit is updated. - - Args: - other (_BaseUnit): The other operand, which should be a BaseUnit. - - Returns: - _BaseUnit: The resulting unit with the prefix applied. - - Raises: - TypeError: If the other operand is not a BaseUnit. - """ - if isinstance(other, _BaseUnit): - return self.value(other) - - raise TypeError( - f"unsupported operand type(s) for *: 'Prefix' and '{type(other)}'" - ) - - -class UnitDefinition(_UnitDefinition): - """Extended UnitDefinition class with additional operations.""" - - @model_validator(mode="after") - def set_name_and_type(self): - """Initialize the UnitDefinition object.""" - self._get_name() - self.ld_type = [UNIT_OF_MEAS_TYPE] - return self - - def __rtruediv__(self, other: object) -> "UnitDefinition": - """Right division operation to handle unit division. - - If the other operand is a UnitDefinition, the base units are appended to the current unit. - If the other operand is a BaseUnit, the base unit is appended to the current unit. - - Args: - other (object): The numerator in the division. - - Returns: - UnitDefinition: The resulting unit after division. - - Raises: - TypeError: If the other operand type is unsupported. - """ - for base in self.base_units: - base.exponent = -abs(base.exponent) - - if isinstance(other, UnitDefinition): - self.base_units.extend(other.base_units) - elif isinstance(other, _BaseUnit): - self.base_units.append(other) - - self._get_name() - - return self - - def __truediv__(self, other: object) -> "UnitDefinition": - """Division operation to handle unit division. - - If the other operand is a UnitDefinition, the base units are appended to the current unit. - If the other operand is a BaseUnit, the base unit is appended to the current unit. - - Args: - other (object): The numerator in the - - Returns: - UnitDefinition: The resulting unit after division. - - Raises: - TypeError: If the other operand type is unsupported. - - """ - - if isinstance(other, UnitDefinition): - for base in other.base_units: - base.exponent = -abs(base.exponent) - self.base_units.extend(other.base_units) - elif isinstance(other, _BaseUnit): - other.exponent = -abs(other.exponent) - self.base_units.append(other) - - self._get_name() - - return self - - def __mul__(self, other: object) -> "UnitDefinition": - """Multiplication operation to handle unit multiplication. - - Args: - other (object): The multiplier in the multiplication. - - Returns: - UnitDefinition: The resulting unit after multiplication. - - Raises: - TypeError: If the other operand type is unsupported. - """ - if isinstance(other, (int, float)): - for base in self.base_units: - if base.multiplier: - base.multiplier *= other - else: - base.multiplier = other - - self._get_name() - - return self - - raise TypeError( - f"unsupported operand type(s) for *: 'UnitDefinition' and '{type(other)}'" - ) - - def _get_name(self): - """Get the name of the unit based on the base units.""" - self.name = str(self) - - return self - - def __str__(self) -> str: - """String representation of the UnitDefinition. - - Returns: - str: The string representation of the unit. - - Raises: - ValueError: If no base units are found. - """ - - numerator = [ - self._map_prefix(base.scale) - + self._map_name(base.kind) - + self._exponent(base.exponent) - for base in self.base_units - if base.exponent > 0 - ] - denominator = [ - self._map_prefix(base.scale) - + self._map_name(base.kind) - + self._exponent(base.exponent) - for base in self.base_units - if base.exponent < 0 - ] - - numerator_str = " ".join(numerator) if numerator else "" - denominator_str = " ".join(denominator) if denominator else "" - - if numerator_str and denominator_str: - return f"{numerator_str} / {denominator_str}" - elif numerator_str: - return numerator_str - elif denominator_str: - return f"1 / {denominator_str}" - - raise ValueError("No base units found") - - @staticmethod - def _map_prefix(scale: int | None) -> str: - """Map a scale to its corresponding prefix. - - Args: - scale (int): The scale value to map. - - Returns: - str: The corresponding prefix. - """ - - if scale is None: - return "" - - mapping = { - 3: "k", - -3: "m", - -6: "u", - -9: "n", - } - - return mapping.get(scale, "") - - @staticmethod - def _map_name(kind: str) -> str: - if isinstance(kind, str): # TODO: find issue of incorrect enum usage - return NAME_MAPS.get(kind, kind.capitalize()) - return NAME_MAPS.get(kind, kind.name.capitalize()) - - @staticmethod - def _exponent(exponent: int) -> str: - """Format the exponent for display. - - Args: - exponent (int): The exponent value to format. - - Returns: - str: The formatted exponent string. - """ - if abs(exponent) == 1: - return "" - - return f"^{abs(exponent)}" - - -class BaseUnit(_BaseUnit): - """Extended BaseUnit class with additional operations.""" - - def __rtruediv__(self, other: object) -> "UnitDefinition | BaseUnit": - """Right division operation to handle unit division. - - Args: - other (object): The numerator in the division. - - Returns: - UnitDefinition: The resulting unit after division. - - Raises: - TypeError: If the other operand type is unsupported. - """ - if isinstance(other, UnitDefinition): - self.exponent = -self.exponent - other.base_units.append(self) - - other._get_name() - - return other - elif isinstance(other, (int, float)): - self.exponent = -self.exponent - return self - - raise TypeError( - f"unsupported operand type(s) for /: 'BaseUnit' and '{type(other)}'" - ) - - def __truediv__(self, other: object) -> "UnitDefinition": - """Division operation to handle unit division. - - Args: - other (object): The denominator in the division. - - Returns: - UnitDefinition: The resulting unit after division. - - Raises: - TypeError: If the other operand type is unsupported. - """ - if isinstance(other, BaseUnit): - other.exponent = -other.exponent - return UnitDefinition(base_units=[self, other])._get_name() - elif isinstance(other, UnitDefinition): - for base_unit in other.base_units: - base_unit.exponent = -base_unit.exponent - other.base_units.append(self) - other._get_name() - - return other - - raise TypeError( - f"unsupported operand type(s) for /: 'BaseUnit' and '{type(other)}'" - ) - - def __pow__(self, other: int) -> "_BaseUnit": - """Exponentiation operation to handle unit exponentiation. - - Args: - other (int): The exponent value. - - Returns: - _BaseUnit: The resulting unit after exponentiation. - - Raises: - TypeError: If the exponent is not an integer. - """ - if isinstance(other, int): - self.exponent = other - return self - - raise TypeError( - f"unsupported operand type(s) for **: 'BaseUnit' and '{type(other)}'" - ) - - def __mul__(self, other: object) -> object: - """Multiplication operation to handle unit multiplication. - - Args: - other (object): The multiplier in the multiplication. - - Returns: - object: The resulting unit after multiplication. - - Raises: - TypeError: If the other operand type is unsupported. - """ - if isinstance(other, BaseUnit): - if self.exponent < 0 or other.exponent < 0: - self.exponent = abs(self.exponent) - other.exponent = abs(other.exponent) - - return UnitDefinition(base_units=[self, other])._get_name() - elif isinstance(other, UnitDefinition): - other.base_units.append(self) - other._get_name() - - return other - elif isinstance(other, Prefix): - return other * self - elif isinstance(other, (int, float)): - if self.multiplier: - self.multiplier *= other - else: - self.multiplier = other - return self - - raise TypeError( - f"unsupported operand type(s) for *: 'BaseUnit' and '{type(other)}'" - ) diff --git a/temp_handler/visualize.py b/temp_handler/visualize.py deleted file mode 100644 index 835a88d..0000000 --- a/temp_handler/visualize.py +++ /dev/null @@ -1,110 +0,0 @@ -import itertools as it - -import plotly.express as px -import plotly.graph_objects as go -from plotly.subplots import make_subplots - -from mtphandler.model import Plate - - -def visualize_plate( - plate: Plate, - name: str, - zoom: bool = False, - wavelengths: list[float] = [], - static: bool = False, - darkmode: bool = False, -): - """Visualize a plate with all its wells and measurements.""" - - if darkmode: - theme = "plotly_dark" - plot_bgcolor = "#1e1e1e" # Dark background color for subplots - paper_bgcolor = "#1e1e1e" - gridcolor = plot_bgcolor # Grid color for dark mode - font_color = "#e5e5e5" # Lighter text for dark mode - else: - theme = "plotly_white" - plot_bgcolor = "white" # Light background for subplots - paper_bgcolor = "white" - gridcolor = plot_bgcolor # Light grid color for white mode - font_color = "#000000" - - if zoom: - shared_yaxes = False - else: - shared_yaxes = True - - if not wavelengths: - wavelengths = [plate.wells[0].measurements[0].wavelength] - - if not isinstance(wavelengths, list): - wavelengths = [wavelengths] - - fig = make_subplots( - rows=8, - cols=12, - shared_xaxes=True, - subplot_titles=_generate_possible_well_ids(), - shared_yaxes=shared_yaxes, - ) - colors = px.colors.qualitative.Plotly - - for well in plate.wells: - for measurement, color in zip(well.measurements, colors): - if measurement.wavelength not in wavelengths: - continue - - fig.add_trace( - go.Scatter( - x=measurement.time, - y=measurement.absorption, - name=f"{measurement.wavelength} nm", - mode="lines", - showlegend=False, - line=dict(color=color), - hovertemplate="%{y:.2f}
", - ), - col=well.x_pos + 1, - row=well.y_pos + 1, - ) - - # Update x and y axes for dark mode or light mode - fig.update_xaxes( - showticklabels=False, gridcolor=gridcolor, zeroline=False, showline=False - ) - fig.update_yaxes( - showticklabels=False, gridcolor=gridcolor, zeroline=False, showline=False - ) - - # Update subplot backgrounds and layout - fig.update_layout( - plot_bgcolor=plot_bgcolor, - paper_bgcolor=paper_bgcolor, - font=dict(color=font_color), - hovermode="x", - title=dict( - text=name, - font=dict(color=font_color), - ), - margin=dict(l=20, r=20, t=100, b=20), - template=theme, - ) - - if static: - fig.show("png") - - fig.show() - - -def _generate_possible_well_ids() -> list[str]: - characters = "ABCDEFGH" - integers = range(1, 13) # 1 to 12 - - sub_char = characters[:8] - sub_int = integers[:12] - - # Generate combinations of characters and integers - combinations = ["".join(item) for item in it.product(sub_char, map(str, sub_int))] - - return combinations