Adding equipment usage.

This commit is contained in:
lwark
2025-06-10 10:29:01 -05:00
parent 10d4c9f155
commit 58c669bc30
12 changed files with 145 additions and 88 deletions

View File

@@ -1320,11 +1320,20 @@ class Procedure(BaseClass):
Returns: Returns:
dict: dictionary of functions dict: dictionary of functions
""" """
names = ["Add Results", "Edit", "Add Comment", "Show Details", "Delete"] names = ["Add Results", "Add Equipment", "Edit", "Add Comment", "Show Details", "Delete"]
return {item: self.__getattribute__(item.lower().replace(" ", "_")) for item in names} return {item: self.__getattribute__(item.lower().replace(" ", "_")) for item in names}
def add_results(self, obj, resultstype_name:str): def add_results(self, obj, resultstype_name:str):
logger.debug(f"Add Results! {resultstype_name}") logger.debug(f"Add Results! {resultstype_name}")
from frontend.widgets import results
results_class = getattr(results, resultstype_name)
rs = results_class(procedure=self, parent=obj)
def add_equipment(self, obj):
from frontend.widgets.equipment_usage import EquipmentUsage
dlg = EquipmentUsage(parent=obj, procedure=self)
if dlg.exec():
pass
def edit(self, obj): def edit(self, obj):
logger.debug("Edit!") logger.debug("Edit!")
@@ -2668,7 +2677,6 @@ class Results(BaseClass):
procedure = relationship("Procedure", back_populates="results") procedure = relationship("Procedure", back_populates="results")
assoc_id = Column(INTEGER, ForeignKey("_proceduresampleassociation.id", ondelete='SET NULL', assoc_id = Column(INTEGER, ForeignKey("_proceduresampleassociation.id", ondelete='SET NULL',
name="fk_RES_ASSOC_id")) name="fk_RES_ASSOC_id"))
sampleprocedureassociation = relationship("ProcedureSampleAssociation", back_populates="results") sampleprocedureassociation = relationship("ProcedureSampleAssociation", back_populates="results")
_img = Column(String(128)) _img = Column(String(128))

View File

@@ -314,6 +314,7 @@ class ClientSubmission(BaseClass, LogMixin):
assoc.save() assoc.save()
else: else:
logger.warning("Run cancelled.") logger.warning("Run cancelled.")
obj.set_data()
def edit(self, obj): def edit(self, obj):
@@ -1138,8 +1139,7 @@ class Run(BaseClass, LogMixin):
sql, _ = dlg.return_sql() sql, _ = dlg.return_sql()
logger.debug(f"Output run samples:\n{pformat(sql.run.sample)}") logger.debug(f"Output run samples:\n{pformat(sql.run.sample)}")
sql.save() sql.save()
obj.set_data()
def delete(self, obj=None): def delete(self, obj=None):
""" """
@@ -2029,7 +2029,7 @@ class RunSampleAssociation(BaseClass):
class ProcedureSampleAssociation(BaseClass): class ProcedureSampleAssociation(BaseClass):
id = Column(INTEGER, primary_key=True) id = Column(INTEGER, unique=True, nullable=False)
procedure_id = Column(INTEGER, ForeignKey("_procedure.id"), primary_key=True) #: id of associated procedure procedure_id = Column(INTEGER, ForeignKey("_procedure.id"), primary_key=True) #: id of associated procedure
sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated equipment sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated equipment
row = Column(INTEGER) row = Column(INTEGER)
@@ -2042,15 +2042,34 @@ class ProcedureSampleAssociation(BaseClass):
results = relationship("Results", back_populates="sampleprocedureassociation") results = relationship("Results", back_populates="sampleprocedureassociation")
@classmethod
def query(cls, sample: Sample|str|None=None, procedure: Procedure|str|None=None, limit: int=0, **kwargs):
query = cls.__database_session__.query(cls)
match sample:
case Sample():
query = query.filter(cls.sample==sample)
case str():
query = query.join(Sample).filter(Sample.sample_id==sample)
case _:
pass
match procedure:
case Procedure():
query = query.filter(cls.procedure == procedure)
case str():
query = query.join(Procedure).filter(Procedure.name == procedure)
case _:
pass
if sample and procedure:
limit = 1
return cls.execute_query(query=query, limit=limit, **kwargs)
# def __init__(self, new_id:int|None=None, **kwarg): def __init__(self, new_id:int|None=None, **kwarg):
# if new_id: if new_id:
# self.id = new_id self.id = new_id
# else: else:
# self.id = self.__class__.autoincrement_id() self.id = self.__class__.autoincrement_id()
# # new_id = self.__class__.autoincrement_id() super().__init__(**kwarg)
# super().__init__(**kwarg)
@classmethod @classmethod

