Before update to Submission Widget

This commit is contained in:
Landon Wark
2024-03-11 13:02:21 -05:00
parent 08988a14a0
commit badf6e21c8
5 changed files with 48 additions and 259 deletions

View File

@@ -4,7 +4,7 @@ from pathlib import Path
# Version of the realpython-reader package
__project__ = "submissions"
__version__ = "202403.1b"
__version__ = "202403.2b"
__author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"}
__copyright__ = "2022-2024, Government of Canada"

View File

@@ -426,7 +426,7 @@ class BasicSubmission(BaseClass):
dicto = self.to_dict(full_data=True, backup=backup)
new_dict = {}
for key, value in dicto.items():
missing = value is None or value == ""
missing = value is None or value in ['', 'None']
match key:
case "reagents":
new_dict[key] = [PydReagent(**reagent, missing=False) for reagent in value]

View File

@@ -111,7 +111,7 @@ class App(QMainWindow):
logger.debug(f"Connecting actions...")
self.importAction.triggered.connect(self.table_widget.formwidget.importSubmission)
self.importPCRAction.triggered.connect(self.table_widget.formwidget.import_pcr_results)
self.addReagentAction.triggered.connect(self.add_reagent)
self.addReagentAction.triggered.connect(self.table_widget.formwidget.add_reagent)
self.generateReportAction.triggered.connect(self.table_widget.sub_wid.generate_report)
self.joinExtractionAction.triggered.connect(self.table_widget.sub_wid.link_extractions)
self.joinPCRAction.triggered.connect(self.table_widget.sub_wid.link_pcr)
@@ -158,36 +158,7 @@ class App(QMainWindow):
else:
self.statusBar().showMessage("Action completed sucessfully.", 5000)
def add_reagent(self, reagent_lot:str|None=None, reagent_type:str|None=None, expiry:date|None=None, name:str|None=None):
"""
Action to create new reagent in DB.
Args:
reagent_lot (str | None, optional): Parsed reagent from import form. Defaults to None.
reagent_type (str | None, optional): Parsed reagent type from import form. Defaults to None.
expiry (date | None, optional): Parsed reagent expiry data. Defaults to None.
name (str | None, optional): Parsed reagent name. Defaults to None.
Returns:
models.Reagent: the constructed reagent object to add to submission
"""
report = Report()
if isinstance(reagent_lot, bool):
reagent_lot = ""
# create form
dlg = AddReagentForm(reagent_lot=reagent_lot, reagent_type=reagent_type, expiry=expiry, reagent_name=name)
if dlg.exec():
# extract form info
info = dlg.parse_form()
logger.debug(f"Reagent info: {info}")
# create reagent object
reagent = PydReagent(ctx=self.ctx, **info)
# send reagent to db
sqlobj, result = reagent.toSQL()
sqlobj.save()
report.add_result(result)
self.result_reporter()
return reagent
def runSearch(self):
dlg = LogParser(self)
@@ -247,4 +218,4 @@ class AddSubForm(QWidget):
self.tab4.setLayout(self.tab4.layout)
# add tabs to main widget
self.layout.addWidget(self.tabs)
self.setLayout(self.layout)
self.setLayout(self.layout)

View File

@@ -16,6 +16,7 @@ from backend.db import (
)
from pprint import pformat
from .pop_ups import QuestionAsker, AlertPop
from .misc import AddReagentForm
from typing import List, Tuple
from datetime import date
@@ -65,49 +66,6 @@ class SubmissionFormContainer(QWidget):
self.report = Report()
self.app.result_reporter()
# def scrape_reagents(self, *args, **kwargs):
# """
# Called when a reagent is changed.
# """
# caller = inspect.stack()[1].function.__repr__().replace("'", "")
# logger.debug(f"Args: {args}, kwargs: {kwargs}")
# self.scrape_reagents_function(args[0], caller=caller)
# self.kit_integrity_completion()
# self.app.report.add_result(self.report)
# self.report = Report()
# match inspect.stack()[1].function.__repr__():
# case "import_submission_function":
# pass
# case _:
# self.app.result_reporter()
# def kit_integrity_completion(self):
# """
# Performs check of imported reagents
# NOTE: this will not change self.reagents which should be fine
# since it's only used when looking up
# """
# self.kit_integrity_completion_function()
# self.app.report.add_result(self.report)
# self.report = Report()
# match inspect.stack()[1].function.__repr__():
# case "import_submission_function":
# pass
# case _:
# self.app.result_reporter()
# def submit_new_sample(self):
# """
# Attempt to add sample to database when 'submit' button clicked
# """
# self.submit_new_sample_function()
# self.app.report.add_result(self.report)
# self.report = Report()
# self.app.result_reporter()
# def export_csv(self, fname:Path|None=None):
# self.export_csv_function(fname)
def import_submission_function(self, fname:Path|None=None):
"""
Import a new submission to the app window
@@ -148,180 +106,12 @@ class SubmissionFormContainer(QWidget):
logger.debug(f"Pydantic result: \n\n{pformat(self.pyd)}\n\n")
self.form = self.pyd.toForm(parent=self)
self.layout().addWidget(self.form)
# kit_widget = self.form.find_widgets(object_name="extraction_kit")[0].input
# logger.debug(f"Kitwidget {kit_widget}")
# self.scrape_reagents(kit_widget.currentText())
# kit_widget.currentTextChanged.connect(self.scrape_reagents)
# # compare obj.reagents with expected reagents in kit
if self.prsr.sample_result != None:
report.add_result(msg=self.prsr.sample_result, status="Warning")
self.report.add_result(report)
logger.debug(f"Outgoing report: {self.report.results}")
logger.debug(f"All attributes of submission container:\n{pformat(self.__dict__)}")
# def scrape_reagents_function(self, extraction_kit:str, caller:str|None=None):
# """
# Extracted scrape reagents function that will run when
# form 'extraction_kit' widget is updated.
# Args:
# obj (QMainWindow): updated main application
# extraction_kit (str): name of extraction kit (in 'extraction_kit' widget)
# Returns:
# Tuple[QMainWindow, dict]: Updated application and result
# """
# self.form.reagents = []
# logger.debug(f"\n\n{caller}\n\n")
# report = Report()
# logger.debug(f"Extraction kit: {extraction_kit}")
# # Remove previous reagent widgets
# try:
# old_reagents = self.form.find_widgets()
# except AttributeError:
# logger.error(f"Couldn't find old reagents.")
# old_reagents = []
# # logger.debug(f"\n\nAttempting to clear: {old_reagents}\n\n")
# for reagent in old_reagents:
# if isinstance(reagent, ReagentFormWidget) or isinstance(reagent, QPushButton):
# reagent.setParent(None)
# match caller:
# case "import_submission_function":
# self.form.reagents = self.prsr.sub['reagents']
# case _:
# already_have = [reagent for reagent in self.prsr.sub['reagents'] if not reagent.missing]
# names = list(set([item.type for item in already_have]))
# logger.debug(f"reagents: {already_have}")
# reagents = [item.to_pydantic() for item in KitType.query(name=extraction_kit).get_reagents(submission_type=self.pyd.submission_type) if item.name not in names]
# self.form.reagents = already_have + reagents
# # logger.debug(f"Imported reagents: {obj.reagents}")
# # logger.debug(f"Missing reagents: {obj.missing_reagents}")
# self.report.add_result(report)
# logger.debug(f"Outgoing report: {self.report.results}")
# def kit_integrity_completion_function(self):
# """
# Compare kit contents to parsed contents
# Args:
# obj (QMainWindow): The original app window
# Returns:
# Tuple[QMainWindow, dict]: Collection of new main app window and result dict
# """
# report = Report()
# missing_reagents = []
# # logger.debug(inspect.currentframe().f_back.f_code.co_name)
# # find the widget that contains kit info
# kit_widget = self.form.find_widgets(object_name="extraction_kit")[0].input
# logger.debug(f"Kit selector: {kit_widget}")
# # get current kit being used
# self.ext_kit = kit_widget.currentText()
# for reagent in self.form.reagents:
# logger.debug(f"Creating widget for {reagent}")
# add_widget = ReagentFormWidget(parent=self, reagent=reagent, extraction_kit=self.ext_kit)
# # self.form.layout().addWidget(add_widget)
# self.form.layout.addWidget(add_widget)
# if reagent.missing:
# missing_reagents.append(reagent)
# logger.debug(f"Checking integrity of {self.ext_kit}")
# # TODO: put check_kit_integrity here instead of what's here?
# # see if there are any missing reagents
# if len(missing_reagents) > 0:
# result = Result(msg=f"""The submission you are importing is missing some reagents expected by the kit.\n\n
# It looks like you are missing: {[item.type.upper() for item in missing_reagents]}\n\n
# Alternatively, you may have set the wrong extraction kit.\n\nThe program will populate lists using existing reagents.
# \n\nPlease make sure you check the lots carefully!""".replace(" ", ""), status="Warning")
# report.add_result(result)
# if hasattr(self.pyd, "csv"):
# export_csv_btn = QPushButton("Export CSV")
# export_csv_btn.setObjectName("export_csv_btn")
# self.form.layout().addWidget(export_csv_btn)
# export_csv_btn.clicked.connect(self.export_csv)
# submit_btn = QPushButton("Submit")
# submit_btn.setObjectName("submit_btn")
# self.form.layout().addWidget(submit_btn)
# submit_btn.clicked.connect(self.submit_new_sample)
# self.report.add_result(report)
# logger.debug(f"Outgoing report: {self.report.results}")
# def submit_new_sample_function(self) -> QWidget:
# """
# Parse forms and add sample to the database.
# Args:
# obj (QMainWindow): original app window
# Returns:
# Tuple[QMainWindow, dict]: Collection of new main app window and result dict
# """
# logger.debug(f"\n\nBeginning Submission\n\n")
# report = Report()
# self.pyd: PydSubmission = self.form.parse_form()
# logger.debug(f"Submission: {pformat(self.pyd)}")
# logger.debug("Checking kit integrity...")
# result = self.pyd.check_kit_integrity()
# report.add_result(result)
# if len(result.results) > 0:
# self.report.add_result(report)
# return
# base_submission, result = self.pyd.toSQL()
# # logger.debug(f"Base submission: {base_submission.to_dict()}")
# # check output message for issues
# match result.code:
# # code 0: everything is fine.
# case 0:
# self.report.add_result(None)
# # code 1: ask for overwrite
# case 1:
# dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_num}?", message=result.msg)
# if dlg.exec():
# # Do not add duplicate reagents.
# # base_submission.reagents = []
# result = None
# else:
# self.app.ctx.database_session.rollback()
# self.report.add_result(Result(msg="Overwrite cancelled", status="Information"))
# return
# # code 2: No RSL plate number given
# case 2:
# self.report.add_result(result)
# return
# case _:
# pass
# # add reagents to submission object
# for reagent in base_submission.reagents:
# # logger.debug(f"Updating: {reagent} with {reagent.lot}")
# reagent.update_last_used(kit=base_submission.extraction_kit)
# # logger.debug(f"Here is the final submission: {pformat(base_submission.__dict__)}")
# # logger.debug(f"Parsed reagents: {pformat(base_submission.reagents)}")
# # logger.debug(f"Sending submission: {base_submission.rsl_plate_num} to database.")
# # logger.debug(f"Samples from pyd: {pformat(self.pyd.samples)}")
# # logger.debug(f"Samples SQL: {pformat([item.__dict__ for item in base_submission.samples])}")
# # logger.debug(f"")
# base_submission.save()
# # update summary sheet
# self.app.table_widget.sub_wid.setData()
# # reset form
# self.form.setParent(None)
# # logger.debug(f"All attributes of obj: {pformat(self.__dict__)}")
# self.report.add_result(report)
# def export_csv_function(self, fname:Path|None=None):
# """
# Save the submission's csv file.
# Args:
# fname (Path | None, optional): Input filename. Defaults to None.
# """
# if isinstance(fname, bool) or fname == None:
# fname = select_save_file(obj=self, default_name=self.pyd.construct_filename(), extension="csv")
# try:
# self.pyd.csv.to_csv(fname.__str__(), index=False)
# del self.pyd.csv
# except PermissionError:
# logger.debug(f"Could not get permissions to {fname}. Possibly the request was cancelled.")
def import_pcr_results(self):
"""
Pull QuantStudio results into db
@@ -392,6 +182,37 @@ class SubmissionFormContainer(QWidget):
sub.update_subsampassoc(sample=sample, input_dict=sample_dict)
self.report.add_result(Result(msg=f"We added PCR info to {sub.rsl_plate_num}.", status='Information'))
def add_reagent(self, reagent_lot:str|None=None, reagent_type:str|None=None, expiry:date|None=None, name:str|None=None):
"""
Action to create new reagent in DB.
Args:
reagent_lot (str | None, optional): Parsed reagent from import form. Defaults to None.
reagent_type (str | None, optional): Parsed reagent type from import form. Defaults to None.
expiry (date | None, optional): Parsed reagent expiry data. Defaults to None.
name (str | None, optional): Parsed reagent name. Defaults to None.
Returns:
models.Reagent: the constructed reagent object to add to submission
"""
report = Report()
if isinstance(reagent_lot, bool):
reagent_lot = ""
# create form
dlg = AddReagentForm(reagent_lot=reagent_lot, reagent_type=reagent_type, expiry=expiry, reagent_name=name)
if dlg.exec():
# extract form info
info = dlg.parse_form()
logger.debug(f"Reagent info: {info}")
# create reagent object
reagent = PydReagent(ctx=self.ctx, **info)
# send reagent to db
sqlobj, result = reagent.toSQL()
sqlobj.save()
report.add_result(result)
self.app.result_reporter()
return reagent
class SubmissionFormWidget(QWidget):
def __init__(self, parent: QWidget, **kwargs) -> None:
@@ -576,7 +397,8 @@ class SubmissionFormWidget(QWidget):
result = self.pyd.check_kit_integrity()
report.add_result(result)
if len(result.results) > 0:
self.report.add_result(report)
self.app.report.add_result(report)
self.app.result_reporter()
return
base_submission, result = self.pyd.toSQL()
# logger.debug(f"Base submission: {base_submission.to_dict()}")
@@ -618,7 +440,8 @@ class SubmissionFormWidget(QWidget):
# reset form
self.setParent(None)
# logger.debug(f"All attributes of obj: {pformat(self.__dict__)}")
self.report.add_result(report)
self.app.report.add_result(report)
self.app.result_reporter()
def export_csv_function(self, fname:Path|None=None):
"""
@@ -866,7 +689,7 @@ class SubmissionFormWidget(QWidget):
if wanted_reagent == None:
dlg = QuestionAsker(title=f"Add {lot}?", message=f"Couldn't find reagent type {self.reagent.type}: {lot} in the database.\n\nWould you like to add it?")
if dlg.exec():
wanted_reagent = self.app.add_reagent(reagent_lot=lot, reagent_type=self.reagent.type, expiry=self.reagent.expiry, name=self.reagent.name)
wanted_reagent = self.parent().parent().add_reagent(reagent_lot=lot, reagent_type=self.reagent.type, expiry=self.reagent.expiry, name=self.reagent.name)
return wanted_reagent, None
else:
# In this case we will have an empty reagent and the submission will fail kit integrity check

View File

@@ -14,6 +14,7 @@ from sqlalchemy import create_engine
from pydantic import field_validator, BaseModel, Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Any, Tuple, Literal, List
from __init__ import bcolors
logger = logging.getLogger(f"submissions.{__name__}")
@@ -295,20 +296,14 @@ class GroupWriteRotatingFileHandler(handlers.RotatingFileHandler):
class CustomFormatter(logging.Formatter):
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
# format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
format = "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"
FORMATS = {
logging.DEBUG: grey + format + reset,
logging.INFO: grey + format + reset,
logging.WARNING: yellow + format + reset,
logging.ERROR: red + format + reset,
logging.CRITICAL: bold_red + format + reset
logging.DEBUG: bcolors.ENDC + format + bcolors.ENDC,
logging.INFO: bcolors.ENDC + format + bcolors.ENDC,
logging.WARNING: bcolors.WARNING + format + bcolors.ENDC,
logging.ERROR: bcolors.FAIL + format + bcolors.ENDC,
logging.CRITICAL: bcolors.FAIL + format + bcolors.ENDC
}
def format(self, record):