diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py index cab8b02..a848504 100644 --- a/src/submissions/backend/db/models/__init__.py +++ b/src/submissions/backend/db/models/__init__.py @@ -5,6 +5,7 @@ from __future__ import annotations import sys, logging, json +import sqlalchemy.exc from dateutil.parser import parse from pandas import DataFrame from pydantic import BaseModel @@ -239,14 +240,23 @@ class BaseClass(Base): allowed = [k for k, v in cls.__dict__.items() if isinstance(v, InstrumentedAttribute)] # and not isinstance(v.property, _RelationshipDeclared)] sanitized_kwargs = {k: v for k, v in kwargs.items() if k in allowed} - logger.debug(f"Sanitized kwargs: {sanitized_kwargs}") + # logger.debug(f"Sanitized kwargs: {sanitized_kwargs}") instance = cls.query(**sanitized_kwargs) if not instance or isinstance(instance, list): instance = cls() new = True for k, v in sanitized_kwargs.items(): - logger.debug(f"QorC Setting {k} to {v}") - setattr(instance, k, v) + # logger.debug(f"QorC Setting {k} to {v}") + if k == "id": + continue + try: + setattr(instance, k, v) + except AttributeError as e: + from backend.validators.pydant import PydBaseClass + if issubclass(v.__class__, PydBaseClass): + setattr(instance, k, v.to_sql()) + else: + logger.error(f"Could not set {k} due to {e}") logger.info(f"Instance from query or create: {instance}, new: {new}") return instance, new @@ -315,9 +325,13 @@ class BaseClass(Base): try: self.__database_session__.add(self) self.__database_session__.commit() + # except sqlalchemy.exc.IntegrityError as i: + # logger.error(f"Integrity error saving {self} due to: {i}") + # logger.error(pformat(self.__dict__)) except Exception as e: - logger.critical(f"Problem saving object: {e}") + logger.critical(f"Problem saving {self} due to: {e}") logger.error(f"Error message: {type(e)}") + logger.error(pformat(self.__dict__)) self.__database_session__.rollback() report.add_result(Result(msg=e, status="Critical")) return report @@ -484,9 +498,9 @@ class BaseClass(Base): # logger.debug(f"Setting ColumnProperty to {value}") return super().__setattr__(key, value) case _RelationshipDeclared(): - logger.debug(f"{self.__class__.__name__} Setting _RelationshipDeclared for {key} to {value}") + # logger.debug(f"{self.__class__.__name__} Setting _RelationshipDeclared for {key} to {value}") if field_type.property.uselist: - logger.debug(f"Setting with uselist") + # logger.debug(f"Setting with uselist") existing = self.__getattribute__(key) # NOTE: This is causing problems with removal of items from lists. Have to overhaul it. if existing is not None: @@ -502,8 +516,11 @@ class BaseClass(Base): pass else: value = [value] - value = list(set(value)) - logger.debug(f"Final value for {key}: {value}") + try: + value = list(set(value)) + except TypeError: + pass + # logger.debug(f"Final value for {key}: {value}") return super().__setattr__(key, value) else: if isinstance(value, list): @@ -514,7 +531,7 @@ class BaseClass(Base): try: return super().__setattr__(key, value) except AttributeError: - logger.debug(f"Possible attempt to set relationship {key} to simple var type. {value}") + logger.warning(f"Possible attempt to set relationship {key} to simple var type. {value}") relationship_class = field_type.property.entity.entity value = relationship_class.query(name=value) try: @@ -555,7 +572,7 @@ class BaseClass(Base): output_date = datetime.combine(output_date, addition_time).strftime("%Y-%m-%d %H:%M:%S") return output_date - def details_dict(self): + def details_dict(self, **kwargs): relevant = {k: v for k, v in self.__class__.__dict__.items() if isinstance(v, InstrumentedAttribute) or isinstance(v, AssociationProxy)} output = {} @@ -578,6 +595,16 @@ class BaseClass(Base): output[k.strip("_")] = value return output + def to_pydantic(self, **kwargs): + from backend.validators import pydant + pyd_model_name = f"Pyd{self.__class__.__name__}" + logger.debug(f"Looking for pydant model {pyd_model_name}") + try: + pyd = getattr(pydant, pyd_model_name) + except AttributeError: + raise AttributeError(f"Could not get pydantic class {pyd_model_name}") + return pyd(**self.details_dict()) + def show_details(self, obj): logger.debug("Show Details") from frontend.widgets.submission_details import SubmissionDetails diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index 2782a73..11093b2 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -4,14 +4,16 @@ All kittype and reagent related models from __future__ import annotations import zipfile, logging, re from operator import itemgetter +from pprint import pformat import numpy as np from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB from sqlalchemy.orm import relationship, validates, Query from sqlalchemy.ext.associationproxy import association_proxy from datetime import date, datetime, timedelta + from tools import check_authorization, setup_lookup, Report, Result, check_regex_match, timezone, \ - jinja_template_loading + jinja_template_loading, flatten_list from typing import List, Literal, Generator, Any, Tuple, TYPE_CHECKING from pandas import ExcelFile from pathlib import Path @@ -20,6 +22,7 @@ from io import BytesIO if TYPE_CHECKING: from backend.db.models.submissions import Run, ProcedureSampleAssociation + from backend.validators.pydant import PydSample, PydResults logger = logging.getLogger(f'procedure.{__name__}') @@ -127,6 +130,11 @@ class KitType(BaseClass): process = relationship("Process", back_populates="kittype", secondary=kittype_process) #: equipment process used by this kittype + proceduretypeequipmentroleassociation = relationship("ProcedureTypeEquipmentRoleAssociation", back_populates="kittype", + cascade="all, delete-orphan",) + + equipmentrole = association_proxy("proceduretypeequipmentroleassociation", "equipmentrole") + kittypereagentroleassociation = relationship( "KitTypeReagentRoleAssociation", back_populates="kittype", @@ -148,6 +156,8 @@ class KitType(BaseClass): creator=lambda ST: ProcedureTypeKitTypeAssociation( submissiontype=ST)) #: Association proxy to SubmissionTypeKitTypeAssociation + + @classproperty def aliases(cls) -> List[str]: """ @@ -174,17 +184,30 @@ class KitType(BaseClass): """ match proceduretype: case ProcedureType(): - relevant_associations = [item for item in self.kittypereagentroleassociation if + relevant_associations = [assoc for assoc in self.kittypereagentroleassociation if + assoc.proceduretype == proceduretype] + case str(): + relevant_associations = [assoc for assoc in self.kittypereagentroleassociation if + assoc.proceduretype.name == proceduretype] + case _: + relevant_associations = [assoc for assoc in self.kittypereagentroleassociation] + if required_only: + return (assoc.reagentrole for assoc in relevant_associations if assoc.required == 1) + else: + return (assoc.reagentrole for assoc in relevant_associations) + + def get_equipmentroles(self, proceduretype: str| ProcedureType | None = None) -> Generator[ReagentRole, None, None]: + match proceduretype: + case ProcedureType(): + relevant_associations = [item for item in self.proceduretypeequipmentroleassociation if item.proceduretype == proceduretype] case str(): - relevant_associations = [item for item in self.kittypereagentroleassociation if + relevant_associations = [item for item in self.proceduretypeequipmentroleassociation if item.proceduretype.name == proceduretype] case _: - relevant_associations = [item for item in self.kittypereagentroleassociation] - if required_only: - return (item.reagentrole for item in relevant_associations if item.required == 1) - else: - return (item.reagentrole for item in relevant_associations) + relevant_associations = [item for item in self.proceduretypeequipmentroleassociation] + return (assoc.equipmentrole for assoc in relevant_associations) + def construct_xl_map_for_use(self, proceduretype: str | SubmissionType) -> Tuple[dict | None, KitType]: """ @@ -1223,26 +1246,24 @@ class ProcedureType(BaseClass): output = super().details_dict(**kwargs) output['kittype'] = [item.details_dict() for item in output['kittype']] # output['process'] = [item.details_dict() for item in output['process']] - output['equipment'] = [item.details_dict() for item in output['equipment']] + output['equipment'] = [item.details_dict(proceduretype=self) for item in output['equipment']] return output - - - def construct_dummy_procedure(self, run: Run|None=None): + def construct_dummy_procedure(self, run: Run | None = None): from backend.validators.pydant import PydProcedure if run: samples = run.constuct_sample_dicts_for_proceduretype(proceduretype=self) + else: + samples = [] output = dict( proceduretype=self, - #name=dict(value=self.name, missing=True), - #possible_kits=[kittype.name for kittype in self.kittype], repeat=False, - # plate_map=plate_map - run=run + run=run, + sample=samples ) return PydProcedure(**output) - def construct_plate_map(self, sample_dicts: List[dict]) -> str: + def construct_plate_map(self, sample_dicts: List["PydSample"]) -> str: """ Constructs an html based plate map for procedure details. @@ -1256,23 +1277,31 @@ class ProcedureType(BaseClass): """ if self.plate_rows == 0 or self.plate_columns == 0: return "" - # plate_rows = range(1, self.plate_rows + 1) - # plate_columns = range(1, self.plate_columns + 1) - # total_wells = self.plate_columns * self.plate_rows + sample_dicts = self.pad_sample_dicts(sample_dicts=sample_dicts) + logger.debug(f"Sample dicts: {pformat(sample_dicts)}") vw = round((-0.07 * len(sample_dicts)) + 12.2, 1) - # sample_dicts = run.constuct_sample_dicts_for_proceduretype(proceduretype=self) - # output_samples = [next((item for item in sample_dicts if item['row'] == row and item['column'] == column), - # dict(sample_id="", row=row, column=column, background_color="#ffffff")) - # for row in plate_rows - # for column in plate_columns] - # logger.debug(f"Output samples:\n{pformat(output_samples)}") # NOTE: An overly complicated list comprehension create a list of sample locations # NOTE: next will return a blank cell if no value found for row/column env = jinja_template_loading() template = env.get_template("support/plate_map.html") - html = template.render(plate_rows=self.plate_rows, plate_columns=self.plate_columns, samples=sample_dicts, vw=vw) + html = template.render(plate_rows=self.plate_rows, plate_columns=self.plate_columns, samples=sample_dicts, + vw=vw) return html + "" + def pad_sample_dicts(self, sample_dicts: List["PydSample"]): + from backend.validators.pydant import PydSample + output = [] + logger.debug(f"Rows: {self.plate_rows}") + logger.debug(f"Columns: {self.plate_columns}") + for row, column in self.ranked_plate.values(): + sample = next((sample for sample in sample_dicts if sample.row == row and sample.column == column), + PydSample(**dict(sample_id="", row=row, column=column, enabled=False))) + sample.background_color = "#6ffe1d" if sample.enabled else "#ffffff" + output.append(sample) + logger.debug(f"Appending {sample} at row {row}, column {column}") + return output + + @property def ranked_plate(self): matrix = np.array([[0 for yyy in range(1, self.plate_rows + 1)] for xxx in range(1, self.plate_columns + 1)]) @@ -1318,7 +1347,7 @@ class Procedure(BaseClass): ) #: Relation to ProcedureReagentAssociation reagent = association_proxy("procedurereagentassociation", - "reagent", creator=lambda reg: ProcedureReagentAssociation( + "reagent", creator=lambda reg: ProcedureReagentAssociation( reagent=reg)) #: Association proxy to RunReagentAssociation.reagent procedureequipmentassociation = relationship( @@ -1365,7 +1394,7 @@ class Procedure(BaseClass): pass return cls.execute_query(query=query, limit=limit) - def to_dict(self, full_data: bool=False): + def to_dict(self, full_data: bool = False): output = dict() output['name'] = self.name return output @@ -1381,7 +1410,7 @@ class Procedure(BaseClass): names = ["Add Results", "Add Equipment", "Edit", "Add Comment", "Show Details", "Delete"] return {item: self.__getattribute__(item.lower().replace(" ", "_")) for item in names} - def add_results(self, obj, resultstype_name:str): + def add_results(self, obj, resultstype_name: str): logger.debug(f"Add Results! {resultstype_name}") from ...managers import results results_class = getattr(results, resultstype_name) @@ -1395,30 +1424,35 @@ class Procedure(BaseClass): obj (_type_): parent widget """ logger.debug(f"Add equipment") - from frontend.widgets.equipment_usage import EquipmentUsage - dlg = EquipmentUsage(parent=obj, procedure=self) + from frontend.widgets.equipment_usage_2 import EquipmentUsage + dlg = EquipmentUsage(parent=obj, procedure=self.to_pydantic()) if dlg.exec(): - equipment = dlg.parse_form() - for equip in equipment: - logger.debug(f"Parsed equipment: {equip}") - _, assoc = equip.to_sql(procedure=self) - logger.debug(f"Got equipment association: {assoc} for {equip}") - try: - assoc.save() - except AttributeError as e: - logger.error(f"Couldn't save association with {equip} due to {e}") - if equip.tips: - for tips in equip.tips: - # logger.debug(f"Attempting to add tips assoc: {tips} (pydantic)") - tassoc, _ = tips.to_sql(procedure=self) - # logger.debug(f"Attempting to add tips assoc: {tips.__dict__} (sql)") - if tassoc not in self.proceduretipsassociation: - tassoc.save() - else: - logger.error(f"Tips already found in submission, skipping.") + # equipment = dlg.parse_form() + # for equip in equipment: + # logger.debug(f"Parsed equipment: {equip}") + # _, assoc = equip.to_sql(procedure=self) + # logger.debug(f"Got equipment association: {assoc} for {equip}") + # try: + # assoc.save() + # except AttributeError as e: + # logger.error(f"Couldn't save association with {equip} due to {e}") + # if equip.tips: + # for tips in equip.tips: + # # logger.debug(f"Attempting to add tips assoc: {tips} (pydantic)") + # tassoc, _ = tips.to_sql(procedure=self) + # # logger.debug(f"Attempting to add tips assoc: {tips.__dict__} (sql)") + # if tassoc not in self.proceduretipsassociation: + # tassoc.save() + # else: + # logger.error(f"Tips already found in submission, skipping.") + dlg.save_procedure() def edit(self, obj): + from frontend.widgets.procedure_creation import ProcedureCreation logger.debug("Edit!") + dlg = ProcedureCreation(parent=obj, procedure=self.to_pydantic()) + if dlg.exec(): + logger.debug("Edited") def add_comment(self, obj): logger.debug("Add Comment!") @@ -1439,7 +1473,8 @@ class Procedure(BaseClass): if sample.sample.sample_id in [s.sample_id for s in run_samples]] for sample in active_samples: sample['active'] = True - inactive_samples = [sample.details_dict() for sample in run_samples if sample.name not in [s['sample_id'] for s in active_samples]] + inactive_samples = [sample.details_dict() for sample in run_samples if + sample.name not in [s['sample_id'] for s in active_samples]] # logger.debug(f"Inactive samples:{pformat(inactive_samples)}") for sample in inactive_samples: sample['active'] = False @@ -1451,10 +1486,30 @@ class Procedure(BaseClass): output['tips'] = [tips.details_dict() for tips in output['proceduretipsassociation']] output['repeat'] = bool(output['repeat']) output['excluded'] = ['id', "results", "proceduresampleassociation", "sample", "procedurereagentassociation", - "procedureequipmentassociation", "proceduretipsassociation", "reagent", "equipment", "tips", + "procedureequipmentassociation", "proceduretipsassociation", "reagent", "equipment", + "tips", "excluded"] return output + def to_pydantic(self, **kwargs): + from backend.validators.pydant import PydResults + output = super().to_pydantic() + output.kittype = dict(value=output.kittype['name'], missing=False) + results = [] + for result in output.results: + match result: + case dict(): + results.append(PydResults(**result)) + case PydResults(): + results.append(result) + case _: + pass + output.results = results + # for sample in output.sample: + # sample.enabled = True + return output + + class ProcedureTypeKitTypeAssociation(BaseClass): """ Abstract of relationship between kits and their procedure type. @@ -2046,11 +2101,16 @@ class Equipment(BaseClass, LogMixin): PydEquipment: pydantic equipment object """ from backend.validators.pydant import PydEquipment + creation_dict = self.details_dict() processes = self.get_processes(proceduretype=proceduretype, kittype=kittype, equipmentrole=equipmentrole) + logger.debug(f"Processes: {processes}") + creation_dict['processes'] = processes logger.debug(f"EquipmentRole: {equipmentrole}") - return PydEquipment(processes=processes, equipmentrole=equipmentrole, - **self.to_dict(processes=False)) + creation_dict['equipmentrole'] = equipmentrole or creation_dict['equipmentrole'] + # return PydEquipment(process=processes, equipmentrole=equipmentrole, + # **self.to_dict(processes=False)) + return PydEquipment(**creation_dict) @classproperty def manufacturer_regex(cls) -> re.Pattern: @@ -2267,11 +2327,43 @@ class EquipmentRole(BaseClass): return OmniEquipmentRole(instance_object=self, name=self.name) def details_dict(self, **kwargs): + if "proceduretype" in kwargs: + proceduretype = kwargs['proceduretype'] + else: + proceduretype = None + match proceduretype: + case ProcedureType(): + pass + case str(): + proceduretype = ProcedureType.query(name=proceduretype, limit=1) + case _: + proceduretype = None output = super().details_dict(**kwargs) + # Note con output['equipment'] = [item.details_dict() for item in output['equipment']] + output['equipment_json'] = [] + equip = [] + for eq in output['equipment']: + dicto = dict(name=eq['name'], asset_number=eq['asset_number']) + dicto['processes'] = [dict(name=process.name, tiprole=process.tiprole) for process in eq['process'] if proceduretype in process.proceduretype] + for process in dicto['processes']: + try: + process['tips'] = flatten_list([[tips.name for tips in tr.tips]for tr in process['tiprole']]) + except KeyError: + logger.debug(f"process: {pformat(process)}") + raise KeyError() + del process['tiprole'] + equip.append(dicto) + output['equipment_json'].append(dict(name=self.name, equipment=equip)) output['process'] = [item.details_dict() for item in output['process']] + try: + output['tips'] = [item.details_dict() for item in output['tips']] + except KeyError: + # logger.error(pformat(output)) + pass return output + class ProcedureEquipmentAssociation(BaseClass): """ Abstract association between BasicRun and Equipment @@ -2297,7 +2389,8 @@ class ProcedureEquipmentAssociation(BaseClass): except AttributeError: return "" - def __init__(self, procedure=None, equipment=None, procedure_id:int|None=None, equipment_id:int|None=None, equipmentrole: str = "None"): + def __init__(self, procedure=None, equipment=None, procedure_id: int | None = None, equipment_id: int | None = None, + equipmentrole: str = "None"): if not procedure: if procedure_id: procedure = Procedure.query(id=procedure_id) @@ -2310,8 +2403,16 @@ class ProcedureEquipmentAssociation(BaseClass): else: logger.error("Creation error") self.equipment = equipment + if isinstance(equipmentrole, list): + equipmentrole = equipmentrole[0] + if isinstance(equipmentrole, EquipmentRole): + equipmentrole = equipmentrole.name self.equipmentrole = equipmentrole + @property + def name(self): + return f"{self.procedure.name} & {self.equipment.name}" + @property def process(self): return Process.query(id=self.process_id) @@ -2379,6 +2480,7 @@ class ProcedureEquipmentAssociation(BaseClass): output['process'] = self.process.details_dict() return output + class ProcedureTypeEquipmentRoleAssociation(BaseClass): """ Abstract association between SubmissionType and EquipmentRole @@ -2386,6 +2488,8 @@ class ProcedureTypeEquipmentRoleAssociation(BaseClass): equipmentrole_id = Column(INTEGER, ForeignKey("_equipmentrole.id"), primary_key=True) #: id of associated equipment proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"), primary_key=True) #: id of associated procedure + kittype_id = Column(INTEGER, ForeignKey("_kittype.id"), + primary_key=True) uses = Column(JSON) #: locations of equipment on the procedure type excel sheet. static = Column(INTEGER, default=1) #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list? @@ -2396,6 +2500,8 @@ class ProcedureTypeEquipmentRoleAssociation(BaseClass): equipmentrole = relationship(EquipmentRole, back_populates="equipmentroleproceduretypeassociation") #: associated equipment + kittype = relationship(KitType, back_populates="proceduretypeequipmentroleassociation") + @validates('static') def validate_static(self, key, value): """ @@ -2781,6 +2887,10 @@ class Tips(BaseClass, LogMixin): # template = env.get_template("tips_details.html") # return template + def to_pydantic(self, **kwargs): + output = super().to_pydantic() + return output + class ProcedureTypeTipRoleAssociation(BaseClass): """ @@ -2828,7 +2938,7 @@ class ProcedureTipsAssociation(BaseClass): @classmethod @setup_lookup - def query(cls, tips: int|Tips, tiprole: str, procedure: int | Procedure | None = None, limit: int = 0, **kwargs) \ + def query(cls, tips: int | Tips, tiprole: str, procedure: int | Procedure | None = None, limit: int = 0, **kwargs) \ -> Any | List[Any]: query: Query = cls.__database_session__.query(cls) match tips: @@ -2873,15 +2983,15 @@ class ProcedureTipsAssociation(BaseClass): class Results(BaseClass): - id = Column(INTEGER, primary_key=True) result_type = Column(String(32)) result = Column(JSON) + date_analyzed = Column(TIMESTAMP) procedure_id = Column(INTEGER, ForeignKey("_procedure.id", ondelete='SET NULL', - name="fk_RES_procedure_id")) + name="fk_RES_procedure_id")) procedure = relationship("Procedure", back_populates="results") assoc_id = Column(INTEGER, ForeignKey("_proceduresampleassociation.id", ondelete='SET NULL', - name="fk_RES_ASSOC_id")) + name="fk_RES_ASSOC_id")) sampleprocedureassociation = relationship("ProcedureSampleAssociation", back_populates="results") _img = Column(String(128)) @@ -2900,5 +3010,3 @@ class Results(BaseClass): @image.setter def image(self, value): self._img = value - - diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index c37b62e..0e8e563 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -398,6 +398,10 @@ class Run(BaseClass, LogMixin): def name(self): return self.rsl_plate_number + @hybrid_property + def plate_number(self): + return self.rsl_plate_number + @classmethod def get_default_info(cls, *args, submissiontype: SubmissionType | None = None) -> dict: """ @@ -602,6 +606,7 @@ class Run(BaseClass, LogMixin): def details_dict(self, **kwargs): output = super().details_dict() + output['plate_number'] = self.plate_number submission_samples = [sample for sample in self.clientsubmission.sample] # logger.debug(f"Submission samples:{pformat(submission_samples)}") active_samples = [sample.details_dict() for sample in output['runsampleassociation'] @@ -1174,7 +1179,7 @@ class Run(BaseClass, LogMixin): from frontend.widgets.procedure_creation import ProcedureCreation procedure_type = next((proceduretype for proceduretype in self.allowed_procedures if proceduretype.name == proceduretype_name)) logger.debug(f"Got ProcedureType: {procedure_type}") - dlg = ProcedureCreation(parent=obj, run=self, proceduretype=procedure_type) + dlg = ProcedureCreation(parent=obj, procedure=procedure_type.construct_dummy_procedure(run=self)) if dlg.exec(): sql, _ = dlg.return_sql() logger.debug(f"Output run samples:\n{pformat(sql.run.sample)}") @@ -1358,12 +1363,12 @@ class Run(BaseClass, LogMixin): row, column = plate_dict[submission_rank] ranked_samples.append( dict(well_id=sample.sample_id, sample_id=sample.sample_id, row=row, column=column, submission_rank=submission_rank, - background_color="#6ffe1d")) + background_color="#6ffe1d", enabled=True)) padded_list = [] for iii in range(1, proceduretype.total_wells+1): row, column = proceduretype.ranked_plate[iii] sample = next((item for item in ranked_samples if item['submission_rank']==iii), - dict(well_id=f"blank_{iii}", sample_id="", row=row, column=column, submission_rank=iii, background_color="#ffffff") + dict(well_id=f"blank_{iii}", sample_id="", row=row, column=column, submission_rank=iii, background_color="#ffffff", enabled=False) ) padded_list.append(sample) # logger.debug(f"Final padded list:\n{pformat(list(sorted(padded_list, key=itemgetter('submission_rank'))))}") diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py index 2a301be..ddfd9be 100644 --- a/src/submissions/backend/validators/pydant.py +++ b/src/submissions/backend/validators/pydant.py @@ -9,6 +9,8 @@ from dateutil.parser import parse from dateutil.parser import ParserError from typing import List, Tuple, Literal from types import GeneratorType + +import backend from . import RSLNamer from pathlib import Path from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone @@ -31,7 +33,11 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): def prevalidate(cls, data): sql_fields = [k for k, v in cls._sql_object.__dict__.items() if isinstance(v, InstrumentedAttribute)] output = {} - for key, value in data.items(): + try: + items = data.items() + except AttributeError: + return data + for key, value in items: new_key = key.replace("_", "") if new_key in sql_fields: output[new_key] = value @@ -104,11 +110,14 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): def to_sql(self): dicto = self.improved_dict(dictionaries=False) logger.debug(f"Dicto: {dicto}") - sql, _ = self._sql_object().query_or_create(**dicto) + # sql, new = self._sql_object().query_or_create(**dicto) + sql, new = self._sql_object.query_or_create(**dicto) + if new: + logger.warning(f"Creating new {self._sql_object} with values:\n{pformat(dicto)}") return sql -class PydReagent(BaseModel): +class PydReagent(PydBaseClass): lot: str | None reagentrole: str | None expiry: date | datetime | Literal['NA'] | None = Field(default=None, validate_default=True) @@ -269,6 +278,7 @@ class PydSample(PydBaseClass): value = row_keys[value] return value + class PydTips(BaseModel): name: str lot: str | None = Field(default=None) @@ -277,6 +287,13 @@ class PydTips(BaseModel): @field_validator('tiprole', mode='before') @classmethod def get_role_name(cls, value): + if isinstance(value, list): + output = [] + for tiprole in value: + if isinstance(tiprole, TipRole): + tiprole = tiprole.name + return tiprole + value = output if isinstance(value, TipRole): value = value.name return value @@ -304,13 +321,21 @@ class PydEquipment(BaseModel, extra='ignore'): asset_number: str name: str nickname: str | None - process: List[str] | None - equipmentrole: str | None - tips: List[PydTips] | None = Field(default=None) + # process: List[dict] | None + process: PydProcess | None + equipmentrole: str | PydEquipmentRole | None + tips: List[PydTips] | None = Field(default=[]) @field_validator('equipmentrole', mode='before') @classmethod def get_role_name(cls, value): + match value: + case list(): + value = value[0] + case GeneratorType(): + value = next(value) + case _: + pass if isinstance(value, EquipmentRole): value = value.name return value @@ -325,12 +350,24 @@ class PydEquipment(BaseModel, extra='ignore'): value = convert_nans_to_nones(value) if not value: value = [''] + logger.debug(value) try: - value = [item.strip() for item in value] + # value = [item.strip() for item in value] + value = next((PydProcess(**process.details_dict()) for process in value)) except AttributeError: pass return value + @field_validator('tips', mode='before') + @classmethod + def tips_to_pydantic(cls, value): + output = [] + for tips in value: + if isinstance(tips, Tips): + tips = tips.to_pydantic() + output.append(tips) + return output + @report_result def to_sql(self, procedure: Procedure | str = None, kittype: KitType | str = None) -> Tuple[ Equipment, ProcedureEquipmentAssociation]: @@ -357,7 +394,7 @@ class PydEquipment(BaseModel, extra='ignore'): # NOTE: Need to make sure the same association is not added to the procedure try: assoc, new = ProcedureEquipmentAssociation.query_or_create(equipment=equipment, procedure=procedure, - equipmentrole=self.equipmentrole, limit=1) + equipmentrole=self.equipmentrole, limit=1) except TypeError as e: logger.error(f"Couldn't get association due to {e}, returning...") return None, None @@ -1282,8 +1319,14 @@ class PydProcess(BaseModel, extra="allow"): @classmethod def enforce_list(cls, value): if not isinstance(value, list): - return [value] - return value + value = [value] + output = [] + for v in value: + if issubclass(v.__class__, BaseClass): + output.append(v.name) + else: + output.append(v) + return output @report_result def to_sql(self): @@ -1356,7 +1399,28 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): plate_map: str | None = Field(default=None) reagent: list | None = Field(default=[]) reagentrole: dict | None = Field(default={}, validate_default=True) - samples: List[PydSample] = Field(default=[]) + sample: List[PydSample] = Field(default=[]) + equipment: List[PydEquipment] = Field(default=[]) + results: List[PydResults] | List[dict] = Field(default=[]) + + @field_validator("name", "technician", "kittype", mode="before") + @classmethod + def convert_to_dict(cls, value): + if isinstance(value, str): + value = dict(value=value, missing=False) + return value + + @field_validator("proceduretype", mode="before") + @classmethod + def lookup_proceduretype(cls, value): + match value: + case dict(): + value = ProcedureType.query(name=value['name']) + case str(): + value = ProcedureType.query(name=value) + case _: + pass + return value @field_validator("name") @classmethod @@ -1378,25 +1442,41 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): @classmethod def rescue_possible_kits(cls, value, values): if not value: - if values.data['proceduretype']: - value = [kittype.name for kittype in values.data['proceduretype'].kittype] + try: + if values.data['proceduretype']: + value = [kittype.__dict__['name'] for kittype in values.data['proceduretype'].kittype] + except KeyError: + pass return value @field_validator("name", "technician", "kittype") @classmethod def set_colour(cls, value): - if value["missing"]: - value["colour"] = "FE441D" - else: - value["colour"] = "6ffe1d" + try: + if value["missing"]: + value["colour"] = "FE441D" + else: + value["colour"] = "6ffe1d" + except KeyError: + pass return value @field_validator("reagentrole") @classmethod def rescue_reagentrole(cls, value, values): if not value: - if values.data['kittype']['value'] != cls.model_fields['kittype'].default['value']: - kittype = KitType.query(name=values.data['kittype']['value']) + match values.data['kittype']: + case dict(): + if "value" in values.data['kittype'].keys(): + roi = values.data['kittype']['value'] + elif "name" in values.data['kittype'].keys(): + roi = values.data['kittype']['name'] + else: + raise KeyError(f"Couldn't find kittype name in the dictionary: {values.data['kittype']}") + case str(): + roi = values.data['kittype'] + if roi != cls.model_fields['kittype'].default['value']: + kittype = KitType.query(name=roi) value = {item.name: item.reagent for item in kittype.reagentrole} return value @@ -1413,6 +1493,19 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): self.kittype['value'] = kittype self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype))) + def update_kittype_equipmentroles(self, kittype: str | KitType): + if kittype == self.__class__.model_fields['kittype'].default['value']: + return + if isinstance(kittype, str): + kittype_obj = KitType.query(name=kittype) + try: + self.equipment = {item.name: item.get_reagents(kittype=kittype_obj) for item in + kittype_obj.get_reagents(proceduretype=self.proceduretype)} + except AttributeError: + self.reagentrole = {} + self.kittype['value'] = kittype + self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype))) + def update_samples(self, sample_list: List[dict]): logger.debug(f"Incoming sample_list:\n{pformat(sample_list)}") for sample_dict in sample_list: @@ -1425,7 +1518,8 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): (item for item in self.samples if item.sample_id.upper() == sample_dict['sample_id'].upper())) except StopIteration: # NOTE: Code to check for added controls. - logger.debug(f"Sample not found by name: {sample_dict['sample_id']}, checking row {row} column {column}") + logger.debug( + f"Sample not found by name: {sample_dict['sample_id']}, checking row {row} column {column}") try: sample = next( (item for item in self.samples if item.row == row and item.column == column)) @@ -1441,7 +1535,12 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): def to_sql(self): from backend.db.models import RunSampleAssociation, ProcedureSampleAssociation + # results = [] + # for result in self.results: + # result, _ = result.to_sql() sql = super().to_sql() + logger.debug(f"Initial PYD: {pformat(self.__dict__)}") + # sql.results = [result.to_sql() for result in self.results] if self.run: sql.run = self.run if self.proceduretype: @@ -1450,27 +1549,37 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): start_index = max([item.id for item in ProcedureSampleAssociation.query()]) + 1 except ValueError: start_index = 1 - relevant_samples = [sample for sample in self.samples if not sample.sample_id.startswith("blank_") and not sample.sample_id == ""] + relevant_samples = [sample for sample in self.sample if + not sample.sample_id.startswith("blank_") and not sample.sample_id == ""] logger.debug(f"start index: {start_index}") - assoc_id_range = range(start_index, start_index + len(relevant_samples)+1) + assoc_id_range = range(start_index, start_index + len(relevant_samples) + 1) logger.debug(f"Association id range: {assoc_id_range}") for iii, sample in enumerate(relevant_samples): sample_sql = sample.to_sql() if sql.run: if sample_sql not in sql.run.sample: logger.debug(f"sample {sample_sql} not found in {sql.run.sample}") - run_assoc = RunSampleAssociation(sample=sample_sql, run=self.run, row=sample.row, column=sample.column) + run_assoc = RunSampleAssociation(sample=sample_sql, run=self.run, row=sample.row, + column=sample.column) else: logger.debug(f"sample {sample_sql} found in {sql.run.sample}") - proc_assoc = ProcedureSampleAssociation(new_id=assoc_id_range[iii], procedure=sql, sample=sample_sql, row=sample.row, column=sample.column) + if sample_sql not in sql.sample: + proc_assoc = ProcedureSampleAssociation(new_id=assoc_id_range[iii], procedure=sql, sample=sample_sql, + row=sample.row, column=sample.column) if self.kittype['value'] not in ["NA", None, ""]: kittype = KitType.query(name=self.kittype['value'], limit=1) if kittype: sql.kittype = kittype + for equipment in self.equipment: + equip = Equipment.query(name=equipment.name) + if equip not in sql.equipment: + equip_assoc = ProcedureEquipmentAssociation(equipment=equip, procedure=sql, equipmentrole=equip.equipmentrole[0]) + process = equipment.process.to_sql() + equip_assoc.process = process + logger.debug(f"Output sql: {[pformat(item.__dict__) for item in sql.procedureequipmentassociation]}") return sql, None - class PydClientSubmission(PydBaseClass): # sql_object: ClassVar = ClientSubmission @@ -1549,15 +1658,34 @@ class PydClientSubmission(PydBaseClass): class PydResults(PydBaseClass, arbitrary_types_allowed=True): - results: dict = Field(default={}) results_type: str = Field(default="NA") img: None | bytes = Field(default=None) - parent: Procedure|ProcedureSampleAssociation|None = Field(default=None) + parent: Procedure | ProcedureSampleAssociation | None = Field(default=None) + date_analyzed: datetime | None = Field(default=None) + + @field_validator("date_analyzed") + @classmethod + def set_today(cls, value): + match value: + case str(): + value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S") + case datetime(): + pass + case _: + value = datetime.now() + return value def to_sql(self): - sql = Results(results_type=self.results_type, result=self.results) - sql.image = self.img + sql, _ = Results.query_or_create(results_type=self.results_type, result=self.results) + try: + check = sql.image + except FileNotFoundError: + check = False + if not check: + sql.image = self.img + if not sql.date_analyzed: + sql.date_analyzed = self.date_analyzed match self.parent: case ProcedureSampleAssociation(): sql.sampleprocedureassociation = self.parent @@ -1566,4 +1694,3 @@ class PydResults(PydBaseClass, arbitrary_types_allowed=True): case _: logger.error("Improper association found.") return sql - diff --git a/src/submissions/frontend/widgets/app.py b/src/submissions/frontend/widgets/app.py index 56bee9a..3733824 100644 --- a/src/submissions/frontend/widgets/app.py +++ b/src/submissions/frontend/widgets/app.py @@ -13,7 +13,7 @@ from PyQt6.QtGui import QAction from pathlib import Path from markdown import markdown from pandas import ExcelWriter -from backend import Reagent, Sample, ClientSubmission, KitType, Run +from backend.db.models import Reagent, Sample, ClientSubmission, KitType, Run from tools import ( check_if_app, Settings, Report, jinja_template_loading, check_authorization, page_size, is_power_user, under_development diff --git a/src/submissions/frontend/widgets/equipment_usage.py b/src/submissions/frontend/widgets/equipment_usage.py index 94b7b4f..66e62ab 100644 --- a/src/submissions/frontend/widgets/equipment_usage.py +++ b/src/submissions/frontend/widgets/equipment_usage.py @@ -127,7 +127,7 @@ class RoleComboBox(QWidget): logger.debug(f"Equip2: {equip2}") with QSignalBlocker(self.process) as blocker: self.process.clear() - self.process.addItems([item for item in equip2.processes if item in self.role.process]) + self.process.addItems([item for item in equip2.process if item in self.role.process]) def update_tips(self): """ diff --git a/src/submissions/frontend/widgets/procedure_creation.py b/src/submissions/frontend/widgets/procedure_creation.py index 1336279..dcfa601 100644 --- a/src/submissions/frontend/widgets/procedure_creation.py +++ b/src/submissions/frontend/widgets/procedure_creation.py @@ -14,27 +14,28 @@ from PyQt6.QtWebChannel import QWebChannel from PyQt6.QtWebEngineWidgets import QWebEngineView from PyQt6.QtWidgets import QDialog, QGridLayout, QMenu, QDialogButtonBox from typing import TYPE_CHECKING, Any - if TYPE_CHECKING: - from backend.db.models import Run, ProcedureType + from backend.db.models import Run, Procedure + from backend.validators import PydProcedure from tools import jinja_template_loading, get_application_from_parent, render_details_template -from backend.validators import PydProcedure + logger = logging.getLogger(f"submissions.{__name__}") class ProcedureCreation(QDialog): - def __init__(self, parent, run: Run, proceduretype: ProcedureType): + def __init__(self, parent, procedure: PydProcedure): super().__init__(parent) - self.run = run - self.proceduretype = proceduretype - self.setWindowTitle(f"New {proceduretype.name} for { run.rsl_plate_number }") - self.created_procedure = self.proceduretype.construct_dummy_procedure(run=self.run) - self.created_procedure.update_kittype_reagentroles(kittype=self.created_procedure.possible_kits[0]) - self.created_procedure.samples = self.run.constuct_sample_dicts_for_proceduretype(proceduretype=self.proceduretype) + self.run = procedure.run + self.procedure = procedure + self.proceduretype = procedure.proceduretype + self.setWindowTitle(f"New {self.proceduretype.name} for { self.run.rsl_plate_number }") + # self.created_procedure = self.proceduretype.construct_dummy_procedure(run=self.run) + self.procedure.update_kittype_reagentroles(kittype=self.procedure.possible_kits[0]) + # self.created_procedure.samples = self.run.constuct_sample_dicts_for_proceduretype(proceduretype=self.proceduretype) # logger.debug(f"Samples to map\n{pformat(self.created_procedure.samples)}") - self.plate_map = self.proceduretype.construct_plate_map(sample_dicts=self.created_procedure.samples) + self.plate_map = self.proceduretype.construct_plate_map(sample_dicts=self.procedure.sample) # logger.debug(f"Plate map: {self.plate_map}") # logger.debug(f"Created dummy: {self.created_procedure}") self.app = get_application_from_parent(parent) @@ -61,46 +62,56 @@ class ProcedureCreation(QDialog): self.layout.addWidget(self.buttonBox, 11, 1, 1, 1) def set_html(self): + from .equipment_usage_2 import EquipmentUsage + proceduretype_dict = self.proceduretype.details_dict() + if self.procedure.equipment: + for equipmentrole in proceduretype_dict['equipment']: + # NOTE: Check if procedure equipment is present and move to head of the list if so. + try: + relevant_procedure_item = next((equipment for equipment in self.procedure.equipment if equipment.equipmentrole == equipmentrole['name'])) + except StopIteration: + continue + item_in_er_list = next((equipment for equipment in equipmentrole['equipment'] if equipment['name'] == relevant_procedure_item.name)) + equipmentrole['equipment'].insert(0, equipmentrole['equipment'].pop(equipmentrole['equipment'].index(item_in_er_list))) + proceduretype_dict['equipment_section'] = EquipmentUsage.construct_html(procedure=self.procedure, child=True) html = render_details_template( template_name="procedure_creation", # css_in=['new_context_menu'], js_in=["procedure_form", "grid_drag", "context_menu"], - proceduretype=self.proceduretype.details_dict(), + proceduretype=proceduretype_dict, run=self.run.details_dict(), - procedure=self.created_procedure.__dict__, + procedure=self.procedure.__dict__, plate_map=self.plate_map ) - # with open("procedure.html", "w") as f: - # f.write(html) self.webview.setHtml(html) @pyqtSlot(str, str) def text_changed(self, key: str, new_value: str): logger.debug(f"New value for {key}: {new_value}") - attribute = getattr(self.created_procedure, key) + attribute = getattr(self.procedure, key) attribute['value'] = new_value.strip('\"') @pyqtSlot(str, bool) def check_toggle(self, key: str, ischecked: bool): # logger.debug(f"{key} is checked: {ischecked}") - setattr(self.created_procedure, key, ischecked) + setattr(self.procedure, key, ischecked) @pyqtSlot(str) def update_kit(self, kittype): - self.created_procedure.update_kittype_reagentroles(kittype=kittype) - logger.debug({k: v for k, v in self.created_procedure.__dict__.items() if k != "plate_map"}) + self.procedure.update_kittype_reagentroles(kittype=kittype) + logger.debug({k: v for k, v in self.procedure.__dict__.items() if k != "plate_map"}) self.set_html() @pyqtSlot(list) def rearrange_plate(self, sample_list: list): - self.created_procedure.update_samples(sample_list=sample_list) + self.procedure.update_samples(sample_list=sample_list) @pyqtSlot(str) def log(self, logtext: str): logger.debug(logtext) def return_sql(self): - return self.created_procedure.to_sql() + return self.procedure.to_sql() # class ProcedureWebViewer(QWebEngineView): diff --git a/src/submissions/templates/css/styles.css b/src/submissions/templates/css/styles.css index 54ef4d7..9885374 100644 --- a/src/submissions/templates/css/styles.css +++ b/src/submissions/templates/css/styles.css @@ -179,3 +179,7 @@ ul.no-bullets { text-decoration-color: red; } +.grid-container { + display: grid; + grid-auto-flow: column; + } \ No newline at end of file diff --git a/src/submissions/templates/js/equipment.js b/src/submissions/templates/js/equipment.js new file mode 100644 index 0000000..64a543b --- /dev/null +++ b/src/submissions/templates/js/equipment.js @@ -0,0 +1,89 @@ +const equipment_json = {{ proceduretype['equipment_json'] }}; + +window.addEventListener('load', function () { + equipment_json.forEach(startup); +}) + +function startup(equipmentrole) { + updateEquipmentChoices(equipmentrole); + var eq_dropdown = document.getElementById(equipmentrole.name); + eq_dropdown.addEventListener("change", function(event){ + updateProcessChoices(equipmentrole); + updateBackend(equipmentrole); + }); + var process_dropdown = document.getElementById(equipmentrole.name + "_process"); + process_dropdown.addEventListener("change", function(event){ + updateTipChoices(equipmentrole); + updateBackend(equipmentrole); + }); + var tips_dropdown = document.getElementById(equipmentrole.name + "_tips"); + tips_dropdown.addEventListener("change", function(event){ + updateBackend(equipmentrole); + }); +} + +function updateEquipmentChoices(equipmentrole) { + console.log("Updating equipment choices."); + var dropdown_oi = document.getElementById(equipmentrole.name); + while (dropdown_oi.options.length > 0) { + dropdown_oi.remove(0); + } + dropdown_oi.json = equipmentrole; + for (let iii = 0; iii < equipmentrole.equipment.length; iii++) { + var opt = document.createElement('option'); + opt.value = equipmentrole.equipment[iii].name; + opt.innerHTML = equipmentrole.equipment[iii].name; + dropdown_oi.appendChild(opt); + } + updateProcessChoices(equipmentrole); +} + +function updateProcessChoices(equipmentrole) { + console.log("Updating process choices."); + var dropdown_oi = document.getElementById(equipmentrole.name + "_process"); + while (dropdown_oi.options.length > 0) { + dropdown_oi.remove(0); + } + dropdown_oi.json = equipmentrole; + var equipment_name = document.getElementById(equipmentrole.name).value; + var equipment = equipmentrole.equipment.filter(function(x){ return x.name == equipment_name })[0]; + for (let iii = 0; iii < equipment.processes.length; iii++) { + var opt = document.createElement('option'); + opt.value = equipment.processes[iii].name; + opt.innerHTML = equipment.processes[iii].name; + dropdown_oi.appendChild(opt); + } + updateTipChoices(equipmentrole); +} + +function updateTipChoices(equipmentrole) { + console.log("Updating tip choices."); + var dropdown_oi = document.getElementById(equipmentrole.name + "_tips"); + dropdown_oi.innerHTML = ""; + dropdown_oi.json = equipmentrole; + var equipment_name = document.getElementById(equipmentrole.name).value; + var process_name = document.getElementById(equipmentrole.name + "_process").value; + console.log(process_name); + var equipment = equipmentrole.equipment.filter(function(x){ return x.name == equipment_name })[0]; + console.log(equipment); + var process = equipment.processes.filter(function(x){ return x.name == process_name })[0]; + console.log(process); + for (let iii = 0; iii < process.tips.length; iii++) { + var opt = document.createElement('option'); + opt.value = process.tips[iii]; + opt.innerHTML = process.tips[iii]; + dropdown_oi.appendChild(opt); + } +} + +function updateBackend(equipmentrole) { + alert("Updating Backend"); + var equipmentrole_name = equipmentrole.name + var dropdown_oi = document.getElementById(equipmentrole.name); + var equipment_name = dropdown_oi.value; + dropdown_oi = document.getElementById(equipmentrole.name + "_process"); + var process_name = dropdown_oi.value; + dropdown_oi = document.getElementById(equipmentrole.name + "_tips"); + var tips_name = dropdown_oi.value; + backend.update_equipment(equipmentrole_name, equipment_name, process_name, tips_name) +} diff --git a/src/submissions/templates/procedure_creation.html b/src/submissions/templates/procedure_creation.html index a8c0aec..07c5911 100644 --- a/src/submissions/templates/procedure_creation.html +++ b/src/submissions/templates/procedure_creation.html @@ -28,24 +28,13 @@ {% for key, value in procedure['reagentrole'].items() %} {{ key }}: - + {% for reagent in value %} {{ reagent }} {% endfor %} - - {% endfor %} - {% endif %} - {% if proceduretype['equipment'] %} - - {% for equipmentrole in proceduretype['equipment'] %} - {{ equipmentrole['name'] }}: - - {% for equipment in equipmentrole['equipment'] %} - {{ equipment['name'] }} - {% endfor %} {% endfor %} - {% endif%} + {% endif %} @@ -55,6 +44,9 @@ {% endif %} + {% with proceduretype=proceduretype, child=True %} + {% include "support/equipment_usage.html" %} + {% endwith %} {% include 'support/context_menu.html' %} {% endblock %} diff --git a/src/submissions/templates/support/equipment_usage.html b/src/submissions/templates/support/equipment_usage.html new file mode 100644 index 0000000..f97205d --- /dev/null +++ b/src/submissions/templates/support/equipment_usage.html @@ -0,0 +1,141 @@ +{% if not child %} + + + {% block head %} + + {% if css %} + + {% endif %} + {% endblock %} + + + +{% endif %} + + +Equipment + +{% for equipmentrole in proceduretype['equipment_json'] %} + + + {{ equipmentrole['name'] }}: + + + + + Process: + + + + + Tips: + + + + + +{% endfor %} + +{% if not child %} +{% for j in js%} + +{% endfor %} +{% endif %} +