View File

@@ -7,6 +7,7 @@ from typing import Generator, Tuple
from openpyxl import load_workbook from openpyxl import load_workbook
from pandas import DataFrame from pandas import DataFrame
from backend.validators import pydant from backend.validators import pydant
from backend.db.models import Procedure
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
@@ -15,7 +16,8 @@ class DefaultParser(object):
def __repr__(self): def __repr__(self):
return f"{self.__class__.__name__}<{self.filepath.stem}>" return f"{self.__class__.__name__}<{self.filepath.stem}>"
def __init__(self, filepath: Path | str, range_dict: dict | None = None, *args, **kwargs): def __init__(self, filepath: Path | str, procedure: Procedure|None=None, range_dict: dict | None = None, *args, **kwargs):
self.procedure = procedure
try: try:
self._pyd_object = getattr(pydant, f"Pyd{self.__class__.__name__.replace('Parser', '')}") self._pyd_object = getattr(pydant, f"Pyd{self.__class__.__name__.replace('Parser', '')}")
except AttributeError: except AttributeError:

View File

@@ -8,44 +8,14 @@ from typing import Generator, Tuple
from openpyxl import load_workbook from openpyxl import load_workbook
from backend.db.models import Run, Sample from backend.db.models import Run, Sample, Procedure, ProcedureSampleAssociation
from . import DefaultKEYVALUEParser, DefaultTABLEParser from . import DefaultKEYVALUEParser, DefaultTABLEParser
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
class PCRSampleParser(DefaultTABLEParser):
"""Object to pull data from Design and Analysis PCR export file."""
default_range_dict = [dict(
header_row=25,
sheet="Results"
)]
@property
def parsed_info(self):
output = [item for item in super().parsed_info]
merge_column = "sample"
sample_names = list(set([item['sample'] for item in output]))
for sample in sample_names:
multi = dict()
sois = (item for item in output if item['sample']==sample)
for soi in sois:
multi[soi['target']] = {k:v for k, v in soi.items() if k != "target"}
yield (sample, multi)
def to_pydantic(self):
for key, sample_info in self.parsed_info:
sample_obj = Sample.query(sample_id=key)
if sample_obj and not isinstance(sample_obj, list):
yield self._pyd_object(results=sample_info, parent=sample_obj)
else:
continue
class PCRInfoParser(DefaultKEYVALUEParser): class PCRInfoParser(DefaultKEYVALUEParser):
default_range_dict = [dict( default_range_dict = [dict(
start_row=1, start_row=1,
end_row=24, end_row=24,
@@ -72,11 +42,10 @@ class PCRInfoParser(DefaultKEYVALUEParser):
# #
def to_pydantic(self): def to_pydantic(self):
from backend.db.models import Procedure # from backend.db.models import Procedure
data = {key: value for key, value in self.parsed_info} data = {key: value for key, value in self.parsed_info}
data['filepath'] = self.filepath data['filepath'] = self.filepath
return self._pyd_object(**data, parent=Procedure) return self._pyd_object(**data, parent=self.procedure)
# @property # @property
# def pcr_info(self) -> dict: # def pcr_info(self) -> dict:
@@ -97,3 +66,36 @@ class PCRInfoParser(DefaultKEYVALUEParser):
# pcr[key] = value # pcr[key] = value
# pcr['imported_by'] = getuser() # pcr['imported_by'] = getuser()
# return pcr # return pcr
class PCRSampleParser(DefaultTABLEParser):
"""Object to pull data from Design and Analysis PCR export file."""
default_range_dict = [dict(
header_row=25,
sheet="Results"
)]
@property
def parsed_info(self):
output = [item for item in super().parsed_info]
merge_column = "sample"
sample_names = list(set([item['sample'] for item in output]))
for sample in sample_names:
multi = dict()
sois = [item for item in output if item['sample'] == sample]
for soi in sois:
multi[soi['target']] = {k: v for k, v in soi.items() if k != "target" and k != "sample"}
yield {sample: multi}
def to_pydantic(self):
logger.debug(f"running to pydantic")
for item in self.parsed_info:
sample_obj = Sample.query(sample_id=list(item.keys())[0])
logger.debug(f"Sample object {sample_obj}")
assoc = ProcedureSampleAssociation.query(sample=sample_obj, procedure=self.procedure)
if assoc and not isinstance(assoc, list):
yield self._pyd_object(results=list(item.values())[0], parent=assoc)
else:
continue

View File

@@ -1436,9 +1436,15 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
sql.run = self.run sql.run = self.run
if self.proceduretype: if self.proceduretype:
sql.proceduretype = self.proceduretype sql.proceduretype = self.proceduretype
for sample in self.samples: try:
if sample.sample_id.startswith("blank_") or sample.sample_id == "": start_index = max([item.id for item in ProcedureSampleAssociation.query()]) + 1
continue except ValueError:
start_index = 1
relevant_samples = [sample for sample in self.samples if not sample.sample_id.startswith("blank_") and not sample.sample_id == ""]
logger.debug(f"start index: {start_index}")
assoc_id_range = range(start_index, start_index + len(relevant_samples)+1)
logger.debug(f"Association id range: {assoc_id_range}")
for iii, sample in enumerate(relevant_samples):
sample_sql = sample.to_sql() sample_sql = sample.to_sql()
if sql.run: if sql.run:
if sample_sql not in sql.run.sample: if sample_sql not in sql.run.sample:
@@ -1446,7 +1452,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
run_assoc = RunSampleAssociation(sample=sample_sql, run=self.run, row=sample.row, column=sample.column) run_assoc = RunSampleAssociation(sample=sample_sql, run=self.run, row=sample.row, column=sample.column)
else: else:
logger.debug(f"sample {sample_sql} found in {sql.run.sample}") logger.debug(f"sample {sample_sql} found in {sql.run.sample}")
proc_assoc = ProcedureSampleAssociation(procedure=sql, sample=sample_sql, row=sample.row, column=sample.column) proc_assoc = ProcedureSampleAssociation(new_id=assoc_id_range[iii], procedure=sql, sample=sample_sql, row=sample.row, column=sample.column)
if self.kittype['value'] not in ["NA", None, ""]: if self.kittype['value'] not in ["NA", None, ""]:
kittype = KitType.query(name=self.kittype['value'], limit=1) kittype = KitType.query(name=self.kittype['value'], limit=1)
if kittype: if kittype:
@@ -1535,5 +1541,17 @@ class PydClientSubmission(PydBaseClass):
class PydResults(PydBaseClass, arbitrary_types_allowed=True): class PydResults(PydBaseClass, arbitrary_types_allowed=True):
results: dict = Field(default={}) results: dict = Field(default={})
parent: Sample|Procedure img: None = Field(default=None)
parent: Procedure|ProcedureSampleAssociation|None = Field(default=None)
def to_sql(self):
sql = Results(result=self.results)
match self.parent:
case ProcedureSampleAssociation():
sql.sampleprocedureassociation = self.parent
case Procedure():
sql.procedure = self.parent
case _:
logger.error("Improper association found.")
return sql

View File

@@ -19,7 +19,7 @@ class EquipmentUsage(QDialog):
def __init__(self, parent, procedure: Procedure): def __init__(self, parent, procedure: Procedure):
super().__init__(parent) super().__init__(parent)
self.procedure = procedure self.procedure = procedure
self.setWindowTitle(f"Equipment Checklist - {procedure.rsl_plate_num}") self.setWindowTitle(f"Equipment Checklist - {procedure.name}")
self.used_equipment = self.procedure.equipment self.used_equipment = self.procedure.equipment
self.kit = self.procedure.kittype self.kit = self.procedure.kittype
self.opt_equipment = procedure.proceduretype.get_equipment() self.opt_equipment = procedure.proceduretype.get_equipment()

View File

@@ -39,6 +39,7 @@ def select_open_file(obj: QMainWindow, file_extension: str | None = None) -> Pat
logger.warning(f"No file selected, cancelling.") logger.warning(f"No file selected, cancelling.")
return return
obj.last_dir = fname.parent obj.last_dir = fname.parent
logger.debug(f"File selected: {fname}")
return fname return fname

View File

@@ -70,8 +70,8 @@ class ProcedureCreation(QDialog):
procedure=self.created_procedure.__dict__, procedure=self.created_procedure.__dict__,
plate_map=self.plate_map plate_map=self.plate_map
) )
with open("procedure.html", "w") as f: # with open("procedure.html", "w") as f:
f.write(html) # f.write(html)
self.webview.setHtml(html) self.webview.setHtml(html)
@pyqtSlot(str, str) @pyqtSlot(str, str)

View File

@@ -1,26 +0,0 @@
"""
"""
from pathlib import Path
from backend.validators import PydResults
from backend.db.models import Procedure, Results
from backend.excel.parsers.pcr_parser import PCRSampleParser, PCRInfoParser
from frontend.widgets.functions import select_open_file
from . import DefaultResults
class PCR(DefaultResults):
def __init__(self, procedure: Procedure, fname:Path|str|None=None):
self.procedure = procedure
if not fname:
self.fname = select_open_file(file_extension="xlsx")
elif isinstance(fname, str):
self.fname = Path(fname)
self.info_parser = PCRInfoParser(filepath=fname)
self.sample_parser = PCRSampleParser(filepath=fname)
def build_procedure(self):
results = PydResults(parent=self.procedure)
results.results =

View File

@@ -4,4 +4,4 @@ class DefaultResults(object):
pass pass
from .PCR import pcr from .pcr import PCR

View File

@@ -0,0 +1,39 @@
"""
"""
import logging
from pathlib import Path
from backend.validators import PydResults
from backend.db.models import Procedure, Results
from backend.excel.parsers.pcr_parser import PCRSampleParser, PCRInfoParser
from frontend.widgets.functions import select_open_file
from tools import get_application_from_parent
from . import DefaultResults
logger = logging.getLogger(f"submissions.{__name__}")
class PCR(DefaultResults):
def __init__(self, procedure: Procedure, parent, fname:Path|str|None=None):
logger.debug(f"FName before correction: {fname}")
self.procedure = procedure
if not fname:
self.fname = select_open_file(file_extension="xlsx", obj=get_application_from_parent(parent))
elif isinstance(fname, str):
self.fname = Path(fname)
logger.debug(f"FName after correction: {fname}")
self.info_parser = PCRInfoParser(filepath=self.fname, procedure=self.procedure)
self.sample_parser = PCRSampleParser(filepath=self.fname, procedure=self.procedure)
self.build_procedure()
self.build_samples()
def build_procedure(self):
procedure_info = self.info_parser.to_pydantic()
procedure_sql = procedure_info.to_sql()
procedure_sql.save()
def build_samples(self):
samples = self.sample_parser.to_pydantic()
for sample in samples:
sql = sample.to_sql()
sql.save()

View File

@@ -321,14 +321,11 @@ class SubmissionsTree(QTreeView):
event (_type_): the item of interest event (_type_): the item of interest
""" """
indexes = self.selectedIndexes() indexes = self.selectedIndexes()
dicto = next((item.data(1) for item in indexes if item.data(1))) dicto = next((item.data(1) for item in indexes if item.data(1)))
query_obj = dicto['item_type'].query(name=dicto['query_str'], limit=1) query_obj = dicto['item_type'].query(name=dicto['query_str'], limit=1)
logger.debug(query_obj) logger.debug(query_obj)
# NOTE: Convert to data in id column (i.e. column 0) # NOTE: Convert to data in id column (i.e. column 0)
# id = id.sibling(id.row(), 0).data() # id = id.sibling(id.row(), 0).data()
# logger.debug(id.model().query_group_object(id.row())) # logger.debug(id.model().query_group_object(id.row()))
# clientsubmission = id.model().query_group_object(id.row()) # clientsubmission = id.model().query_group_object(id.row())
self.menu = QMenu(self) self.menu = QMenu(self)
@@ -403,8 +400,6 @@ class SubmissionsTree(QTreeView):
if isinstance(children, List): if isinstance(children, List):
self._populateTree(child, child_item) self._populateTree(child, child_item)
def clear(self): def clear(self):
if self.model != None: if self.model != None:
# self.model.clear() # works # self.model.clear() # works
@@ -415,7 +410,6 @@ class SubmissionsTree(QTreeView):
# NOTE: Convert to data in id column (i.e. column 0) # NOTE: Convert to data in id column (i.e. column 0)
# id = id.sibling(id.row(), 1) # id = id.sibling(id.row(), 1)
indexes = self.selectedIndexes() indexes = self.selectedIndexes()
dicto = next((item.data(1) for item in indexes if item.data(1))) dicto = next((item.data(1) for item in indexes if item.data(1)))
logger.debug(dicto) logger.debug(dicto)
# try: # try: