From fcaa9334a2cc6195bce78962603ba3038cc8ef28 Mon Sep 17 00:00:00 2001 From: lwark Date: Tue, 2 Sep 2025 12:37:14 -0500 Subject: [PATCH] MVP working (AFAIK) --- src/submissions/backend/db/models/__init__.py | 3 +- .../backend/db/models/procedures.py | 808 ++++++++++-------- src/submissions/backend/validators/pydant.py | 141 ++- .../frontend/widgets/equipment_usage_2.py | 2 + .../frontend/widgets/procedure_creation.py | 28 +- .../frontend/widgets/submission_details.py | 2 +- .../frontend/widgets/submission_table.py | 3 +- .../templates/support/equipment_list.html | 2 +- 8 files changed, 522 insertions(+), 467 deletions(-) diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py index 9da0905..e754d59 100644 --- a/src/submissions/backend/db/models/__init__.py +++ b/src/submissions/backend/db/models/__init__.py @@ -672,7 +672,8 @@ class BaseClass(Base): pyd = getattr(pydant, pyd_model_name) except AttributeError: raise AttributeError(f"Could not get pydantic class {pyd_model_name}") - logger.debug(f"Kwargs: {kwargs}") + # logger.debug(f"Kwargs: {kwargs}") + # logger.debug(f"Dict: {pformat(self.details_dict())}") return pyd(**self.details_dict(**kwargs)) def show_details(self, obj): diff --git a/src/submissions/backend/db/models/procedures.py b/src/submissions/backend/db/models/procedures.py index c2aa348..edb3611 100644 --- a/src/submissions/backend/db/models/procedures.py +++ b/src/submissions/backend/db/models/procedures.py @@ -2,6 +2,8 @@ All kittype and reagent related models """ from __future__ import annotations + +import sys import zipfile, logging, re from operator import itemgetter from pprint import pformat @@ -36,30 +38,30 @@ reagentrole_reagent = Table( extend_existing=True ) -equipmentrole_equipment = Table( - "_equipmentrole_equipment", - Base.metadata, - Column("equipment_id", INTEGER, ForeignKey("_equipment.id")), - Column("equipmentrole_id", INTEGER, ForeignKey("_equipmentrole.id")), - extend_existing=True -) - -# equipment_process = Table( -# "_equipment_process", +# equipmentrole_equipment = Table( +# "_equipmentrole_equipment", # Base.metadata, -# Column("process_id", INTEGER, ForeignKey("_process.id")), # Column("equipment_id", INTEGER, ForeignKey("_equipment.id")), +# Column("equipmentrole_id", INTEGER, ForeignKey("_equipmentrole.id")), # extend_existing=True # ) -equipmentrole_process = Table( - "_equipmentrole_process", +equipment_process = Table( + "_equipment_process", Base.metadata, Column("process_id", INTEGER, ForeignKey("_process.id")), - Column("equipmentrole_id", INTEGER, ForeignKey("_equipmentrole.id")), + Column("equipment_id", INTEGER, ForeignKey("_equipment.id")), extend_existing=True ) +# equipmentrole_process = Table( +# "_equipmentrole_process", +# Base.metadata, +# Column("process_id", INTEGER, ForeignKey("_process.id")), +# Column("equipmentrole_id", INTEGER, ForeignKey("_equipmentrole.id")), +# extend_existing=True +# ) + # kittype_process = Table( # "_kittype_process", # Base.metadata, @@ -77,10 +79,10 @@ equipmentrole_process = Table( # ) process_tips = Table( - "_process_tiprole", + "_process_tips", Base.metadata, Column("process_id", INTEGER, ForeignKey("_process.id")), - Column("tiprole_id", INTEGER, ForeignKey("_tiprole.id")), + Column("tips_id", INTEGER, ForeignKey("_tips.id")), extend_existing=True ) @@ -597,7 +599,7 @@ class Reagent(BaseClass, LogMixin): eol_ext = Column(Interval()) #: extension of life interval name = Column(String(64)) #: reagent name cost_per_ml = Column(FLOAT) - lots = relationship("ReagentLot") + reagentlot = relationship("ReagentLot", back_populates="reagent") def __repr__(self): if self.name: @@ -818,16 +820,17 @@ class Reagent(BaseClass, LogMixin): @property def lot_dicts(self): - return [dict(name=self.name, lot=lot.lot, expiry=lot.expiry + self.eol_ext) for lot in self.lots] + return [dict(name=self.name, lot=lot.lot, expiry=lot.expiry + self.eol_ext) for lot in self.reagentlot] class ReagentLot(BaseClass): id = Column(INTEGER, primary_key=True) #: primary key lot = Column(String(64), unique=True) #: lot number of reagent expiry = Column(TIMESTAMP) #: expiry date - extended by eol_ext of parent programmatically - reagent = relationship("Reagent") #: joined parent reagent type + active = Column(INTEGER, default=1) reagent_id = Column(INTEGER, ForeignKey("_reagent.id", ondelete='SET NULL', name="fk_REGLOT_reagent_id")) #: id of parent reagent type + reagent = relationship("Reagent", back_populates="reagentlot") #: joined parent reagent type reagentlotprocedureassociation = relationship( "ProcedureReagentLotAssociation", @@ -908,7 +911,7 @@ class Discount(BaseClass): @setup_lookup def query(cls, clientlab: ClientLab | str | int | None = None, - kittype: KitType | str | int | None = None, + proceduretype: ProcedureType | str | int | None = None, ) -> Discount | List[Discount]: """ Lookup discount objects (union of kittype and clientlab) @@ -930,13 +933,13 @@ class Discount(BaseClass): query = query.join(ClientLab).filter(ClientLab.id == clientlab) case _: pass - match kittype: - case KitType(): - query = query.filter(cls.kittype == kittype) + match proceduretype: + case ProcedureType(): + query = query.filter(cls.proceduretype == proceduretype) case str(): - query = query.join(KitType).filter(KitType.name == kittype) + query = query.join(ProcedureType).filter(ProcedureType.name == proceduretype) case int(): - query = query.join(KitType).filter(KitType.id == kittype) + query = query.join(ProcedureType).filter(ProcedureType.id == proceduretype) case _: pass return cls.execute_query(query=query) @@ -1207,8 +1210,8 @@ class ProcedureType(BaseClass): procedure = relationship("Procedure", back_populates="proceduretype") #: Concrete control of this type. - process = relationship("Process", back_populates="proceduretype", - secondary=proceduretype_process) #: Relation to equipment process used for this type. + # process = relationship("Process", back_populates="proceduretype", + # secondary=proceduretype_process) #: Relation to equipment process used for this type. submissiontype = relationship("SubmissionType", back_populates="proceduretype", secondary=submissiontype_proceduretype) #: run this kittype was used for @@ -1243,11 +1246,11 @@ class ProcedureType(BaseClass): creator=lambda reagentrole: ProcedureTypeReagentRoleAssociation( reagentrole=reagentrole)) #: Proxy of equipmentrole associations - proceduretypetiproleassociation = relationship( - "ProcedureTypeTipRoleAssociation", - back_populates="proceduretype", - cascade="all, delete-orphan" - ) #: Association of tiproles + # proceduretypetiproleassociation = relationship( + # "ProcedureTypeTipRoleAssociation", + # back_populates="proceduretype", + # cascade="all, delete-orphan" + # ) #: Association of tiproles def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -1317,16 +1320,16 @@ class ProcedureType(BaseClass): # else: # return None - def get_equipment(self, kittype: str | KitType | None = None) -> Generator['PydEquipmentRole', None, None]: + def get_equipment(self) -> Generator['PydEquipmentRole', None, None]: """ Returns PydEquipmentRole of all equipment associated with this SubmissionType Returns: Generator['PydEquipmentRole', None, None]: List of equipment roles """ - return (item.to_pydantic(proceduretype=self, kittype=kittype) for item in self.equipment) + return (item.to_pydantic(proceduretype=self) for item in self.equipment) - def get_processes_for_role(self, equipmentrole: str | EquipmentRole, kittype: str | KitType | None = None) -> list: + def get_processes_for_role(self, equipmentrole: str | EquipmentRole) -> list: """ Get process associated with this SubmissionType for an EquipmentRole @@ -1342,10 +1345,10 @@ class ProcedureType(BaseClass): """ match equipmentrole: case str(): - relevant = [item.get_all_processes(kittype) for item in self.proceduretypeequipmentroleassociation if + relevant = [item.get_all_processes() for item in self.proceduretypeequipmentroleassociation if item.equipmentrole.name == equipmentrole] case EquipmentRole(): - relevant = [item.get_all_processes(kittype) for item in self.proceduretypeequipmentroleassociation if + relevant = [item.get_all_processes() for item in self.proceduretypeequipmentroleassociation if item.equipmentrole == equipmentrole] case _: raise TypeError(f"Type {type(equipmentrole)} is not allowed") @@ -1627,7 +1630,7 @@ class Procedure(BaseClass): # output['sample'] = [sample.details_dict() for sample in output['sample']] output['reagent'] = [reagent.details_dict() for reagent in output['procedurereagentlotassociation']] output['equipment'] = [equipment.details_dict() for equipment in output['procedureequipmentassociation']] - output['tips'] = [tips.details_dict() for tips in output['proceduretipsassociation']] + # output['tips'] = [tips.details_dict() for tips in output['proceduretipsassociation']] output['repeat'] = bool(output['repeat']) output['run'] = self.run.name output['excluded'] += self.get_default_info("details_ignore") @@ -2139,11 +2142,11 @@ class ProcedureReagentLotAssociation(BaseClass): RunReagentAssociation|List[RunReagentAssociation]: SubmissionReagentAssociation(s) of interest """ query: Query = cls.__database_session__.query(cls) - match reagent: + match reagentlot: case Reagent() | str(): - if isinstance(reagent, str): - reagent = Reagent.query(lot=reagent) - query = query.filter(cls.reagent == reagent) + if isinstance(reagentlot, str): + reagent = ReagentLot.query(lot=reagentlot) + query = query.filter(cls.reagentlot == reagentlot) case _: pass match procedure: @@ -2200,246 +2203,6 @@ class ProcedureReagentLotAssociation(BaseClass): raise e -class Equipment(BaseClass, LogMixin): - """ - A concrete instance of equipment - """ - - id = Column(INTEGER, primary_key=True) #: id, primary key - name = Column(String(64)) #: equipment name - nickname = Column(String(64)) #: equipment nickname - asset_number = Column(String(16)) #: Given asset number (corpo nickname if you will) - equipmentrole = relationship("EquipmentRole", back_populates="equipment", - secondary=equipmentrole_equipment) #: relation to EquipmentRoles - process = relationship("Process", back_populates="equipment", - secondary=equipment_process) #: relation to Processes - tips = relationship("Tips", back_populates="equipment", - secondary=equipment_tips) #: relation to Processes - equipmentprocedureassociation = relationship( - "ProcedureEquipmentAssociation", - back_populates="equipment", - cascade="all, delete-orphan", - ) #: Association with BasicRun - - procedure = association_proxy("equipmentprocedureassociation", - "procedure") #: proxy to equipmentprocedureassociation.procedure - - def __init__(self, name: str, nickname: str | None = None, asset_number: str = ""): - self.name = name - if nickname: - self.nickname = nickname - else: - self.nickname = self.name - self.asset_number = asset_number - - def to_dict(self, processes: bool = False) -> dict: - """ - This Equipment as a dictionary - - Args: - processes (bool, optional): Whether to include process. Defaults to False. - - Returns: - dict: Dictionary representation of this equipment - """ - if not processes: - return {k: v for k, v in self.__dict__.items() if k != 'process'} - else: - return {k: v for k, v in self.__dict__.items()} - - def get_processes(self, proceduretype: str | ProcedureType | None = None, - # kittype: str | KitType | None = None, - equipmentrole: str | EquipmentRole | None = None) -> Generator[Process, None, None]: - """ - Get all process associated with this Equipment for a given SubmissionType - - Args: - proceduretype (ProcedureType): SubmissionType of interest - kittype (str | KitType | None, optional): KitType to filter by. Defaults to None. - - Returns: - List[Process]: List of process names - """ - if isinstance(proceduretype, str): - proceduretype = ProcedureType.query(name=proceduretype) - for process in self.process: - if proceduretype not in process.proceduretype: - continue - if equipmentrole and equipmentrole not in process.equipmentrole: - continue - logger.debug(f"Getting process: {process}") - yield process - - @classmethod - @setup_lookup - def query(cls, - id: int | None = None, - name: str | None = None, - nickname: str | None = None, - asset_number: str | None = None, - limit: int = 0 - ) -> Equipment | List[Equipment]: - """ - Lookup a list of or single Equipment. - - Args: - name (str | None, optional): Equipment name. Defaults to None. - nickname (str | None, optional): Equipment nickname. Defaults to None. - asset_number (str | None, optional): Equipment asset number. Defaults to None. - limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. - - Returns: - Equipment|List[Equipment]: Equipment or list of equipment matching query parameters. - """ - query = cls.__database_session__.query(cls) - match id: - case int(): - query = query.filter(cls.id == id) - limit = 1 - case _: - pass - match name: - case str(): - query = query.filter(cls.name == name) - limit = 1 - case _: - pass - match nickname: - case str(): - query = query.filter(cls.nickname == nickname) - limit = 1 - case _: - pass - match asset_number: - case str(): - query = query.filter(cls.asset_number == asset_number) - limit = 1 - case _: - pass - return cls.execute_query(query=query, limit=limit) - - def to_pydantic(self, proceduretype: ProcedureType, - equipmentrole: str = None) -> "PydEquipment": - """ - Creates PydEquipment of this Equipment - - Args: - proceduretype (ProcedureType): Relevant SubmissionType - kittype (str | KitType | None, optional): Relevant KitType. Defaults to None. - - Returns: - PydEquipment: pydantic equipment object - """ - from backend.validators.pydant import PydEquipment - creation_dict = self.details_dict() - processes = self.get_processes(proceduretype=proceduretype, equipmentrole=equipmentrole) - logger.debug(f"Processes: {processes}") - creation_dict['processes'] = processes - logger.debug(f"EquipmentRole: {equipmentrole}") - creation_dict['equipmentrole'] = equipmentrole or creation_dict['equipmentrole'] - # return PydEquipment(process=processes, equipmentrole=equipmentrole, - # **self.to_dict(processes=False)) - return PydEquipment(**creation_dict) - - @classproperty - def manufacturer_regex(cls) -> re.Pattern: - """ - Creates regex to determine tip manufacturer - - Returns: - re.Pattern: regex - """ - return re.compile(r""" - (?P50\d{5}$)| - (?PHC-\d{6}$)| - (?P[^\d][A-Z0-9]{6}$)| - (?P[A-Z]{3}-\d{2}-[A-Z]-[A-Z]$)| - (?P\d{4}-\d{3}-\d{3}-\d$)""", - re.VERBOSE) - - @classmethod - def assign_equipment(cls, equipmentrole: EquipmentRole | str) -> List[Equipment]: - """ - Creates a list of equipment from user input to be used in Submission Type creation - - Args: - equipmentrole (EquipmentRole): Equipment reagentrole to be added to. - - Returns: - List[Equipment]: User selected equipment. - """ - if isinstance(equipmentrole, str): - equipmentrole = EquipmentRole.query(name=equipmentrole) - equipment = cls.query() - options = "\n".join([f"{ii}. {item.name}" for ii, item in enumerate(equipment)]) - choices = input(f"Enter equipment numbers to add to {equipmentrole.name} (space separated):\n{options}\n\n") - output = [] - for choice in choices.split(" "): - try: - choice = int(choice) - except (AttributeError, ValueError): - continue - output.append(equipment[choice]) - return output - - # def details_dict(self, **kwargs): - # output = super().details_dict(**kwargs) - # for key in ["proceduretype", "equipmentroleproceduretypeassociation", "equipmentrole"]: - # try: - # del output[key] - # except KeyError: - # pass - # return output - - # def to_sub_dict(self, full_data: bool = False, **kwargs) -> dict: - # """ - # dictionary containing values necessary for gui - # - # Args: - # full_data (bool, optional): Whether to include procedure in data for details. Defaults to False. - # - # Returns: - # dict: representation of the equipment's attributes - # """ - # if self.nickname: - # nickname = self.nickname - # else: - # nickname = self.name - # output = dict( - # name=self.name, - # nickname=nickname, - # asset_number=self.asset_number - # ) - # if full_data: - # subs = [dict(plate=item.procedure.procedure.rsl_plate_number, process=item.process.name, - # sub_date=item.procedure.procedure.start_date) - # if item.process else dict(plate=item.procedure.procedure.rsl_plate_number, process="NA") - # for item in self.equipmentprocedureassociation] - # output['procedure'] = sorted(subs, key=itemgetter("sub_date"), reverse=True) - # output['excluded'] = ['missing', 'procedure', 'excluded', 'editable'] - # return output - - # @classproperty - # def details_template(cls) -> Template: - # """ - # Get the details jinja template for the correct class - # - # Args: - # base_dict (dict): incoming dictionary of Submission fields - # - # Returns: - # Tuple(dict, Template): (Updated dictionary, Template to be rendered) - # """ - # env = jinja_template_loading() - # temp_name = f"{cls.__name__.lower()}_details.html" - # try: - # template = env.get_template(temp_name) - # except TemplateNotFound as e: - # logger.error(f"Couldn't find template {e}") - # template = env.get_template("equipment_details.html") - # return template - - class EquipmentRole(BaseClass): """ Abstract roles for equipment @@ -2447,10 +2210,9 @@ class EquipmentRole(BaseClass): id = Column(INTEGER, primary_key=True) #: Role id, primary key name = Column(String(32)) #: Common name - equipment = relationship("Equipment", back_populates="equipmentrole", - secondary=equipmentrole_equipment) #: Concrete control (Equipment) of reagentrole - process = relationship("Process", back_populates='equipmentrole', - secondary=equipmentrole_process) #: Associated Processes + # equipment = relationship("Equipment", back_populates='equipmentrole', secondary=equipmentrole_equipment) + # process = relationship("Process", back_populates='equipmentrole', + # secondary=equipmentrole_process) #: Associated Processes equipmentroleproceduretypeassociation = relationship( "ProcedureTypeEquipmentRoleAssociation", @@ -2461,6 +2223,16 @@ class EquipmentRole(BaseClass): proceduretype = association_proxy("equipmentroleproceduretypeassociation", "proceduretype") #: proxy to equipmentroleproceduretypeassociation.proceduretype + equipmentroleequipmentassociation = relationship( + "EquipmentRoleEquipmentAssociation", + back_populates="equipmentrole", + cascade="all, delete-orphan", + ) + + equipment = association_proxy("equipmentroleequipmentassociation", + "equipmentrole", creator=lambda equipment: EquipmentRoleEquipmentAssociation( + equipment=equipment)) + def to_dict(self) -> dict: """ This EquipmentRole as a dictionary @@ -2470,8 +2242,7 @@ class EquipmentRole(BaseClass): """ return {key: value for key, value in self.__dict__.items() if key != "process" and key != "equipment"} - def to_pydantic(self, proceduretype: ProcedureType, - kittype: str | KitType | None = None) -> "PydEquipmentRole": + def to_pydantic(self, proceduretype: ProcedureType) -> "PydEquipmentRole": """ Creates a PydEquipmentRole of this EquipmentRole @@ -2483,10 +2254,10 @@ class EquipmentRole(BaseClass): PydEquipmentRole: This EquipmentRole as PydEquipmentRole """ from backend.validators.pydant import PydEquipmentRole - equipment = [item.to_pydantic(proceduretype=proceduretype, kittype=kittype, equipmentrole=self) for item in + equipment = [item.to_pydantic(proceduretype=proceduretype, equipmentrole=self) for item in self.equipment] pyd_dict = self.to_dict() - pyd_dict['process'] = self.get_processes(proceduretype=proceduretype, kittype=kittype) + pyd_dict['process'] = self.get_processes(proceduretype=proceduretype) return PydEquipmentRole(equipment=equipment, **pyd_dict) @classmethod @@ -2576,38 +2347,321 @@ class EquipmentRole(BaseClass): case _: proceduretype = None output = super().details_dict(**kwargs) + # sys.exit(f"Equipment: {pformat(output)}") # Note con - output['equipment'] = [item.details_dict() for item in output['equipment']] + # output['equipment'] = [item.details_dict() for item in output['equipment']] # output['equipment_json'] = [] + output['equipment'] = [item.details_dict()['equipment'] for item in self.equipmentroleequipmentassociation] equip = [] for eq in output['equipment']: + # logger.debug(eq) dicto = dict(name=eq['name'], asset_number=eq['asset_number']) dicto['process'] = [ - {'name': version.name, 'tiprole': process.tiprole} + {'name': process['name'], 'tips': process['tips']} for process in eq['process'] - if proceduretype in process.proceduretype - for version in process.processversion + # if proceduretype in process.proceduretype + # for version in process.processversion ] + for process in dicto['process']: + # logger.debug(process['tips']) try: - process['tips'] = flatten_list([[tips.name for tips in tr.tips] for tr in process['tiprole']]) + process['tips'] = [tr['name'] for tr in process['tips']] except KeyError: logger.debug(f"process: {pformat(process)}") raise KeyError() - del process['tiprole'] + # del process['tiprole'] equip.append(dicto) output['equipment'] = equip + # sys.exit(pformat(output['equipment'])) # output['equipment_json'].append(dict(name=self.name, equipment=equip)) # output['process'] = [item.details_dict() for item in output['process']] - output['process'] = [version.details_dict() for version in - flatten_list([process.processversion for process in self.process])] + # output['process'] = [version.details_dict() for version in + # flatten_list([process.processversion for process in self.process])] # logger.debug(f"\n\nProcess: {pformat(output['process'])}") - try: - output['tips'] = [item.details_dict() for item in output['tips']] - except KeyError: - # logger.error(pformat(output)) - pass + # try: + # output['tips'] = [item.details_dict() for item in output['tips']] + # except KeyError: + # # logger.error(pformat(output)) + # pass + return output + + +class Equipment(BaseClass, LogMixin): + """ + A concrete instance of equipment + """ + + id = Column(INTEGER, primary_key=True) #: id, primary key + name = Column(String(64)) #: equipment name + nickname = Column(String(64)) #: equipment nickname + asset_number = Column(String(16)) #: Given asset number (corpo nickname if you will) + # equipmentrole = relationship(EquipmentRole, back_populates="equipment", secondary=equipmentrole_equipment) + + # tips = relationship("Tips", back_populates="equipment", + # secondary=equipment_tips) #: relation to Processes + + equipmentprocedureassociation = relationship( + "ProcedureEquipmentAssociation", + back_populates="equipment", + cascade="all, delete-orphan", + ) #: Association with BasicRun + + procedure = association_proxy("equipmentprocedureassociation", + "procedure") #: proxy to equipmentprocedureassociation.procedure + + equipmentequipmentroleassociation = relationship( + "EquipmentRoleEquipmentAssociation", + back_populates="equipment", + cascade="all, delete-orphan", + ) + + equipmentrole = association_proxy("equipmentequipmentroleassociation", + "equipmentrole", creator=lambda equipmentrole: EquipmentRoleEquipmentAssociation(equipmentrole=equipmentrole) + ) + + def __init__(self, name: str, nickname: str | None = None, asset_number: str = ""): + self.name = name + if nickname: + self.nickname = nickname + else: + self.nickname = self.name + self.asset_number = asset_number + + def to_dict(self, processes: bool = False) -> dict: + """ + This Equipment as a dictionary + + Args: + processes (bool, optional): Whether to include process. Defaults to False. + + Returns: + dict: Dictionary representation of this equipment + """ + if not processes: + return {k: v for k, v in self.__dict__.items() if k != 'process'} + else: + return {k: v for k, v in self.__dict__.items()} + + # def get_processes(self, #proceduretype: str | ProcedureType | None = None, + # # kittype: str | KitType | None = None, + # equipmentrole: str | EquipmentRole | None = None) -> Generator[Process, None, None]: + # """ + # Get all process associated with this Equipment for a given SubmissionType + # + # Args: + # proceduretype (ProcedureType): SubmissionType of interest + # kittype (str | KitType | None, optional): KitType to filter by. Defaults to None. + # + # Returns: + # List[Process]: List of process names + # """ + # # if isinstance(proceduretype, str): + # # proceduretype = ProcedureType.query(name=proceduretype) + # for er in self.equipmentrole: + # for process in + # logger.debug(f"Getting process: {process}") + # yield process + + @classmethod + @setup_lookup + def query(cls, + id: int | None = None, + name: str | None = None, + nickname: str | None = None, + asset_number: str | None = None, + limit: int = 0 + ) -> Equipment | List[Equipment]: + """ + Lookup a list of or single Equipment. + + Args: + name (str | None, optional): Equipment name. Defaults to None. + nickname (str | None, optional): Equipment nickname. Defaults to None. + asset_number (str | None, optional): Equipment asset number. Defaults to None. + limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. + + Returns: + Equipment|List[Equipment]: Equipment or list of equipment matching query parameters. + """ + query = cls.__database_session__.query(cls) + match id: + case int(): + query = query.filter(cls.id == id) + limit = 1 + case _: + pass + match name: + case str(): + query = query.filter(cls.name == name) + limit = 1 + case _: + pass + match nickname: + case str(): + query = query.filter(cls.nickname == nickname) + limit = 1 + case _: + pass + match asset_number: + case str(): + query = query.filter(cls.asset_number == asset_number) + limit = 1 + case _: + pass + return cls.execute_query(query=query, limit=limit) + + def to_pydantic(self, equipmentrole: str = None) -> "PydEquipment": + """ + Creates PydEquipment of this Equipment + + Args: + proceduretype (ProcedureType): Relevant SubmissionType + kittype (str | KitType | None, optional): Relevant KitType. Defaults to None. + + Returns: + PydEquipment: pydantic equipment object + """ + from backend.validators.pydant import PydEquipment + creation_dict = self.details_dict() + processes = self.get_processes(equipmentrole=equipmentrole) + logger.debug(f"Processes: {processes}") + creation_dict['process'] = processes + logger.debug(f"EquipmentRole: {equipmentrole}") + creation_dict['equipmentrole'] = equipmentrole or creation_dict['equipmentrole'] + # return PydEquipment(process=processes, equipmentrole=equipmentrole, + # **self.to_dict(processes=False)) + logger.debug(f"Creating pydequipment with {pformat(creation_dict)}") + return PydEquipment(**creation_dict) + + @classproperty + def manufacturer_regex(cls) -> re.Pattern: + """ + Creates regex to determine tip manufacturer + + Returns: + re.Pattern: regex + """ + return re.compile(r""" + (?P50\d{5}$)| + (?PHC-\d{6}$)| + (?P[^\d][A-Z0-9]{6}$)| + (?P[A-Z]{3}-\d{2}-[A-Z]-[A-Z]$)| + (?P\d{4}-\d{3}-\d{3}-\d$)""", + re.VERBOSE) + + @classmethod + def assign_equipment(cls, equipmentrole: EquipmentRole | str) -> List[Equipment]: + """ + Creates a list of equipment from user input to be used in Submission Type creation + + Args: + equipmentrole (EquipmentRole): Equipment reagentrole to be added to. + + Returns: + List[Equipment]: User selected equipment. + """ + if isinstance(equipmentrole, str): + equipmentrole = EquipmentRole.query(name=equipmentrole) + equipment = cls.query() + options = "\n".join([f"{ii}. {item.name}" for ii, item in enumerate(equipment)]) + choices = input(f"Enter equipment numbers to add to {equipmentrole.name} (space separated):\n{options}\n\n") + output = [] + for choice in choices.split(" "): + try: + choice = int(choice) + except (AttributeError, ValueError): + continue + output.append(equipment[choice]) + return output + + def get_processes(self, equipmentrole: str): + output = [] + for assoc in self.equipmentequipmentroleassociation: + if assoc.equipmentrole.name != equipmentrole: + continue + # logger.debug(pformat(assoc.process.details_dict())) + output.append(assoc.process.to_pydantic()) + return output + + # def details_dict(self, **kwargs): + # output = super().details_dict(**kwargs) + # for key in ["proceduretype", "equipmentroleproceduretypeassociation", "equipmentrole"]: + # try: + # del output[key] + # except KeyError: + # pass + # return output + + # def to_sub_dict(self, full_data: bool = False, **kwargs) -> dict: + # """ + # dictionary containing values necessary for gui + # + # Args: + # full_data (bool, optional): Whether to include procedure in data for details. Defaults to False. + # + # Returns: + # dict: representation of the equipment's attributes + # """ + # if self.nickname: + # nickname = self.nickname + # else: + # nickname = self.name + # output = dict( + # name=self.name, + # nickname=nickname, + # asset_number=self.asset_number + # ) + # if full_data: + # subs = [dict(plate=item.procedure.procedure.rsl_plate_number, process=item.process.name, + # sub_date=item.procedure.procedure.start_date) + # if item.process else dict(plate=item.procedure.procedure.rsl_plate_number, process="NA") + # for item in self.equipmentprocedureassociation] + # output['procedure'] = sorted(subs, key=itemgetter("sub_date"), reverse=True) + # output['excluded'] = ['missing', 'procedure', 'excluded', 'editable'] + # return output + + # @classproperty + # def details_template(cls) -> Template: + # """ + # Get the details jinja template for the correct class + # + # Args: + # base_dict (dict): incoming dictionary of Submission fields + # + # Returns: + # Tuple(dict, Template): (Updated dictionary, Template to be rendered) + # """ + # env = jinja_template_loading() + # temp_name = f"{cls.__name__.lower()}_details.html" + # try: + # template = env.get_template(temp_name) + # except TemplateNotFound as e: + # logger.error(f"Couldn't find template {e}") + # template = env.get_template("equipment_details.html") + # return template + + +class EquipmentRoleEquipmentAssociation(BaseClass): + + equipmentrole_id = Column(INTEGER, ForeignKey("_equipmentrole.id"), primary_key=True) #: id of associated reagent + equipment_id = Column(INTEGER, ForeignKey("_equipment.id"), primary_key=True) #: id of associated procedure + process_id = Column(INTEGER, ForeignKey("_process.id")) + + equipmentrole = relationship("EquipmentRole", + back_populates="equipmentroleequipmentassociation") #: associated procedure + + equipment = relationship("Equipment", + back_populates="equipmentequipmentroleassociation") #: associated procedure + + process = relationship("Process", + back_populates="equipmentroleeequipmentassociation") #: associated procedure + + def details_dict(self, **kwargs) -> dict: + output = super().details_dict(**kwargs) + output['equipment'] = self.equipment.details_dict() + output['equipment']['process'] = [item.details_dict() for item in self.process.processversion if bool(item.active)] return output @@ -2620,20 +2674,22 @@ class Process(BaseClass): id = Column(INTEGER, primary_key=True) #: Process id, primary key name = Column(String(64), unique=True) #: Process name - proceduretype = relationship("ProcedureType", back_populates='process', - secondary=proceduretype_process) #: relation to SubmissionType - equipment = relationship("Equipment", back_populates='process', - secondary=equipment_process) #: relation to Equipment - equipmentrole = relationship("EquipmentRole", back_populates='process', - secondary=equipmentrole_process) #: relation to EquipmentRoles + # proceduretype = relationship("ProcedureType", back_populates='process', + # secondary=proceduretype_process) #: relation to SubmissionType + # equipment = relationship("Equipment", back_populates='process', + # secondary=equipment_process) #: relation to Equipment + # equipment = relationship("EquipmentRole", back_populates='process', + # secondary=equipmentrole_process) #: relation to EquipmentRoles # kittype = relationship("KitType", back_populates='process', # secondary=kittype_process) #: relation to KitType - tiprole = relationship("TipRole", back_populates='process', - secondary=process_tiprole) #: relation to KitType + tips = relationship("Tips", back_populates='process', + secondary=process_tips) #: relation to KitType processversion = relationship("ProcessVersion", back_populates="process") + equipmentroleeequipmentassociation = relationship("EquipmentRoleEquipmentAssociation", back_populates="process") + def set_attribute(self, key, value): match key: case "name": @@ -2736,19 +2792,14 @@ class Process(BaseClass): output = super().details_dict(**kwargs) output['processversion'] = [item.details_dict() for item in self.processversion] # logger.debug(f"Process output dict: {pformat(output)}") + tips = flatten_list([tipslot for tipslot in [tips.tipslot for tips in self.tips]]) + output['tips'] = [tipslot.details_dict() for tipslot in tips] return output - # def to_pydantic(self): - # from backend.validators.pydant import PydProcess - # output = {} - # for k, v in self.details_dict().items(): - # if isinstance(v, list): - # output[k] = [item.name if issubclass(item.__class__, BaseClass) else item for item in v] - # elif issubclass(v.__class__, BaseClass): - # output[k] = v.name - # else: - # output[k] = v - # return PydProcess(**output) + def to_pydantic(self): + output = super().to_pydantic() + # output.tips = [[tipslot.to_pydantic() for tipslot in tips] for tips in self.tips] + return output class ProcessVersion(BaseClass): @@ -2756,11 +2807,12 @@ class ProcessVersion(BaseClass): version = Column(FLOAT(2), default=1.00) date_verified = Column(TIMESTAMP) project = Column(String(128)) + active = Column(INTEGER, default=1) process = relationship("Process", back_populates="processversion") process_id = Column(INTEGER, ForeignKey("_process.id", ondelete="SET NULL", name="fk_version_process_id")) - procedure = relationship("ProcedureEquipmentAssociation", - backref='process') #: relation to RunEquipmentAssociation + procedureequipmentassociation = relationship("ProcedureEquipmentAssociation", + back_populates ='processversion') #: relation to RunEquipmentAssociation @property def name(self) -> str: @@ -2771,6 +2823,8 @@ class ProcessVersion(BaseClass): output['name'] = self.name if not output['project']: output['project'] = "" + output['tips'] = flatten_list([[lot.details_dict() for lot in tips.tipslot if bool(lot.active)] for tips in self.process.tips]) + # logger.debug(f"Tips for {self.name} -\n\n {pformat(output['tips'])}") return output def set_attribute(self, key, value): @@ -2801,13 +2855,17 @@ class Tips(BaseClass): An abstract reagentrole that a tip fills during a process """ id = Column(INTEGER, primary_key=True) #: primary key - name = Column(String(64)) #: name of reagent type - tipslot = relationship("TipsLot", back_populates="tips", - secondary=tiprole_tips) #: concrete control of this reagent type + # name = Column(String(64)) #: name of reagent type + tipslot = relationship("TipsLot", back_populates="tips") #: concrete control of this reagent type manufacturer = Column(String(64)) capacity = Column(INTEGER) ref = Column(String(64)) #: tip reference number - process = relationship("Process", back_populates="tips", secondary=process_tiprole) + process = relationship("Process", back_populates="tips", secondary=process_tips) + + + @property + def name(self): + return f"{self.manufacturer}-{self.ref}" # tiproleproceduretypeassociation = relationship( # "ProcedureTypeTipRoleAssociation", @@ -2838,7 +2896,7 @@ class Tips(BaseClass): def query(cls, name: str | None = None, limit: int = 0, - **kwargs) -> TipRole | List[TipRole]: + **kwargs) -> Tips | List[Tips]: query = cls.__database_session__.query(cls) match name: case str(): @@ -2864,18 +2922,21 @@ class Tips(BaseClass): tips=tips ) + # def details_dict(self, **kwargs) -> dict: + # output = super().details_dict(**kwargs) + # return output class TipsLot(BaseClass, LogMixin): """ A concrete instance of tips. """ id = Column(INTEGER, primary_key=True) #: primary key - tips = relationship("Tips", back_populates="tipslot", - secondary=tiprole_tips) #: joined parent reagent type + tips = relationship("Tips", back_populates="tipslot") #: joined parent reagent type tips_id = Column(INTEGER, ForeignKey("_tips.id", ondelete='SET NULL', name="fk_tips_id")) #: id of parent reagent type lot = Column(String(64), unique=True) expiry = Column(TIMESTAMP) + active = Column(INTEGER, default=1) # lot = Column(String(64)) #: lot number of tips # equipment = relationship("Equipment", back_populates="tips", @@ -2888,15 +2949,13 @@ class TipsLot(BaseClass, LogMixin): # # procedure = association_proxy("tipsprocedureassociation", 'procedure') - procedureequipmentassociation = - @property def size(self) -> str: return f"{self.capacity}ul" @property def name(self) -> str: - return f"{self.manufacturer}-{self.size}-{self.ref}" + return f"{self.tips.manufacturer}-{self.tips.capacity}-{self.lot}" # @classmethod # def query_or_create(cls, **kwargs) -> Tuple[Tips, bool]: @@ -2993,10 +3052,14 @@ class TipsLot(BaseClass, LogMixin): # template = env.get_template("tips_details.html") # return template - def to_pydantic(self, **kwargs): - output = super().to_pydantic() - return output + # def to_pydantic(self, **kwargs): + # output = super().to_pydantic() + # return output + def details_dict(self, **kwargs) -> dict: + output = super().details_dict() + output['name'] = self.name + return output class ProcedureEquipmentAssociation(BaseClass): """ @@ -3017,12 +3080,12 @@ class ProcedureEquipmentAssociation(BaseClass): equipment = relationship(Equipment, back_populates="equipmentprocedureassociation") #: associated equipment - processversion = relationship(ProcessVersion) + processversion = relationship(ProcessVersion, back_populates="procedureequipmentassociation") - tips_id = Column(INTEGER, ForeignKey("_tips.id", ondelete="SET NULL", - name="SEA_Process_id")) + tipslot_id = Column(INTEGER, ForeignKey("_tipslot.id", ondelete="SET NULL", + name="SEA_Tipslot_id")) - tips = relationship(Tips) + tipslot = relationship(TipsLot) def __repr__(self) -> str: try: @@ -3128,7 +3191,7 @@ class ProcedureEquipmentAssociation(BaseClass): output['equipment_role'] = self.equipmentrole output['processversion'] = self.processversion.details_dict() try: - output['tips'] = self.tips.details_dict() + output['tips'] = self.tipslot.details_dict() except AttributeError: output['tips'] = None return output @@ -3139,21 +3202,18 @@ class ProcedureTypeEquipmentRoleAssociation(BaseClass): Abstract association between SubmissionType and EquipmentRole """ equipmentrole_id = Column(INTEGER, ForeignKey("_equipmentrole.id"), primary_key=True) #: id of associated equipment - proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"), - primary_key=True) #: id of associated procedure - # kittype_id = Column(INTEGER, ForeignKey("_kittype.id"), - # primary_key=True) + proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"), primary_key=True) #: id of associated procedure uses = Column(JSON) #: locations of equipment on the procedure type excel sheet. - static = Column(INTEGER, - default=1) #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list? - + static = Column(INTEGER, default=1) #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list? proceduretype = relationship(ProcedureType, - back_populates="proceduretypeequipmentroleassociation") #: associated procedure - + back_populates="proceduretypeequipmentroleassociation", + foreign_keys=[proceduretype_id]) #: associated procedure equipmentrole = relationship(EquipmentRole, - back_populates="equipmentroleproceduretypeassociation") #: associated equipment - + back_populates="equipmentroleproceduretypeassociation", + foreign_keys=[equipmentrole_id]) #: associated equipment + # equipment = relationship("Equipment", back_populates="equipmentroleproceduretypeassociation", + # secondary=proceduretypeequipmentroleassociation_equipment) #: Concrete control (Equipment) of reagentrole @validates('static') def validate_static(self, key, value): @@ -3179,27 +3239,27 @@ class ProcedureTypeEquipmentRoleAssociation(BaseClass): super().save() -class ProcedureTypeTipRoleAssociation(BaseClass): - """ - Abstract association between SubmissionType and TipRole - """ - tiprole_id = Column(INTEGER, ForeignKey("_tiprole.id"), primary_key=True) #: id of associated equipment - proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"), - primary_key=True) #: id of associated procedure - uses = Column(JSON) #: locations of equipment on the procedure type excel sheet. - static = Column(INTEGER, - default=1) #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list? - proceduretype = relationship(ProcedureType, - back_populates="proceduretypetiproleassociation") #: associated procedure - tiprole = relationship(TipRole, - back_populates="tiproleproceduretypeassociation") #: associated equipment - - @check_authorization - def save(self): - super().save() - - def to_omni(self): - pass +# class ProcedureTypeTipRoleAssociation(BaseClass): +# """ +# Abstract association between SubmissionType and TipRole +# """ +# tiprole_id = Column(INTEGER, ForeignKey("_tiprole.id"), primary_key=True) #: id of associated equipment +# proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"), +# primary_key=True) #: id of associated procedure +# uses = Column(JSON) #: locations of equipment on the procedure type excel sheet. +# static = Column(INTEGER, +# default=1) #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list? +# proceduretype = relationship(ProcedureType, +# back_populates="proceduretypetiproleassociation") #: associated procedure +# tiprole = relationship(TipRole, +# back_populates="tiproleproceduretypeassociation") #: associated equipment +# +# @check_authorization +# def save(self): +# super().save() +# +# def to_omni(self): +# pass # class ProcedureTipsAssociation(BaseClass): diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py index 9340bd9..a903a5f 100644 --- a/src/submissions/backend/validators/pydant.py +++ b/src/submissions/backend/validators/pydant.py @@ -340,24 +340,24 @@ class PydSample(PydBaseClass): class PydTips(PydBaseClass): name: str lot: str | None = Field(default=None) - tiprole: str + # tips: str - @field_validator('tiprole', mode='before') - @classmethod - def get_role_name(cls, value): - if isinstance(value, list): - output = [] - for tiprole in value: - if isinstance(tiprole, TipRole): - tiprole = tiprole.name - return tiprole - value = output - if isinstance(value, TipRole): - value = value.name - return value + # @field_validator('tips', mode='before') + # @classmethod + # def get_role_name(cls, value): + # if isinstance(value, list): + # output = [] + # for tips in value: + # if isinstance(tips, Tips): + # tips = tips.name + # output.append(tips) + # value = output + # if isinstance(value, Tips): + # value = value.name + # return value @report_result - def to_sql(self, procedure: Run) -> ProcedureTipsAssociation: + def to_sql(self, procedure: Run) -> Tuple[Tips, Report]: """ Convert this object to the SQL version for database storage. @@ -368,11 +368,11 @@ class PydTips(PydBaseClass): SubmissionTipsAssociation: Association between queried tips and procedure """ report = Report() - tips = Tips.query(name=self.name, limit=1) + tips = TipsLot.query(name=self.name, limit=1) # logger.debug(f"Tips query has yielded: {tips}") - assoc = ProcedureTipsAssociation.query_or_create(tips=tips, procedure=procedure, tiprole=self.tiprole, limit=1) - logger.debug(f"Got association: {assoc}") - return assoc, report + # assoc = ProcedureTipsAssociation.query_or_create(tips=tips, procedure=procedure, tiprole=self.tiprole, limit=1) + # logger.debug(f"Got association: {assoc}") + return tips, report class PydEquipment(PydBaseClass): @@ -382,7 +382,7 @@ class PydEquipment(PydBaseClass): # process: List[dict] | None process: List[PydProcess] | PydProcess | None equipmentrole: str | PydEquipmentRole | None - tips: List[PydTips] | PydTips | None = Field(default=[]) + # tips: List[PydTips] | PydTips | None = Field(default=[]) @field_validator('equipmentrole', mode='before') @classmethod @@ -408,42 +408,42 @@ class PydEquipment(PydBaseClass): value = convert_nans_to_nones(value) if not value: value = [] - logger.debug(value) try: # value = [item.strip() for item in value] d: Process = next((process for process in value if values.data['name'] in [item.name for item in process.equipment]), None) - # logger.debug(f"Next process: {d.details_dict()}") - value = d.to_pydantic() - # value = PydProcess(**d.details_dict()) + print(f"Next process: {d.details_dict()}") + if d: + # value = PydProcess(**d.details_dict()) + value = d.to_pydantic() # value = next((process.to_pydantic() for process in value)) except AttributeError as e: logger.error(f"Process Validation error due to {e}") pass return value - @field_validator('tips', mode='before') - @classmethod - def tips_to_pydantic(cls, value): - match value: - case list(): - output = [] - for tips in value: - match tips: - case Tips(): - tips = tips.to_pydantic() - case dict(): - tips = PydTips(**tips) - case _: - continue - output.append(tips) - case _: - output = value - return output - - @field_validator('tips') - @classmethod - def single_out_tips(cls, value, values): - return value + # @field_validator('tips', mode='before') + # @classmethod + # def tips_to_pydantic(cls, value): + # match value: + # case list(): + # output = [] + # for tips in value: + # match tips: + # case Tips(): + # tips = tips.to_pydantic() + # case dict(): + # tips = PydTips(**tips) + # case _: + # continue + # output.append(tips) + # case _: + # output = value + # return output + # + # @field_validator('tips') + # @classmethod + # def single_out_tips(cls, value, values): + # return value @report_result def to_sql(self, procedure: Procedure | str = None, proceduretype: ProcedureType | str = None) -> Tuple[ @@ -483,10 +483,10 @@ class PydEquipment(PydBaseClass): if len(self.processes) > 1: process = Process.query(proceduretype=procedure.get_submission_type(), equipmentrole=self.role) else: - process = Process.query(name=self.processes[0]) + process = Process.query(name=self.processes[0], limit=1) if process is None: logger.error(f"Found unknown process: {process}.") - # logger.debug(f"Using process: {process}") + logger.debug(f"Using process: {process}") assoc.process = process assoc.equipmentrole = self.equipmentrole else: @@ -1173,13 +1173,10 @@ class PydEquipmentRole(BaseModel): class PydProcess(PydBaseClass, extra="allow"): name: str version: str = Field(default="1") - proceduretype: List[str] - equipment: List[str] - equipmentrole: List[str] - # kittype: List[str] - tiprole: List[str] + # equipment: List[str] + tips: List[PydTips] - @field_validator("proceduretype", "equipment", "equipmentrole", "tiprole", mode="before") + @field_validator("tips", mode="before") @classmethod def enforce_list(cls, value): # logger.debug(f"Validating field: {value}") @@ -1193,6 +1190,14 @@ class PydProcess(PydBaseClass, extra="allow"): output.append(v) return output + @field_validator("tips", mode="before") + @classmethod + def validate_tips(cls, value): + if not value: + return [] + value = [item for item in value if item] + return value + @report_result def to_sql(self): report = Report() @@ -1202,30 +1207,6 @@ class PydProcess(PydBaseClass, extra="allow"): logger.debug(f"Got instance: {instance}") if not instance: instance = ProcessVersion() - # fields = [item for item in self.model_fields] - # for field in fields: - # logger.debug(f"Field: {field}") - # try: - # field_type = getattr(instance.__class__, field).property - # except AttributeError: - # logger.error(f"No attribute: {field} in {instance.__class__}") - # continue - # match field_type: - # case _RelationshipDeclared(): - # logger.debug(f"{field} is a relationship with {field_type.entity.class_}") - # query_str = getattr(self, field) - # if isinstance(query_str, list): - # query_str = query_str[0] - # if query_str in ["", " ", None]: - # continue - # logger.debug(f"Querying {field_type.entity.class_} with name {query_str}") - # field_value = field_type.entity.class_.query(name=query_str) - # logger.debug(f"{field} query result: {field_value}") - # case ColumnProperty(): - # logger.debug(f"{field} is a property.") - # field_value = getattr(self, field) - # # instance.set_attribute(key=field, value=field_value) - # setattr(instance, field, field_value) return instance, report @@ -1570,6 +1551,8 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): # equip = Equipment.query(name=equipment.name) equip, _ = equipment.to_sql() logger.debug(f"Process: {equipment.process}") + if isinstance(equipment.process, list): + equipment.process = equipment.process[0] if equip not in sql.equipment: equip_assoc = ProcedureEquipmentAssociation(equipment=equip, procedure=sql, equipmentrole=equip.equipmentrole[0]) diff --git a/src/submissions/frontend/widgets/equipment_usage_2.py b/src/submissions/frontend/widgets/equipment_usage_2.py index 3749cf6..7dc4755 100644 --- a/src/submissions/frontend/widgets/equipment_usage_2.py +++ b/src/submissions/frontend/widgets/equipment_usage_2.py @@ -1,6 +1,7 @@ ''' Creates forms that the user can enter equipment info into. ''' +import sys from pprint import pformat from PyQt6.QtCore import Qt, QSignalBlocker, pyqtSlot from PyQt6.QtWebChannel import QWebChannel @@ -98,6 +99,7 @@ class EquipmentUsage(QDialog): @pyqtSlot(str, str, str, str) def update_equipment(self, equipmentrole: str, equipment: str, process: str, tips: str): + try: equipment_of_interest = next( (item for item in self.procedure.equipment if item.equipmentrole == equipmentrole)) diff --git a/src/submissions/frontend/widgets/procedure_creation.py b/src/submissions/frontend/widgets/procedure_creation.py index 5837424..47cc520 100644 --- a/src/submissions/frontend/widgets/procedure_creation.py +++ b/src/submissions/frontend/widgets/procedure_creation.py @@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, Any, List if TYPE_CHECKING: from backend.db.models import Run, Procedure - from backend.validators import PydProcedure + from backend.validators import PydProcedure, PydEquipment from tools import jinja_template_loading, get_application_from_parent, render_details_template, sanitize_object_for_json logger = logging.getLogger(f"submissions.{__name__}") @@ -92,7 +92,7 @@ class ProcedureCreation(QDialog): equipmentrole['equipment'].index(item_in_er_list))) proceduretype_dict['equipment_section'] = EquipmentUsage.construct_html(procedure=self.procedure, child=True) proceduretype_dict['equipment'] = [sanitize_object_for_json(object) for object in proceduretype_dict['equipment']] - + logger.debug(proceduretype_dict['equipment']) self.update_equipment = EquipmentUsage.update_equipment regex = re.compile(r".*R\d$") proceduretype_dict['previous'] = [""] + [item.name for item in self.run.procedure if item.proceduretype == self.proceduretype and not bool(regex.match(item.name))] @@ -124,18 +124,26 @@ class ProcedureCreation(QDialog): if equipment_of_interest: eoi = self.procedure.equipment.pop(self.procedure.equipment.index(equipment_of_interest)) else: - eoi = equipment.to_pydantic(equipmentrole=equipmentrole, proceduretype=self.procedure.proceduretype) + eoi: PydEquipment = equipment.to_pydantic(equipmentrole=equipmentrole) eoi.name = equipment.name eoi.asset_number = equipment.asset_number eoi.nickname = equipment.nickname - process = next((prcss for prcss in equipment.process if prcss.name == process), None) - if process: - eoi.process = process.to_pydantic() - tips = next((tps for tps in equipment.tips if tps.name == tips), None) - if tips: - eoi.tips = tips.to_pydantic() + logger.warning("Setting processes.") + eoi.process = [process for process in equipment.get_processes(equipmentrole=equipmentrole)] + # process = next((prcss for prcss in equipment.process if prcss.name == process), None) + # if process: + # for process in equipment.process: + # logger.debug(f"Retrieved process: {process}") + # # tips = [tip.to_pydantic() for tip in process.tips] + # # if tips: + # # process.tips = tips + # # else: + # # process.tips = [""] + # eoi.process.append(process.to_pydantic()) + # # tips = next((tps for tps in process.tips if tps.name == tips), None) + self.procedure.equipment.append(eoi) - logger.debug(f"Updated equipment: {self.procedure.equipment}") + logger.debug(f"Updated equipment: {pformat(self.procedure.equipment)}") @pyqtSlot(str, str) def text_changed(self, key: str, new_value: str): diff --git a/src/submissions/frontend/widgets/submission_details.py b/src/submissions/frontend/widgets/submission_details.py index 48cf1cf..8b7d68c 100644 --- a/src/submissions/frontend/widgets/submission_details.py +++ b/src/submissions/frontend/widgets/submission_details.py @@ -62,7 +62,7 @@ class SubmissionDetails(QDialog): css = f.read() key = object.__class__.__name__.lower() d = {key: details} - logger.debug(f"Using details: {pformat(d)}") + logger.debug(f"Using details: {pformat(d['procedure']['equipment'])}") html = template.render(**d, css=[css]) self.webview.setHtml(html) self.setWindowTitle(f"{object.__class__.__name__} Details - {object.name}") diff --git a/src/submissions/frontend/widgets/submission_table.py b/src/submissions/frontend/widgets/submission_table.py index d21e64b..5aec320 100644 --- a/src/submissions/frontend/widgets/submission_table.py +++ b/src/submissions/frontend/widgets/submission_table.py @@ -12,7 +12,8 @@ from PyQt6.QtGui import QAction, QCursor, QStandardItemModel, QStandardItem, QIc from typing import Dict, List # from backend import Procedure -from backend.db.models import Run, ClientSubmission, Procedure +from backend.db.models.submissions import Run, ClientSubmission +from backend.db.models.procedures import Procedure from tools import Report, Result, report_result, get_application_from_parent from .functions import select_open_file diff --git a/src/submissions/templates/support/equipment_list.html b/src/submissions/templates/support/equipment_list.html index c303709..6744848 100644 --- a/src/submissions/templates/support/equipment_list.html +++ b/src/submissions/templates/support/equipment_list.html @@ -1,7 +1,7 @@ {{ equipment['equipmentrole'] }} {{ equipment['name'] }} - {{ equipment['process']['name'] }} + {{ equipment['processversion']['name'] }} {% if equipment['tips'] %} {{ equipment['tips']['name'] }} {% else %}