UPdates to managers.

This commit is contained in:
lwark
2025-07-09 12:13:43 -05:00
parent 432854e76f
commit d5961f42a5
10 changed files with 147 additions and 89 deletions

View File

@@ -18,10 +18,8 @@ from sqlalchemy.exc import ArgumentError
from typing import Any, List, ClassVar
from pathlib import Path
from sqlalchemy.orm.relationships import _RelationshipDeclared
from frontend import select_save_file
from tools import report_result, list_sort_dict
from backend.excel import writers
# NOTE: Load testing environment
if 'pytest' in sys.modules:
@@ -638,15 +636,19 @@ class BaseClass(Base):
pass
def export(self, obj, output_filepath: str|Path|None=None):
if not hasattr(self, "template_file"):
logger.error(f"Export not implemented for {self.__class__.__name__}")
return
pyd = self.to_pydantic()
if not output_filepath:
output_filepath = select_save_file(obj=obj, default_name=pyd.construct_filename(), extension="xlsx")
Writer = getattr(writers, f"{self.__class__.__name__}Writer")
writer = Writer(output_filepath=output_filepath, pydant_obj=pyd, range_dict=self.range_dict)
workbook = writer
# if not hasattr(self, "template_file"):
# logger.error(f"Export not implemented for {self.__class__.__name__}")
# return
# pyd = self.to_pydantic()
# if not output_filepath:
# from frontend import select_save_file
# output_filepath = select_save_file(obj=obj, default_name=pyd.construct_filename(), extension="xlsx")
# Writer = getattr(writers, f"{self.__class__.__name__}Writer")
# writer = Writer(output_filepath=output_filepath, pydant_obj=pyd, range_dict=self.range_dict)
# workbook = writer
from backend import managers
Manager = getattr(managers, f"Default{self.__class__.__name__}")
manager = Manager(parent=obj, input_object=self)
class LogMixin(Base):

View File

@@ -9,15 +9,19 @@ from copy import deepcopy
from getpass import getuser
import logging, uuid, tempfile, re, base64, numpy as np, pandas as pd, types, sys
from inspect import isclass
from io import BytesIO
from zipfile import ZipFile, BadZipfile
from tempfile import TemporaryDirectory, TemporaryFile
from operator import itemgetter
from pprint import pformat
import openpyxl
from pandas import DataFrame
from sqlalchemy.ext.hybrid import hybrid_property
from frontend import select_save_file
from . import Base, BaseClass, Reagent, SubmissionType, KitType, ClientLab, Contact, LogMixin, Procedure, kittype_procedure
from frontend.widgets.functions import select_save_file
from . import Base, BaseClass, Reagent, SubmissionType, KitType, ClientLab, Contact, LogMixin, Procedure, \
kittype_procedure
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func, Table, Sequence
from sqlalchemy.orm import relationship, validates, Query
from sqlalchemy.orm.attributes import flag_modified
@@ -35,10 +39,10 @@ from pathlib import Path
from jinja2.exceptions import TemplateNotFound
from jinja2 import Template
from PIL import Image
if TYPE_CHECKING:
from backend.db.models.kits import ProcedureType, Procedure
logger = logging.getLogger(f"procedure.{__name__}")
@@ -73,7 +77,7 @@ class ClientSubmission(BaseClass, LogMixin):
) #: Relation to ClientSubmissionSampleAssociation
sample = association_proxy("clientsubmissionsampleassociation",
"sample") #, creator=lambda sample: ClientSubmissionSampleAssociation(
"sample") #, creator=lambda sample: ClientSubmissionSampleAssociation(
# sample=sample)) #: Association proxy to ClientSubmissionSampleAssociation.sample
@@ -362,10 +366,12 @@ class ClientSubmission(BaseClass, LogMixin):
output['expanded'] = ["clientlab", "contact", "submissiontype"]
return output
def to_pydantic(self, filepath: Path|str|None=None, **kwargs):
def to_pydantic(self, filepath: Path | str | None = None, **kwargs):
output = super().to_pydantic(filepath=filepath, **kwargs)
output.template_file = self.template_file
return output
class Run(BaseClass, LogMixin):
"""
Object for an entire procedure procedure. Links to client procedure, reagents, equipment, process
@@ -634,7 +640,8 @@ class Run(BaseClass, LogMixin):
# logger.debug(f"Active samples:{pformat(active_samples)}")
for sample in active_samples:
sample['active'] = True
inactive_samples = [sample.details_dict() for sample in submission_samples if sample.name not in [s['sample_id'] for s in active_samples]]
inactive_samples = [sample.details_dict() for sample in submission_samples if
sample.name not in [s['sample_id'] for s in active_samples]]
# logger.debug(f"Inactive samples:{pformat(inactive_samples)}")
for sample in inactive_samples:
sample['active'] = False
@@ -642,7 +649,8 @@ class Run(BaseClass, LogMixin):
output['sample'] = active_samples + inactive_samples
output['procedure'] = [procedure.details_dict() for procedure in output['procedure']]
output['permission'] = is_power_user()
output['excluded'] = ['procedure', "runsampleassociation", 'excluded', 'expanded', 'sample', 'id', 'custom', 'permission']
output['excluded'] = ['procedure', "runsampleassociation", 'excluded', 'expanded', 'sample', 'id', 'custom',
'permission']
return output
@@ -727,7 +735,8 @@ class Run(BaseClass, LogMixin):
@property
def sample_dicts(self) -> List[dict]:
return [dict(sample_id=assoc.sample.sample_id, row=assoc.row, column=assoc.column, background_color="#6ffe1d") for assoc in self.runsampleassociation]
return [dict(sample_id=assoc.sample.sample_id, row=assoc.row, column=assoc.column, background_color="#6ffe1d")
for assoc in self.runsampleassociation]
@classmethod
def make_plate_map(cls, sample_list: list, plate_rows: int = 8, plate_columns=12) -> str:
@@ -1010,7 +1019,6 @@ class Run(BaseClass, LogMixin):
regex = re.compile(rstring, flags=re.IGNORECASE | re.VERBOSE)
return regex
# NOTE: Query functions
@classmethod
@@ -1190,14 +1198,15 @@ class Run(BaseClass, LogMixin):
Returns:
dict: dictionary of functions
"""
names = ["Add Procedure", "Edit", "Add Comment", "Show Details", "Delete"]
names = ["Add Procedure", "Edit", "Export", "Add Comment", "Show Details", "Delete"]
output = {item: self.__getattribute__(item.lower().replace(" ", "_")) for item in names}
logger.debug(output)
return output
def add_procedure(self, obj, proceduretype_name: str):
from frontend.widgets.procedure_creation import ProcedureCreation
procedure_type = next((proceduretype for proceduretype in self.allowed_procedures if proceduretype.name == proceduretype_name))
procedure_type = next(
(proceduretype for proceduretype in self.allowed_procedures if proceduretype.name == proceduretype_name))
logger.debug(f"Got ProcedureType: {procedure_type}")
dlg = ProcedureCreation(parent=obj, procedure=procedure_type.construct_dummy_procedure(run=self))
if dlg.exec():
@@ -1279,14 +1288,19 @@ class Run(BaseClass, LogMixin):
self.set_attribute(key='comment', value=comment)
self.save(original=False)
def export(self, obj, output_filepath: str|Path|None=None):
def export(self, obj, output_filepath: str | Path | None = None):
from backend.excel import writers
clientsubmission_pyd = self.clientsubmission.to_pydantic()
if not output_filepath:
output_filepath = select_save_file(obj=obj, default_name=clientsubmission_pyd.construct_filename(), extension="xlsx")
output_filepath = select_save_file(obj=obj, default_name=self.construct_filename(), extension="xlsx")
Writer = getattr(writers, "ClientSubmissionWriter")
writer = Writer(output_filepath=output_filepath, pydant_obj=pyd, range_dict=self.range_dict)
workbook = writer.
writer = Writer(output_filepath=output_filepath, pydant_obj=clientsubmission_pyd,
range_dict=self.clientsubmission.range_dict)
workbook: openpyxl.Workbook = writer.write_info()
workbook.save(filename=output_filepath)
def construct_filename(self):
return f"{self.rsl_plate_number}-{self.clientsubmission.clientlab.name}-{self.clientsubmission.submitter_plate_id}"
def backup(self, obj=None, fname: Path | None = None, full_backup: bool = False):
"""
@@ -1360,7 +1374,7 @@ class Run(BaseClass, LogMixin):
def allowed_procedures(self):
return self.clientsubmission.submissiontype.proceduretype
def get_submission_rank_of_sample(self, sample: Sample|str):
def get_submission_rank_of_sample(self, sample: Sample | str):
if isinstance(sample, str):
sample = Sample.query(sample_id=sample)
clientsubmissionsampleassoc = next((assoc for assoc in self.clientsubmission.clientsubmissionsampleassociation
@@ -1378,10 +1392,12 @@ class Run(BaseClass, LogMixin):
submission_rank = self.get_submission_rank_of_sample(sample=sample)
if submission_rank != 0:
row, column = plate_dict[submission_rank]
ranked_samples.append(dict(well_id=sample.sample_id, sample_id=sample.sample_id, row=row, column=column, submission_rank=submission_rank, background_color="#6ffe1d"))
ranked_samples.append(dict(well_id=sample.sample_id, sample_id=sample.sample_id, row=row, column=column,
submission_rank=submission_rank, background_color="#6ffe1d"))
else:
unranked_samples.append(sample)
possible_ranks = (item for item in list(plate_dict.keys()) if item not in [sample['submission_rank'] for sample in ranked_samples])
possible_ranks = (item for item in list(plate_dict.keys()) if
item not in [sample['submission_rank'] for sample in ranked_samples])
# logger.debug(possible_ranks)
# possible_ranks = (plate_dict[idx] for idx in possible_ranks)
for sample in unranked_samples:
@@ -1391,13 +1407,15 @@ class Run(BaseClass, LogMixin):
continue
row, column = plate_dict[submission_rank]
ranked_samples.append(
dict(well_id=sample.sample_id, sample_id=sample.sample_id, row=row, column=column, submission_rank=submission_rank,
dict(well_id=sample.sample_id, sample_id=sample.sample_id, row=row, column=column,
submission_rank=submission_rank,
background_color="#6ffe1d", enabled=True))
padded_list = []
for iii in range(1, proceduretype.total_wells+1):
for iii in range(1, proceduretype.total_wells + 1):
row, column = proceduretype.ranked_plate[iii]
sample = next((item for item in ranked_samples if item['submission_rank']==iii),
dict(well_id=f"blank_{iii}", sample_id="", row=row, column=column, submission_rank=iii, background_color="#ffffff", enabled=False)
sample = next((item for item in ranked_samples if item['submission_rank'] == iii),
dict(well_id=f"blank_{iii}", sample_id="", row=row, column=column, submission_rank=iii,
background_color="#ffffff", enabled=False)
)
padded_list.append(sample)
# logger.debug(f"Final padded list:\n{pformat(list(sorted(padded_list, key=itemgetter('submission_rank'))))}")
@@ -1485,7 +1503,7 @@ class Sample(BaseClass, LogMixin):
)
if full_data:
sample['clientsubmission'] = sorted([item.to_sub_dict() for item in self.sampleclientsubmissionassociation],
key=itemgetter('submitted_date'))
key=itemgetter('submitted_date'))
return sample
def to_pydantic(self):
@@ -2126,8 +2144,8 @@ class RunSampleAssociation(BaseClass):
output['misc_info'] = misc
return output
class ProcedureSampleAssociation(BaseClass):
class ProcedureSampleAssociation(BaseClass):
id = Column(INTEGER, unique=True, nullable=False)
procedure_id = Column(INTEGER, ForeignKey("_procedure.id"), primary_key=True) #: id of associated procedure
sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated equipment
@@ -2142,13 +2160,14 @@ class ProcedureSampleAssociation(BaseClass):
results = relationship("Results", back_populates="sampleprocedureassociation")
@classmethod
def query(cls, sample: Sample|str|None=None, procedure: Procedure|str|None=None, limit: int=0, **kwargs):
def query(cls, sample: Sample | str | None = None, procedure: Procedure | str | None = None, limit: int = 0,
**kwargs):
query = cls.__database_session__.query(cls)
match sample:
case Sample():
query = query.filter(cls.sample==sample)
query = query.filter(cls.sample == sample)
case str():
query = query.join(Sample).filter(Sample.sample_id==sample)
query = query.join(Sample).filter(Sample.sample_id == sample)
case _:
pass
match procedure:
@@ -2162,15 +2181,13 @@ class ProcedureSampleAssociation(BaseClass):
limit = 1
return cls.execute_query(query=query, limit=limit, **kwargs)
def __init__(self, new_id:int|None=None, **kwarg):
def __init__(self, new_id: int | None = None, **kwarg):
if new_id:
self.id = new_id
else:
self.id = self.__class__.autoincrement_id()
super().__init__(**kwarg)
@classmethod
def autoincrement_id(cls) -> int:
"""
@@ -2195,4 +2212,3 @@ class ProcedureSampleAssociation(BaseClass):
output['misc_info'] = misc
output['results'] = [result.details_dict() for result in output['results']]
return output

