Pydantic added for validation.

This commit is contained in:
Landon Wark
2023-07-07 14:27:26 -05:00
parent 0c81c74f70
commit 1c804bfc6a
12 changed files with 497 additions and 141 deletions

View File

@@ -2,20 +2,23 @@
Contains miscellaneous widgets for frontend functions
'''
from datetime import date
import typing
import difflib
from typing import Tuple
from PyQt6.QtWidgets import (
QLabel, QVBoxLayout,
QLineEdit, QComboBox, QDialog,
QDialogButtonBox, QDateEdit, QSizePolicy, QWidget,
QGridLayout, QPushButton, QSpinBox, QDoubleSpinBox,
QHBoxLayout,
QHBoxLayout, QMainWindow
)
from PyQt6.QtCore import Qt, QDate, QSize
# from submissions.backend.db.functions import lookup_kittype_by_use
# from submissions.backend.db import lookup_regent_by_type_name_and_kit_name
from tools import check_not_nan
from ..all_window_functions import extract_form_info
from backend.db import get_all_reagenttype_names, lookup_all_sample_types, create_kit_from_yaml, lookup_regent_by_type_name, lookup_kittype_by_use#, lookup_regent_by_type_name_and_kit_name
from backend.db import get_all_reagenttype_names, lookup_all_sample_types, create_kit_from_yaml, \
lookup_regent_by_type_name, lookup_kittype_by_use, lookup_all_orgs
#, lookup_regent_by_type_name_and_kit_name
from backend.excel.parser import SheetParser
from jinja2 import Environment, FileSystemLoader
import sys
@@ -297,55 +300,103 @@ class ControlsDatePicker(QWidget):
return QSize(80,20)
# class ImportReagent(QComboBox):
# def __init__(self, ctx:dict, item:str, prsr:SheetParser|None=None):
# super().__init__()
# self.setEditable(True)
# # Ensure that all reagenttypes have a name that matches the items in the excel parser
# query_var = item.replace("lot_", "")
# if prsr != None:
# logger.debug(f"Import Reagent is looking at: {prsr.sub[item]} for {item}")
# else:
# logger.debug(f"Import Reagent is going to retrieve all reagents for {item}")
# logger.debug(f"Query for: {query_var}")
# if prsr != None:
# if isinstance(prsr.sub[item], np.float64):
# logger.debug(f"{prsr.sub[item]['lot']} is a numpy float!")
# try:
# prsr.sub[item] = int(prsr.sub[item]['lot'])
# except ValueError:
# pass
# # query for reagents using type name from sheet and kit from sheet
# logger.debug(f"Attempting lookup of reagents by type: {query_var}")
# # below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work.
# relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)]#, kit_name=prsr.sub['extraction_kit'])]
# # relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name_and_kit_name(ctx=ctx, type_name=query_var, kit_name=prsr.sub['extraction_kit'])]
# output_reg = []
# for reagent in relevant_reagents:
# # extract strings from any sets.
# if isinstance(reagent, set):
# for thing in reagent:
# output_reg.append(thing)
# elif isinstance(reagent, str):
# output_reg.append(reagent)
# relevant_reagents = output_reg
# # if reagent in sheet is not found insert it into the front of relevant reagents so it shows
# if prsr != None:
# logger.debug(f"Relevant reagents for {prsr.sub[item]}: {relevant_reagents}")
# if str(prsr.sub[item]['lot']) not in relevant_reagents:
# if check_not_nan(prsr.sub[item]['lot']):
# relevant_reagents.insert(0, str(prsr.sub[item]['lot']))
# else:
# if len(relevant_reagents) > 1:
# logger.debug(f"Found {prsr.sub[item]['lot']} in relevant reagents: {relevant_reagents}. Moving to front of list.")
# idx = relevant_reagents.index(str(prsr.sub[item]['lot']))
# logger.debug(f"The index we got for {prsr.sub[item]['lot']} in {relevant_reagents} was {idx}")
# moved_reag = relevant_reagents.pop(idx)
# relevant_reagents.insert(0, moved_reag)
# else:
# logger.debug(f"Found {prsr.sub[item]['lot']} in relevant reagents: {relevant_reagents}. But no need to move due to short list.")
# logger.debug(f"New relevant reagents: {relevant_reagents}")
# self.setObjectName(f"lot_{item}")
# self.addItems(relevant_reagents)
class ImportReagent(QComboBox):
def __init__(self, ctx:dict, item:str, prsr:SheetParser|None=None):
def __init__(self, ctx:dict, reagent:dict):
super().__init__()
self.setEditable(True)
# Ensure that all reagenttypes have a name that matches the items in the excel parser
query_var = item.replace("lot_", "")
if prsr != None:
logger.debug(f"Import Reagent is looking at: {prsr.sub[item]} for {item}")
else:
logger.debug(f"Import Reagent is going to retrieve all reagents for {item}")
logger.debug(f"Query for: {query_var}")
if prsr != None:
if isinstance(prsr.sub[item], np.float64):
logger.debug(f"{prsr.sub[item]['lot']} is a numpy float!")
try:
prsr.sub[item] = int(prsr.sub[item]['lot'])
except ValueError:
pass
query_var = reagent['type']
logger.debug(f"Import Reagent is looking at: {reagent['lot']} for {reagent['type']}")
if isinstance(reagent['lot'], np.float64):
logger.debug(f"{reagent['lot']} is a numpy float!")
try:
reagent['lot'] = int(reagent['lot'])
except ValueError:
pass
# query for reagents using type name from sheet and kit from sheet
logger.debug(f"Attempting lookup of reagents by type: {query_var}")
# below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work.
relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)]#, kit_name=prsr.sub['extraction_kit'])]
# relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name_and_kit_name(ctx=ctx, type_name=query_var, kit_name=prsr.sub['extraction_kit'])]
output_reg = []
for reagent in relevant_reagents:
for rel_reagent in relevant_reagents:
# extract strings from any sets.
if isinstance(reagent, set):
for thing in reagent:
if isinstance(rel_reagent, set):
for thing in rel_reagent:
output_reg.append(thing)
elif isinstance(reagent, str):
output_reg.append(reagent)
elif isinstance(rel_reagent, str):
output_reg.append(rel_reagent)
relevant_reagents = output_reg
# if reagent in sheet is not found insert it into the front of relevant reagents so it shows
if prsr != None:
logger.debug(f"Relevant reagents for {prsr.sub[item]}: {relevant_reagents}")
if str(prsr.sub[item]['lot']) not in relevant_reagents:
if check_not_nan(prsr.sub[item]['lot']):
relevant_reagents.insert(0, str(prsr.sub[item]['lot']))
# if prsr != None:
logger.debug(f"Relevant reagents for {reagent['lot']}: {relevant_reagents}")
if str(reagent['lot']) not in relevant_reagents:
if check_not_nan(reagent['lot']):
relevant_reagents.insert(0, str(reagent['lot']))
else:
if len(relevant_reagents) > 1:
logger.debug(f"Found {reagent['lot']} in relevant reagents: {relevant_reagents}. Moving to front of list.")
idx = relevant_reagents.index(str(reagent['lot']))
logger.debug(f"The index we got for {reagent['lot']} in {relevant_reagents} was {idx}")
moved_reag = relevant_reagents.pop(idx)
relevant_reagents.insert(0, moved_reag)
else:
if len(relevant_reagents) > 1:
logger.debug(f"Found {prsr.sub[item]['lot']} in relevant reagents: {relevant_reagents}. Moving to front of list.")
idx = relevant_reagents.index(str(prsr.sub[item]['lot']))
logger.debug(f"The index we got for {prsr.sub[item]['lot']} in {relevant_reagents} was {idx}")
moved_reag = relevant_reagents.pop(idx)
relevant_reagents.insert(0, moved_reag)
else:
logger.debug(f"Found {prsr.sub[item]['lot']} in relevant reagents: {relevant_reagents}. But no need to move due to short list.")
logger.debug(f"Found {reagent['lot']} in relevant reagents: {relevant_reagents}. But no need to move due to short list.")
logger.debug(f"New relevant reagents: {relevant_reagents}")
self.setObjectName(f"lot_{item}")
self.addItems(relevant_reagents)
self.setObjectName(f"lot_{reagent['type']}")
self.addItems(relevant_reagents)

View File

@@ -7,7 +7,6 @@ from getpass import getuser
import inspect
from pathlib import Path
import pprint
import re
import yaml
import json
from typing import Tuple
@@ -30,7 +29,7 @@ from backend.db.functions import (
)
from backend.excel.parser import SheetParser, PCRParser
from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df
from tools import RSLNamer, check_not_nan, check_kit_integrity
from tools import check_not_nan, check_kit_integrity
from .custom_widgets.pop_ups import AlertPop, QuestionAsker
from .custom_widgets import ReportDatePicker
from .custom_widgets.misc import ImportReagent
@@ -40,6 +39,16 @@ from .visualizations.control_charts import create_charts, construct_html
logger = logging.getLogger(f"submissions.{__name__}")
def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]:
"""
_summary_
Args:
obj (QMainWindow): _description_
Returns:
Tuple[QMainWindow, dict|None]: _description_
"""
logger.debug(f"\n\nStarting Import...\n\n")
result = None
logger.debug(obj.ctx)
# initialize samples
@@ -57,102 +66,192 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
except PermissionError:
logger.error(f"Couldn't get permission to access file: {fname}")
return
if prsr.sub['rsl_plate_num'] == None:
prsr.sub['rsl_plate_num'] = RSLNamer(fname.__str__()).parsed_name
logger.debug(f"prsr.sub = {prsr.sub}")
for sample in prsr.sub['samples']:
try:
pyd = prsr.to_pydantic()
logger.debug(f"Pydantic result: \n\n{pyd}\n\n")
# with open("pickled.pkl", "wb") as f:
# pickle.dump(pyd, f)
except Exception as e:
return obj, dict(message= f"Problem creating pydantic model:\n\n{e}", status="critical")
# moved to pydantic model
# if prsr.sub['rsl_plate_num'] == None:
# prsr.sub['rsl_plate_num'] = RSLNamer(fname.__str__()).parsed_name
# logger.debug(f"prsr.sub = {prsr.sub}")
for sample in pyd.samples:
if hasattr(sample, "elution_well"):
logger.debug(f"Sample from import: {sample.elution_well}")
obj.current_submission_type = prsr.sub['submission_type']
# obj.current_submission_type = prsr.sub['submission_type']
obj.current_submission_type = pyd.submission_type
# destroy any widgets from previous imports
for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget):
item.setParent(None)
# regex to parser out different variable types for decision making
variable_parser = re.compile(r"""
(?P<extraction_kit>^extraction_kit$) |
(?P<submitted_date>^submitted_date$) |
(?P<submitting_lab>)^submitting_lab$ |
(?P<samples>)^samples$ |
(?P<reagent>^lot_.*$) |
(?P<csv>^csv$)
""", re.VERBOSE)
for item in prsr.sub:
logger.debug(f"Item: {item}")
# attempt to match variable name to regex group
try:
mo = variable_parser.fullmatch(item).lastgroup
except AttributeError:
mo = "other"
logger.debug(f"Mo: {mo}")
match mo:
# # regex to parser out different variable types for decision making
# variable_parser = re.compile(r"""
# (?P<extraction_kit>^extraction_kit$) |
# (?P<submitted_date>^submitted_date$) |
# (?P<submitting_lab>)^submitting_lab$ |
# (?P<samples>)^samples$ |
# (?P<reagent>^lot_.*$) |
# (?P<csv>^csv$)
# """, re.VERBOSE)
# for item in prsr.sub:
# logger.debug(f"Item: {item}")
# # attempt to match variable name to regex group
# try:
# mo = variable_parser.fullmatch(item).lastgroup
# except AttributeError:
# mo = "other"
# logger.debug(f"Mo: {mo}")
# match mo:
# case 'submitting_lab':
# # create label
# obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
# logger.debug(f"{item}: {prsr.sub[item]}")
# # create combobox to hold looked up submitting labs
# add_widget = QComboBox()
# labs = [item.__str__() for item in lookup_all_orgs(ctx=obj.ctx)]
# # try to set closest match to top of list
# try:
# labs = difflib.get_close_matches(prsr.sub[item], labs, len(labs), 0)
# except (TypeError, ValueError):
# pass
# # set combobox values to lookedup values
# add_widget.addItems(labs)
# case 'extraction_kit':
# # create label
# obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
# # if extraction kit not available, all other values fail
# if not check_not_nan(prsr.sub[item]):
# msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning")
# msg.exec()
# # create combobox to hold looked up kits
# add_widget = QComboBox()
# # lookup existing kits by 'submission_type' decided on by sheetparser
# uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=prsr.sub['submission_type'])]
# if check_not_nan(prsr.sub[item]):
# logger.debug(f"The extraction kit in parser was: {prsr.sub[item]}")
# uses.insert(0, uses.pop(uses.index(prsr.sub[item])))
# obj.ext_kit = prsr.sub[item]
# else:
# logger.error(f"Couldn't find {prsr.sub['extraction_kit']}")
# obj.ext_kit = uses[0]
# add_widget.addItems(uses)
# case 'submitted_date':
# # create label
# obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
# # uses base calendar
# add_widget = QDateEdit(calendarPopup=True)
# # sets submitted date based on date found in excel sheet
# try:
# add_widget.setDate(prsr.sub[item])
# # if not found, use today
# except:
# add_widget.setDate(date.today())
# case 'reagent':
# # create label
# reg_label = QLabel(item.replace("_", " ").title())
# reg_label.setObjectName(f"lot_{item}_label")
# obj.table_widget.formlayout.addWidget(reg_label)
# # create reagent choice widget
# add_widget = ImportReagent(ctx=obj.ctx, item=item, prsr=prsr)
# obj.reagents[item] = prsr.sub[item]
# case 'samples':
# # hold samples in 'obj' until form submitted
# logger.debug(f"{item}: {prsr.sub[item]}")
# obj.samples = prsr.sub[item]
# add_widget = None
# case 'csv':
# obj.csv = prsr.sub[item]
# case _:
# # anything else gets added in as a line edit
# obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
# add_widget = QLineEdit()
# logger.debug(f"Setting widget text to {str(prsr.sub[item]).replace('_', ' ')}")
# add_widget.setText(str(prsr.sub[item]).replace("_", " "))
fields = list(pyd.model_fields.keys())
fields.remove('filepath')
logger.debug(f"pydantic fields: {fields}")
for field in fields:
value = getattr(pyd, field)
if not check_not_nan(value):
continue
match field:
case 'submitting_lab':
# create label
obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
logger.debug(f"{item}: {prsr.sub[item]}")
label = QLabel(field.replace("_", " ").title())
logger.debug(f"{field}: {value}")
# create combobox to hold looked up submitting labs
add_widget = QComboBox()
labs = [item.__str__() for item in lookup_all_orgs(ctx=obj.ctx)]
# try to set closest match to top of list
try:
labs = difflib.get_close_matches(prsr.sub[item], labs, len(labs), 0)
labs = difflib.get_close_matches(value, labs, len(labs), 0)
except (TypeError, ValueError):
pass
# set combobox values to lookedup values
add_widget.addItems(labs)
case 'extraction_kit':
# create label
obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
label = QLabel(field.replace("_", " ").title())
# if extraction kit not available, all other values fail
if not check_not_nan(prsr.sub[item]):
if not check_not_nan(value):
msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning")
msg.exec()
# create combobox to hold looked up kits
add_widget = QComboBox()
# lookup existing kits by 'submission_type' decided on by sheetparser
uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=prsr.sub['submission_type'])]
if check_not_nan(prsr.sub[item]):
logger.debug(f"The extraction kit in parser was: {prsr.sub[item]}")
uses.insert(0, uses.pop(uses.index(prsr.sub[item])))
obj.ext_kit = prsr.sub[item]
uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=pyd.submission_type)]
if check_not_nan(value):
logger.debug(f"The extraction kit in parser was: {value}")
uses.insert(0, uses.pop(uses.index(value)))
obj.ext_kit = value
else:
logger.error(f"Couldn't find {prsr.sub['extraction_kit']}")
obj.ext_kit = uses[0]
add_widget.addItems(uses)
case 'submitted_date':
# create label
obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
label = QLabel(field.replace("_", " ").title())
# uses base calendar
add_widget = QDateEdit(calendarPopup=True)
# sets submitted date based on date found in excel sheet
try:
add_widget.setDate(prsr.sub[item])
add_widget.setDate(value)
# if not found, use today
except:
add_widget.setDate(date.today())
case 'reagent':
# create label
reg_label = QLabel(item.replace("_", " ").title())
reg_label.setObjectName(f"lot_{item}_label")
obj.table_widget.formlayout.addWidget(reg_label)
# create reagent choice widget
add_widget = ImportReagent(ctx=obj.ctx, item=item, prsr=prsr)
obj.reagents[item] = prsr.sub[item]
case 'samples':
# hold samples in 'obj' until form submitted
logger.debug(f"{item}: {prsr.sub[item]}")
obj.samples = prsr.sub[item]
add_widget = None
logger.debug(f"{field}:\n\t{value}")
obj.samples = value
continue
case 'csv':
obj.csv = prsr.sub[item]
obj.csv = value
continue
case 'reagents':
for reagent in value:
# create label
reg_label = QLabel(reagent['type'].replace("_", " ").title())
reg_label.setObjectName(f"lot_{reagent['type']}_label")
# obj.table_widget.formlayout.addWidget(reg_label)
# create reagent choice widget
add_widget = ImportReagent(ctx=obj.ctx, reagent=reagent)
add_widget.setObjectName(f"lot_{reagent['type']}")
logger.debug(f"Widget name set to: {add_widget.objectName()}")
obj.table_widget.formlayout.addWidget(reg_label)
obj.table_widget.formlayout.addWidget(add_widget)
obj.reagents[reagent['type']] = reagent['lot']
continue
case _:
# anything else gets added in as a line edit
obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
label = QLabel(field.replace("_", " ").title())
add_widget = QLineEdit()
logger.debug(f"Setting widget text to {str(prsr.sub[item]).replace('_', ' ')}")
add_widget.setText(str(prsr.sub[item]).replace("_", " "))
logger.debug(f"Setting widget text to {str(value).replace('_', ' ')}")
add_widget.setText(str(value).replace("_", " "))
try:
add_widget.setObjectName(item)
add_widget.setObjectName(field)
logger.debug(f"Widget name set to: {add_widget.objectName()}")
obj.table_widget.formlayout.addWidget(label)
obj.table_widget.formlayout.addWidget(add_widget)
except AttributeError as e:
logger.error(e)
@@ -199,7 +298,7 @@ def kit_integrity_completion_function(obj:QMainWindow) -> QMainWindow:
result = dict(message=kit_integrity['message'], status="Warning")
for item in kit_integrity['missing']:
obj.table_widget.formlayout.addWidget(QLabel(f"Lot {item.replace('_', ' ').title()}"))
add_widget = ImportReagent(ctx=obj.ctx, item=item)
add_widget = ImportReagent(ctx=obj.ctx, reagent=dict(type=item, lot=None, exp=None))#item=item)
obj.table_widget.formlayout.addWidget(add_widget)
submit_btn = QPushButton("Submit")
submit_btn.setObjectName("lot_submit_btn")
@@ -208,6 +307,7 @@ def kit_integrity_completion_function(obj:QMainWindow) -> QMainWindow:
return obj, result
def submit_new_sample_function(obj:QMainWindow) -> QMainWindow:
logger.debug(f"\n\nBeginning Submission\n\n")
result = None
# extract info from the form widgets
info = extract_form_info(obj.table_widget.tab1)
@@ -219,7 +319,8 @@ def submit_new_sample_function(obj:QMainWindow) -> QMainWindow:
parsed_reagents = []
# compare reagents in form to reagent database
for reagent in reagents:
wanted_reagent = lookup_reagent(ctx=obj.ctx, reagent_lot=reagents[reagent])
# TODO: have this lookup by type and lot
wanted_reagent = lookup_reagent(ctx=obj.ctx, reagent_lot=reagents[reagent], type_name=reagent)
logger.debug(f"Looked up reagent: {wanted_reagent}")
# if reagent not found offer to add to database
if wanted_reagent == None:
@@ -431,7 +532,7 @@ def chart_maker_function(obj:QMainWindow) -> QMainWindow:
data = [control.convert_by_mode(mode=obj.mode) for control in controls]
# flatten data to one dimensional list
data = [item for sublist in data for item in sublist]
logger.debug(f"Control objects going into df conversion: {data}")
logger.debug(f"Control objects going into df conversion: {type(data)}")
if data == []:
return obj, dict(status="Critical", message="No data found for controls in given date range.")
# send to dataframe creator