View File

@@ -12,7 +12,7 @@ from openpyxl.reader.excel import load_workbook
from tools import row_keys
# from backend.db.models import SubmissionType
from . import DefaultKEYVALUEParser, DefaultTABLEParser
from backend.managers import procedures as procedure_managers
logger = logging.getLogger(f"submissions.{__name__}")
@@ -84,9 +84,13 @@ class ClientSubmissionInfoParser(DefaultKEYVALUEParser, SubmissionTyperMixin):
sheet="Sample List"
)]
def __init__(self, filepath: Path | str, *args, **kwargs):
def __init__(self, filepath: Path | str, submissiontype:"SubmissionType"|None=None, *args, **kwargs):
from frontend.widgets.pop_ups import QuestionAsker
self.submissiontype = self.retrieve_submissiontype(filepath=filepath)
from backend.managers import procedures as procedure_managers
if not submissiontype:
self.submissiontype = self.retrieve_submissiontype(filepath=filepath)
else:
self.submissiontype = submissiontype
if "range_dict" not in kwargs:
kwargs['range_dict'] = self.submissiontype.info_map
super().__init__(filepath=filepath, **kwargs)
@@ -118,8 +122,11 @@ class ClientSubmissionSampleParser(DefaultTABLEParser, SubmissionTyperMixin):
sheet="Sample List"
)]
def __init__(self, filepath: Path | str, *args, **kwargs):
self.submissiontype = self.retrieve_submissiontype(filepath=filepath)
def __init__(self, filepath: Path | str, submissiontype: "SubmissionType"|None=None, *args, **kwargs):
if not submissiontype:
self.submissiontype = self.retrieve_submissiontype(filepath=filepath)
else:
self.submissiontype = submissiontype
if "range_dict" not in kwargs:
kwargs['range_dict'] = self.submissiontype.sample_map
super().__init__(filepath=filepath, **kwargs)

View File

@@ -1,22 +1,37 @@
import logging
from pathlib import Path
from typing import Literal
from backend.db.models import ProcedureType
from frontend.widgets.functions import select_open_file
from tools import get_application_from_parent
from backend.validators.pydant import PydBaseClass
from backend.db.models import BaseClass
logger = logging.getLogger(f"submissions.{__name__}")
class DefaultManager(object):
def __init__(self, proceduretype: ProcedureType, parent, fname: Path | str | None = None):
logger.debug(f"FName before correction: {fname}")
if isinstance(proceduretype, str):
proceduretype = ProcedureType.query(name=proceduretype)
self.proceduretype = proceduretype
if fname != "no_file":
if not fname:
self.fname = select_open_file(file_extension="xlsx", obj=get_application_from_parent(parent))
elif isinstance(fname, str):
self.fname = Path(fname)
logger.debug(f"FName after correction: {fname}")
def __init__(self, parent, input_object: Path | str | None = None):
logger.debug(f"FName before correction: {input_object}")
# if input_object != "no_file":
match input_object:
case str():
self.input_object = Path(input_object)
self.pyd = self.parse()
case Path():
self.input_object = input_object
self.pyd = self.parse()
case x if issubclass(input_object.__class__, PydBaseClass):
self.pyd = input_object
case x if issubclass(input_object.__class__, BaseClass):
self.pyd = input_object.to_pydantic()
case _:
self.input_object = select_open_file(file_extension="xlsx", obj=get_application_from_parent(parent))
self.pyd = self.parse()
logger.debug(f"FName after correction: {input_object}")
from .clientsubmissions import DefaultClientSubmission
from .procedures import DefaultProcedure
from.results import DefaultResults

View File

@@ -13,28 +13,33 @@ logger = logging.getLogger(f"submissions.{__name__}")
class DefaultProcedure(DefaultManager):
def __init__(self, proceduretype: "ProcedureType"|str, parent, fname: Path | str | None = None):
super().__init__(parent=parent, fname=fname)
if isinstance(proceduretype, str):
proceduretype = ProcedureType.query(name=proceduretype)
self.proceduretype = proceduretype
super().__init__(proceduretype=proceduretype, parent=parent, fname=fname)
def parse(self):
try:
info_parser = getattr(procedure_parsers, f"{self.proceduretype.name.replace(' ', '')}InfoParser")
except AttributeError:
info_parser = procedure_parsers.DefaultInfoParser
self.info_parser = info_parser(filepath=fname, proceduretype=proceduretype)
self.info_parser = info_parser(filepath=self.fname, proceduretype=self.proceduretype)
try:
reagent_parser = getattr(procedure_parsers, f"{self.proceduretype.name.replace(' ', '')}ReagentParser")
except AttributeError:
reagent_parser = procedure_parsers.DefaultReagentParser
self.reagent_parser = reagent_parser(filepath=fname, proceduretype=proceduretype)
self.reagent_parser = reagent_parser(filepath=self.fname, proceduretype=self.proceduretype)
try:
sample_parser = getattr(procedure_parsers, f"{self.proceduretype.name.replace(' ', '')}SampleParser")
except AttributeError:
sample_parser = procedure_parsers.DefaultSampleParser
self.sample_parser = sample_parser(filepath=fname, proceduretype=proceduretype)
self.sample_parser = sample_parser(filepath=self.fname, proceduretype=self.proceduretype)
try:
equipment_parser = getattr(procedure_parsers, f"{self.proceduretype.name.replace(' ', '')}EquipmentParser")
except AttributeError:
equipment_parser = procedure_parsers.DefaultEquipmentParser
self.equipment_parser = equipment_parser(filepath=fname, proceduretype=proceduretype)
self.equipment_parser = equipment_parser(filepath=self.fname, proceduretype=self.proceduretype)
self.to_pydantic()
def to_pydantic(self):

View File

@@ -128,6 +128,9 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
return sql
class PydReagent(PydBaseClass):
lot: str | None
reagentrole: str | None
@@ -1778,6 +1781,8 @@ class PydClientSubmission(PydBaseClass):
SubmissionFormWidget: Submission form widget
"""
from frontend.widgets.submission_widget import ClientSubmissionFormWidget
if not samples:
samples = self.samples
return ClientSubmissionFormWidget(parent=parent, clientsubmission=self, samples=samples, disable=disable)
def to_sql(self):
@@ -1804,6 +1809,7 @@ class PydClientSubmission(PydBaseClass):
return sql
class PydResults(PydBaseClass, arbitrary_types_allowed=True):
results: dict = Field(default={})
results_type: str = Field(default="NA")

View File

@@ -13,7 +13,7 @@ from PyQt6.QtGui import QAction
from pathlib import Path
from markdown import markdown
from pandas import ExcelWriter
from backend.db.models import Reagent, Sample, ClientSubmission, KitType, Run
from backend.db.models import Reagent, KitType
from tools import (
check_if_app, Settings, Report, jinja_template_loading, check_authorization, page_size, is_power_user,
under_development
@@ -175,6 +175,7 @@ class App(QMainWindow):
"""
Create a search for sample.
"""
from backend.db.models.submissions import Sample
dlg = SearchBox(self, object_type=Sample, extras=[])
dlg.exec()

View File

@@ -2,7 +2,7 @@ from PyQt6.QtWidgets import (
QVBoxLayout, QDialog, QDialogButtonBox
)
from .misc import CheckableComboBox, StartEndDatePicker
from backend.db import SubmissionType
from backend.db.models.kits import SubmissionType
class DateTypePicker(QDialog):

View File

@@ -11,9 +11,9 @@ from PyQt6.QtCore import Qt, QAbstractTableModel, QSortFilterProxyModel, pyqtSlo
from PyQt6.QtGui import QAction, QCursor, QStandardItemModel, QStandardItem, QIcon, QColor, QContextMenuEvent
from typing import Dict, List
from backend import Procedure
from backend.db.models import Run, ClientSubmission
from tools import Report, Result, report_result
# from backend import Procedure
from backend.db.models import Run, ClientSubmission, Procedure
from tools import Report, Result, report_result, get_application_from_parent
from .functions import select_open_file
logger = logging.getLogger(f"procedure.{__name__}")
@@ -261,6 +261,7 @@ class SubmissionsTree(QTreeView):
def __init__(self, model, parent=None):
super(SubmissionsTree, self).__init__(parent)
self.app = get_application_from_parent(parent)
self.total_count = ClientSubmission.__database_session__.query(ClientSubmission).count()
# self.setIndentation(0)
self.setExpandsOnDoubleClick(False)

View File

@@ -6,6 +6,8 @@ from PyQt6.QtWidgets import (
QComboBox, QDateEdit, QLineEdit, QLabel, QCheckBox, QHBoxLayout, QGridLayout
)
from PyQt6.QtCore import pyqtSignal, Qt, QSignalBlocker
from backend.managers import DefaultClientSubmission
from .functions import select_open_file, select_save_file
import logging
from pathlib import Path
@@ -120,25 +122,28 @@ class SubmissionFormContainer(QWidget):
report.add_result(Result(msg=f"File {fname.__str__()} not found.", status="critical"))
return report
# NOTE: create sheetparser using excel sheet and context from gui
try:
self.clientsubmissionparser = ClientSubmissionInfoParser(filepath=fname)
except PermissionError:
logger.error(f"Couldn't get permission to access file: {fname}")
return
except AttributeError:
self.clientsubmissionparser = ClientSubmissionInfoParser(filepath=fname)
try:
# self.prsr = SheetParser(filepath=fname)
self.sampleparser = ClientSubmissionSampleParser(filepath=fname)
except PermissionError:
logger.error(f"Couldn't get permission to access file: {fname}")
return
except AttributeError:
self.sampleparser = ClientSubmissionSampleParser(filepath=fname)
self.pydclientsubmission = self.clientsubmissionparser.to_pydantic()
self.pydsamples = self.sampleparser.to_pydantic()
# try:
# self.clientsubmissionparser = ClientSubmissionInfoParser(filepath=fname)
# except PermissionError:
# logger.error(f"Couldn't get permission to access file: {fname}")
# return
# except AttributeError:
# self.clientsubmissionparser = ClientSubmissionInfoParser(filepath=fname)
# try:
# # self.prsr = SheetParser(filepath=fname)
# self.sampleparser = ClientSubmissionSampleParser(filepath=fname)
# except PermissionError:
# logger.error(f"Couldn't get permission to access file: {fname}")
# return
# except AttributeError:
# self.sampleparser = ClientSubmissionSampleParser(filepath=fname)
# self.pydclientsubmission = self.clientsubmissionparser.to_pydantic()
# self.pydsamples = self.sampleparser.to_pydantic()
# logger.debug(f"Samples: {pformat(self.pydclientsubmission.sample)}")
checker = SampleChecker(self, "Sample Checker", self.pydsamples)
self.clientsubmission_manager = DefaultClientSubmission(parent=self, fname=fname)
self.pydclientsubmission = self.clientsubmission_manager.parse()
checker = SampleChecker(self, "Sample Checker", self.pydclientsubmission.samples)
if checker.exec():
# logger.debug(pformat(self.pydclientsubmission.sample))
try:
@@ -147,7 +152,7 @@ class SubmissionFormContainer(QWidget):
logger.error(f"Got wrong type for {self.pydclientsubmission}: {type(self.pydclientsubmission)}")
raise e
self.form = self.pydclientsubmission.to_form(parent=self)
self.form.samples = self.pydsamples
# self.form.samples = self.pydsamples
self.layout().addWidget(self.form)
else:
message = "Submission cancelled."