From 1c804bfc6abe23cd31bf96b64baa2238e755dd19 Mon Sep 17 00:00:00 2001 From: Landon Wark Date: Fri, 7 Jul 2023 14:27:26 -0500 Subject: [PATCH] Pydantic added for validation. --- CHANGELOG.md | 3 + TODO.md | 5 + alembic.ini | 4 +- ...dd731ff63_added_required_to_reagenttype.py | 42 ++++ src/submissions/backend/db/functions.py | 45 ++-- src/submissions/backend/db/models/kits.py | 15 +- src/submissions/backend/excel/parser.py | 39 +++- src/submissions/backend/excel/reports.py | 71 ++++-- src/submissions/backend/pydant/__init__.py | 65 ++++++ .../frontend/custom_widgets/misc.py | 127 +++++++---- .../frontend/main_window_functions.py | 213 +++++++++++++----- src/submissions/tools/__init__.py | 9 +- 12 files changed, 497 insertions(+), 141 deletions(-) create mode 100644 alembic/versions/7aadd731ff63_added_required_to_reagenttype.py create mode 100644 src/submissions/backend/pydant/__init__.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d1e64c..5209313 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 202307.01 +- Fixed bug where date increment of controls not working for multiple same dates. +- Fixed bug by having lookup of reagents by lot *and* reagenttype instead of just lot. +- Added in pydantic to validate submission info. - Moved parser to metadata based recognition of submission type. ## 202306.03 diff --git a/TODO.md b/TODO.md index ca56500..85324ba 100644 --- a/TODO.md +++ b/TODO.md @@ -1,3 +1,8 @@ +- [ ] Think about trying to migrate required column in reagenttypes to reagenttypes_kittypes + - In case reagent type is required for one kit, but not another. Possible? +- [x] Insert column into reagent type to indicate if reagent is required for kit. + - Needed to keep interchangeable bead plates from being forced into forms. +- [ ] Migrate context settings to pydantic-settings model. - [ ] Migrate the parser.sub dictionary to pydantic models. - [x] Move type_decider to metadata based method rather than excel map. - [x] Solve bug for plate mapping when two samples of same name are in different rows. diff --git a/alembic.ini b/alembic.ini index b8f47d4..986a474 100644 --- a/alembic.ini +++ b/alembic.ini @@ -55,8 +55,8 @@ version_path_separator = os # Use os.pathsep. Default configuration used for ne # are written from script.py.mako # output_encoding = utf-8 -; sqlalchemy.url = sqlite:///L:\Robotics Laboratory Support\Submissions\submissions.db -sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\Archives\DB_backups\submissions-20230605.db +sqlalchemy.url = sqlite:///L:\Robotics Laboratory Support\Submissions\submissions.db +; sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\Archives\DB_backups\submissions-20230705.db ; sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\python\submissions\tests\test_assets\submissions_test.db diff --git a/alembic/versions/7aadd731ff63_added_required_to_reagenttype.py b/alembic/versions/7aadd731ff63_added_required_to_reagenttype.py new file mode 100644 index 0000000..43cbe24 --- /dev/null +++ b/alembic/versions/7aadd731ff63_added_required_to_reagenttype.py @@ -0,0 +1,42 @@ +"""added required to reagenttype + +Revision ID: 7aadd731ff63 +Revises: 8d32abdafe2b +Create Date: 2023-07-06 07:58:36.545604 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '7aadd731ff63' +down_revision = '8d32abdafe2b' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('_reagent_types', schema=None) as batch_op: + batch_op.add_column(sa.Column('required', sa.INTEGER(), nullable=True)) + + # with op.batch_alter_table('_submissions', schema=None) as batch_op: + # batch_op.alter_column('rsl_plate_num', + # existing_type=sa.VARCHAR(length=32), + # nullable=False) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + # with op.batch_alter_table('_submissions', schema=None) as batch_op: + # batch_op.alter_column('rsl_plate_num', + # existing_type=sa.VARCHAR(length=32), + # nullable=True) + + with op.batch_alter_table('_reagent_types', schema=None) as batch_op: + batch_op.drop_column('required') + + # ### end Alembic commands ### diff --git a/src/submissions/backend/db/functions.py b/src/submissions/backend/db/functions.py index 06db8c1..3236628 100644 --- a/src/submissions/backend/db/functions.py +++ b/src/submissions/backend/db/functions.py @@ -233,19 +233,19 @@ def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent: # pass return reagent -def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent: - """ - Query db for reagent based on lot number +# def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent: +# """ +# Query db for reagent based on lot number - Args: - ctx (dict): settings passed down from gui - reagent_lot (str): lot number to query +# Args: +# ctx (dict): settings passed down from gui +# reagent_lot (str): lot number to query - Returns: - models.Reagent: looked up reagent - """ - lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() - return lookedup +# Returns: +# models.Reagent: looked up reagent +# """ +# lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() +# return lookedup def get_all_reagenttype_names(ctx:dict) -> list[str]: """ @@ -501,7 +501,7 @@ def create_kit_from_yaml(ctx:dict, exp:dict) -> dict: r = massage_common_reagents(r) look_up = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==r).first() if look_up == None: - rt = models.ReagentType(name=r.replace(" ", "_").lower(), eol_ext=timedelta(30*exp[type]['kits'][kt]['reagenttypes'][r]['eol_ext']), kits=[kit]) + rt = models.ReagentType(name=r.replace(" ", "_").lower(), eol_ext=timedelta(30*exp[type]['kits'][kt]['reagenttypes'][r]['eol_ext']), kits=[kit], required=1) else: rt = look_up rt.kits.append(kit) @@ -609,7 +609,7 @@ def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None, output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).filter(models.Control.submitted_date.between(start_date, end_date)).all() else: output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).all() - logger.debug(f"Returned controls between dates: {output}") + logger.debug(f"Returned controls between dates: {[item.submitted_date for item in output]}") return output def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]: @@ -870,4 +870,21 @@ def platemap_plate(submission:models.BasicSubmission) -> list: plate_dicto.append(this_sample) # append to all samples # image = make_plate_map(plate_dicto) - return plate_dicto \ No newline at end of file + return plate_dicto + + +def lookup_reagent(ctx:dict, reagent_lot:str|None=None, type_name:str|None=None) -> models.Reagent: + """ + Query db for reagent based on lot number + + Args: + ctx (dict): settings passed down from gui + reagent_lot (str): lot number to query + + Returns: + models.Reagent: looked up reagent + """ + if reagent_lot != None and type_name != None: + return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).filter(models.Reagent.lot==reagent_lot).all() + elif type_name == None: + return ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() \ No newline at end of file diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index 43730d1..caf0fc6 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -2,8 +2,8 @@ All kit and reagent related models ''' from . import Base -from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT -from sqlalchemy.orm import relationship +from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, CheckConstraint +from sqlalchemy.orm import relationship, validates from datetime import date import logging @@ -54,6 +54,17 @@ class ReagentType(Base): kits = relationship("KitType", back_populates="reagent_types", uselist=True, foreign_keys=[kit_id]) #: kits this reagent is used in instances = relationship("Reagent", back_populates="type") #: concrete instances of this reagent type eol_ext = Column(Interval()) #: extension of life interval + required = Column(INTEGER, server_default="1") #: sqlite boolean to determine if reagent type is essential for the kit + # __table_args__ = ( + # CheckConstraint(required >= 0, name='check_required_positive'), + # CheckConstraint(required < 2, name='check_required_less_2'), + # {}) + + @validates('required') + def validate_age(self, key, value): + if not 0 <= value < 2: + raise ValueError(f'Invalid required value {value}') + return value def __str__(self) -> str: """ diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index 4ff1f17..a3ef871 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -8,6 +8,7 @@ import pandas as pd from pathlib import Path from backend.db.models import WWSample, BCSample from backend.db import lookup_ww_sample_by_ww_sample_num +from backend.pydant import PydSubmission import logging from collections import OrderedDict import re @@ -149,19 +150,22 @@ class SheetParser(object): else: logger.debug(f"Date: {row[3]}") expiry = date.today() - self.sub[f"lot_{reagent_type}"] = {'lot':output_var, 'exp':expiry} + # self.sub[f"lot_{reagent_type}"] = {'lot':output_var, 'exp':expiry} + self.sub['reagents'].append(dict(type=reagent_type, lot=output_var, exp=expiry)) submission_info = self.parse_generic("Sample List") # iloc is [row][column] and the first row is set as header row so -2 tech = str(submission_info.iloc[11][1]) - if tech == "nan": - tech = "Unknown" - elif len(tech.split(",")) > 1: - tech_reg = re.compile(r"[A-Z]{2}") - tech = ", ".join(tech_reg.findall(tech)) + # moved to pydantic model + # if tech == "nan": + # tech = "Unknown" + # elif len(tech.split(",")) > 1: + # tech_reg = re.compile(r"[A-Z]{2}") + # tech = ", ".join(tech_reg.findall(tech)) self.sub['technician'] = tech # reagents # must be prefixed with 'lot_' to be recognized by gui - # Todo: find a more adaptable way to read reagents. + # TODO: find a more adaptable way to read reagents. + self.sub['reagents'] = [] reagent_range = submission_info.iloc[1:14, 4:8] logger.debug(reagent_range) parse_reagents(reagent_range) @@ -210,7 +214,8 @@ class SheetParser(object): expiry = date.today() else: expiry = date.today() - self.sub[f"lot_{output_key}"] = {'lot':output_var, 'exp':expiry} + # self.sub[f"lot_{output_key}"] = {'lot':output_var, 'exp':expiry} + self.sub['reagents'].append(dict(type=output_key, lot=output_var, exp=expiry)) # parse submission sheet submission_info = self.parse_generic("WW Submissions (ENTER HERE)") # parse enrichment sheet @@ -227,6 +232,7 @@ class SheetParser(object): pcr_reagent_range = qprc_info.iloc[0:5, 9:20] # compile technician info self.sub['technician'] = f"Enr: {enrichment_info.columns[2]}, Ext: {extraction_info.columns[2]}, PCR: {qprc_info.columns[2]}" + self.sub['reagents'] = [] parse_reagents(enr_reagent_range) parse_reagents(ext_reagent_range) parse_reagents(pcr_reagent_range) @@ -271,7 +277,7 @@ class SheetParser(object): else: logger.debug(f"Date: {row[2]}") expiry = date.today() - self.sub[f"lot_{output_key}"] = {'lot':output_var, 'exp':expiry} + self.sub['reagents'].append(dict(type=output_key, lot=output_var, exp=expiry)) else: continue def massage_samples(df:pd.DataFrame) -> pd.DataFrame: @@ -303,6 +309,7 @@ class SheetParser(object): self.sub['sample_count'] = submission_info.iloc[4][6] self.sub['extraction_kit'] = "ArticV4.1" self.sub['technician'] = f"MM: {biomek_info.iloc[2][1]}, Bio: {biomek_info.iloc[3][1]}" + self.sub['reagents'] = [] parse_reagents(sub_reagent_range) parse_reagents(biomek_reagent_range) samples = massage_samples(biomek_info.iloc[22:31, 0:]) @@ -311,6 +318,18 @@ class SheetParser(object): self.sample_result, self.sub['samples'] = sample_parse() + def to_pydantic(self) -> PydSubmission: + """ + Generates a pydantic model of scraped data for validation + + Returns: + PydSubmission: output pydantic model + """ + psm = PydSubmission(filepath=self.filepath, **self.sub) + delattr(psm, "filepath") + return psm + + class SampleParser(object): """ @@ -366,7 +385,7 @@ class SampleParser(object): list[WWSample]: list of sample objects """ def search_df_for_sample(sample_rsl:str): - logger.debug(f"Attempting to find sample {sample_rsl} in \n {self.elution_map}") + # logger.debug(f"Attempting to find sample {sample_rsl} in \n {self.elution_map}") well = self.elution_map.where(self.elution_map==sample_rsl) # logger.debug(f"Well: {well}") well = well.dropna(how='all').dropna(axis=1, how="all") diff --git a/src/submissions/backend/excel/reports.py b/src/submissions/backend/excel/reports.py index e5d54c1..efa1c0d 100644 --- a/src/submissions/backend/excel/reports.py +++ b/src/submissions/backend/excel/reports.py @@ -9,6 +9,7 @@ import sys from pathlib import Path import re from tools import check_if_app +from typing import Tuple logger = logging.getLogger(f"submissions.{__name__}") @@ -154,23 +155,61 @@ def displace_date(df:DataFrame) -> DataFrame: # get submitted dates for each control dict_list = [dict(name=item, date=df[df.name == item].iloc[0]['submitted_date']) for item in sorted(df['name'].unique())] previous_dates = [] - for ii, item in enumerate(dict_list): - try: - # check = item['date'] == dict_list[ii-1]['date'] - check = item['date'] in previous_dates - except IndexError: - check = False - if check: - # occurences = previous_dates.count(item['date']) - logger.debug(f"We found one! Increment date!\n\t{item['date'] - timedelta(days=1)}") - # get df locations where name == item name - mask = df['name'] == item['name'] - # increment date in dataframe - df.loc[mask, 'submitted_date'] = df.loc[mask, 'submitted_date'].apply(lambda x: x + timedelta(days=1)) - previous_dates.append(item['date'] + timedelta(days=1)) - else: - previous_dates.append(item['date']) + for _, item in enumerate(dict_list): + # try: + # # check = item['date'] == dict_list[ii-1]['date'] + # check = item['date'] in previous_dates + # except IndexError: + # check = False + # if check: + # # occurences = previous_dates.count(item['date']) + # logger.debug(f"We found one! Increment date!\n\t{item['date']} to {item['date'] + timedelta(days=1)}") + # # get df locations where name == item name + # mask = df['name'] == item['name'] + # # increment date in dataframe + # df.loc[mask, 'submitted_date'] = df.loc[mask, 'submitted_date'].apply(lambda x: x + timedelta(days=1)) + # outdate = item['date'] + timedelta(days=1) + # # previous_dates.append(item['date'] + timedelta(days=1)) + # else: + # outdate = item['date'] + # previous_dates.append(outdate) + # logger.debug(f"\n\tCurrent date: {outdate}\n\tPrevious dates:{previous_dates}") + # logger.debug(type(item)) + df, previous_dates = check_date(df=df, item=item, previous_dates=previous_dates) return df + +def check_date(df:DataFrame, item:dict, previous_dates:list) -> Tuple[DataFrame, list]: + + try: + # check = item['date'] == dict_list[ii-1]['date'] + check = item['date'] in previous_dates + except IndexError: + check = False + previous_dates.append(item['date']) + if check: + # occurences = previous_dates.count(item['date']) + logger.debug(f"We found one! Increment date!\n\t{item['date']} to {item['date'] + timedelta(days=1)}") + # get df locations where name == item name + mask = df['name'] == item['name'] + # increment date in dataframe + df.loc[mask, 'submitted_date'] = df.loc[mask, 'submitted_date'].apply(lambda x: x + timedelta(days=1)) + + item['date'] += timedelta(days=1) + # previous_dates.append(item['date'] + timedelta(days=1)) + passed = False + else: + passed = True + logger.debug(f"\n\tCurrent date: {item['date']}\n\tPrevious dates:{previous_dates}") + logger.debug(f"DF: {type(df)}, previous_dates: {type(previous_dates)}") + # if run didn't lead to changed date, return values + if passed: + logger.debug(f"Date check passed, returning.") + return df, previous_dates + # if date was changed, rerun with new date + else: + logger.warning(f"Date check failed, running recursion") + df, previous_dates = check_date(df, item, previous_dates) + return df, previous_dates def get_unique_values_in_df_column(df: DataFrame, column_name: str) -> list: diff --git a/src/submissions/backend/pydant/__init__.py b/src/submissions/backend/pydant/__init__.py new file mode 100644 index 0000000..62d6270 --- /dev/null +++ b/src/submissions/backend/pydant/__init__.py @@ -0,0 +1,65 @@ +import uuid +from pydantic import BaseModel, validator +from datetime import date +from typing import List, Any +from tools import RSLNamer +from pathlib import Path +import re +import logging + +logger = logging.getLogger(f"submissions.{__name__}") + +class PydSubmission(BaseModel): + filepath: Path + submission_type: str + submitter_plate_num: str|None + rsl_plate_num: str + submitted_date: date + submitting_lab: str + sample_count: int + extraction_kit: str + technician: str + reagents: List[dict] + samples: List[Any] + + @validator("submitted_date", pre=True) + @classmethod + def strip_datetime_string(cls, value): + return re.sub(r"_\d$", "", value) + + @validator("submitter_plate_num") + @classmethod + def enforce_with_uuid(cls, value): + if value == None or value == "" or value == "None": + return uuid.uuid4().hex.upper() + + @validator("rsl_plate_num", pre=True) + @classmethod + def rsl_from_file(cls, value, values): + if value == None: + logger.debug(f"Pydant values:\n{values}") + return RSLNamer(values['filepath'].__str__()).parsed_name + else: + return value + + @validator("technician") + @classmethod + def enforce_tech(cls, value): + if value == "nan" or value == "None": + value = "Unknown" + # elif len(value.split(",")) > 1: + # tech_reg = re.compile(r"\b[A-Z]{2}\b") + # value = ", ".join(tech_reg.findall(value)) + return value + + @validator("reagents") + @classmethod + def remove_atcc(cls, value): + return_val = [] + for reagent in value: + match reagent['type']: + case 'atcc': + continue + case _: + return_val.append(reagent) + return return_val diff --git a/src/submissions/frontend/custom_widgets/misc.py b/src/submissions/frontend/custom_widgets/misc.py index 36f0e61..0898d51 100644 --- a/src/submissions/frontend/custom_widgets/misc.py +++ b/src/submissions/frontend/custom_widgets/misc.py @@ -2,20 +2,23 @@ Contains miscellaneous widgets for frontend functions ''' from datetime import date -import typing +import difflib +from typing import Tuple from PyQt6.QtWidgets import ( QLabel, QVBoxLayout, QLineEdit, QComboBox, QDialog, QDialogButtonBox, QDateEdit, QSizePolicy, QWidget, QGridLayout, QPushButton, QSpinBox, QDoubleSpinBox, - QHBoxLayout, + QHBoxLayout, QMainWindow ) from PyQt6.QtCore import Qt, QDate, QSize # from submissions.backend.db.functions import lookup_kittype_by_use # from submissions.backend.db import lookup_regent_by_type_name_and_kit_name from tools import check_not_nan from ..all_window_functions import extract_form_info -from backend.db import get_all_reagenttype_names, lookup_all_sample_types, create_kit_from_yaml, lookup_regent_by_type_name, lookup_kittype_by_use#, lookup_regent_by_type_name_and_kit_name +from backend.db import get_all_reagenttype_names, lookup_all_sample_types, create_kit_from_yaml, \ + lookup_regent_by_type_name, lookup_kittype_by_use, lookup_all_orgs + #, lookup_regent_by_type_name_and_kit_name from backend.excel.parser import SheetParser from jinja2 import Environment, FileSystemLoader import sys @@ -297,55 +300,103 @@ class ControlsDatePicker(QWidget): return QSize(80,20) +# class ImportReagent(QComboBox): + +# def __init__(self, ctx:dict, item:str, prsr:SheetParser|None=None): +# super().__init__() +# self.setEditable(True) +# # Ensure that all reagenttypes have a name that matches the items in the excel parser +# query_var = item.replace("lot_", "") +# if prsr != None: +# logger.debug(f"Import Reagent is looking at: {prsr.sub[item]} for {item}") +# else: +# logger.debug(f"Import Reagent is going to retrieve all reagents for {item}") +# logger.debug(f"Query for: {query_var}") +# if prsr != None: +# if isinstance(prsr.sub[item], np.float64): +# logger.debug(f"{prsr.sub[item]['lot']} is a numpy float!") +# try: +# prsr.sub[item] = int(prsr.sub[item]['lot']) +# except ValueError: +# pass +# # query for reagents using type name from sheet and kit from sheet +# logger.debug(f"Attempting lookup of reagents by type: {query_var}") +# # below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work. +# relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)]#, kit_name=prsr.sub['extraction_kit'])] +# # relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name_and_kit_name(ctx=ctx, type_name=query_var, kit_name=prsr.sub['extraction_kit'])] +# output_reg = [] +# for reagent in relevant_reagents: +# # extract strings from any sets. +# if isinstance(reagent, set): +# for thing in reagent: +# output_reg.append(thing) +# elif isinstance(reagent, str): +# output_reg.append(reagent) +# relevant_reagents = output_reg +# # if reagent in sheet is not found insert it into the front of relevant reagents so it shows +# if prsr != None: +# logger.debug(f"Relevant reagents for {prsr.sub[item]}: {relevant_reagents}") +# if str(prsr.sub[item]['lot']) not in relevant_reagents: +# if check_not_nan(prsr.sub[item]['lot']): +# relevant_reagents.insert(0, str(prsr.sub[item]['lot'])) +# else: +# if len(relevant_reagents) > 1: +# logger.debug(f"Found {prsr.sub[item]['lot']} in relevant reagents: {relevant_reagents}. Moving to front of list.") +# idx = relevant_reagents.index(str(prsr.sub[item]['lot'])) +# logger.debug(f"The index we got for {prsr.sub[item]['lot']} in {relevant_reagents} was {idx}") +# moved_reag = relevant_reagents.pop(idx) +# relevant_reagents.insert(0, moved_reag) +# else: +# logger.debug(f"Found {prsr.sub[item]['lot']} in relevant reagents: {relevant_reagents}. But no need to move due to short list.") +# logger.debug(f"New relevant reagents: {relevant_reagents}") +# self.setObjectName(f"lot_{item}") +# self.addItems(relevant_reagents) + + class ImportReagent(QComboBox): - def __init__(self, ctx:dict, item:str, prsr:SheetParser|None=None): + def __init__(self, ctx:dict, reagent:dict): super().__init__() self.setEditable(True) # Ensure that all reagenttypes have a name that matches the items in the excel parser - query_var = item.replace("lot_", "") - if prsr != None: - logger.debug(f"Import Reagent is looking at: {prsr.sub[item]} for {item}") - else: - logger.debug(f"Import Reagent is going to retrieve all reagents for {item}") - logger.debug(f"Query for: {query_var}") - if prsr != None: - if isinstance(prsr.sub[item], np.float64): - logger.debug(f"{prsr.sub[item]['lot']} is a numpy float!") - try: - prsr.sub[item] = int(prsr.sub[item]['lot']) - except ValueError: - pass + query_var = reagent['type'] + logger.debug(f"Import Reagent is looking at: {reagent['lot']} for {reagent['type']}") + + if isinstance(reagent['lot'], np.float64): + logger.debug(f"{reagent['lot']} is a numpy float!") + try: + reagent['lot'] = int(reagent['lot']) + except ValueError: + pass # query for reagents using type name from sheet and kit from sheet logger.debug(f"Attempting lookup of reagents by type: {query_var}") # below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work. relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)]#, kit_name=prsr.sub['extraction_kit'])] # relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name_and_kit_name(ctx=ctx, type_name=query_var, kit_name=prsr.sub['extraction_kit'])] output_reg = [] - for reagent in relevant_reagents: + for rel_reagent in relevant_reagents: # extract strings from any sets. - if isinstance(reagent, set): - for thing in reagent: + if isinstance(rel_reagent, set): + for thing in rel_reagent: output_reg.append(thing) - elif isinstance(reagent, str): - output_reg.append(reagent) + elif isinstance(rel_reagent, str): + output_reg.append(rel_reagent) relevant_reagents = output_reg # if reagent in sheet is not found insert it into the front of relevant reagents so it shows - if prsr != None: - logger.debug(f"Relevant reagents for {prsr.sub[item]}: {relevant_reagents}") - if str(prsr.sub[item]['lot']) not in relevant_reagents: - if check_not_nan(prsr.sub[item]['lot']): - relevant_reagents.insert(0, str(prsr.sub[item]['lot'])) + # if prsr != None: + logger.debug(f"Relevant reagents for {reagent['lot']}: {relevant_reagents}") + if str(reagent['lot']) not in relevant_reagents: + if check_not_nan(reagent['lot']): + relevant_reagents.insert(0, str(reagent['lot'])) + else: + if len(relevant_reagents) > 1: + logger.debug(f"Found {reagent['lot']} in relevant reagents: {relevant_reagents}. Moving to front of list.") + idx = relevant_reagents.index(str(reagent['lot'])) + logger.debug(f"The index we got for {reagent['lot']} in {relevant_reagents} was {idx}") + moved_reag = relevant_reagents.pop(idx) + relevant_reagents.insert(0, moved_reag) else: - if len(relevant_reagents) > 1: - logger.debug(f"Found {prsr.sub[item]['lot']} in relevant reagents: {relevant_reagents}. Moving to front of list.") - idx = relevant_reagents.index(str(prsr.sub[item]['lot'])) - logger.debug(f"The index we got for {prsr.sub[item]['lot']} in {relevant_reagents} was {idx}") - moved_reag = relevant_reagents.pop(idx) - relevant_reagents.insert(0, moved_reag) - else: - logger.debug(f"Found {prsr.sub[item]['lot']} in relevant reagents: {relevant_reagents}. But no need to move due to short list.") + logger.debug(f"Found {reagent['lot']} in relevant reagents: {relevant_reagents}. But no need to move due to short list.") logger.debug(f"New relevant reagents: {relevant_reagents}") - self.setObjectName(f"lot_{item}") - self.addItems(relevant_reagents) - + self.setObjectName(f"lot_{reagent['type']}") + self.addItems(relevant_reagents) \ No newline at end of file diff --git a/src/submissions/frontend/main_window_functions.py b/src/submissions/frontend/main_window_functions.py index 29bd489..28d65b6 100644 --- a/src/submissions/frontend/main_window_functions.py +++ b/src/submissions/frontend/main_window_functions.py @@ -7,7 +7,6 @@ from getpass import getuser import inspect from pathlib import Path import pprint -import re import yaml import json from typing import Tuple @@ -30,7 +29,7 @@ from backend.db.functions import ( ) from backend.excel.parser import SheetParser, PCRParser from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df -from tools import RSLNamer, check_not_nan, check_kit_integrity +from tools import check_not_nan, check_kit_integrity from .custom_widgets.pop_ups import AlertPop, QuestionAsker from .custom_widgets import ReportDatePicker from .custom_widgets.misc import ImportReagent @@ -40,6 +39,16 @@ from .visualizations.control_charts import create_charts, construct_html logger = logging.getLogger(f"submissions.{__name__}") def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]: + """ + _summary_ + + Args: + obj (QMainWindow): _description_ + + Returns: + Tuple[QMainWindow, dict|None]: _description_ + """ + logger.debug(f"\n\nStarting Import...\n\n") result = None logger.debug(obj.ctx) # initialize samples @@ -57,102 +66,192 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None] except PermissionError: logger.error(f"Couldn't get permission to access file: {fname}") return - if prsr.sub['rsl_plate_num'] == None: - prsr.sub['rsl_plate_num'] = RSLNamer(fname.__str__()).parsed_name - logger.debug(f"prsr.sub = {prsr.sub}") - for sample in prsr.sub['samples']: + try: + pyd = prsr.to_pydantic() + logger.debug(f"Pydantic result: \n\n{pyd}\n\n") + # with open("pickled.pkl", "wb") as f: + # pickle.dump(pyd, f) + except Exception as e: + return obj, dict(message= f"Problem creating pydantic model:\n\n{e}", status="critical") + # moved to pydantic model + # if prsr.sub['rsl_plate_num'] == None: + # prsr.sub['rsl_plate_num'] = RSLNamer(fname.__str__()).parsed_name + # logger.debug(f"prsr.sub = {prsr.sub}") + for sample in pyd.samples: if hasattr(sample, "elution_well"): logger.debug(f"Sample from import: {sample.elution_well}") - obj.current_submission_type = prsr.sub['submission_type'] + # obj.current_submission_type = prsr.sub['submission_type'] + obj.current_submission_type = pyd.submission_type # destroy any widgets from previous imports for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget): item.setParent(None) - # regex to parser out different variable types for decision making - variable_parser = re.compile(r""" - (?P^extraction_kit$) | - (?P^submitted_date$) | - (?P)^submitting_lab$ | - (?P)^samples$ | - (?P^lot_.*$) | - (?P^csv$) - """, re.VERBOSE) - for item in prsr.sub: - logger.debug(f"Item: {item}") - # attempt to match variable name to regex group - try: - mo = variable_parser.fullmatch(item).lastgroup - except AttributeError: - mo = "other" - logger.debug(f"Mo: {mo}") - match mo: + # # regex to parser out different variable types for decision making + # variable_parser = re.compile(r""" + # (?P^extraction_kit$) | + # (?P^submitted_date$) | + # (?P)^submitting_lab$ | + # (?P)^samples$ | + # (?P^lot_.*$) | + # (?P^csv$) + # """, re.VERBOSE) + # for item in prsr.sub: + # logger.debug(f"Item: {item}") + # # attempt to match variable name to regex group + # try: + # mo = variable_parser.fullmatch(item).lastgroup + # except AttributeError: + # mo = "other" + # logger.debug(f"Mo: {mo}") + # match mo: + # case 'submitting_lab': + # # create label + # obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + # logger.debug(f"{item}: {prsr.sub[item]}") + # # create combobox to hold looked up submitting labs + # add_widget = QComboBox() + # labs = [item.__str__() for item in lookup_all_orgs(ctx=obj.ctx)] + # # try to set closest match to top of list + # try: + # labs = difflib.get_close_matches(prsr.sub[item], labs, len(labs), 0) + # except (TypeError, ValueError): + # pass + # # set combobox values to lookedup values + # add_widget.addItems(labs) + # case 'extraction_kit': + # # create label + # obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + # # if extraction kit not available, all other values fail + # if not check_not_nan(prsr.sub[item]): + # msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning") + # msg.exec() + # # create combobox to hold looked up kits + # add_widget = QComboBox() + # # lookup existing kits by 'submission_type' decided on by sheetparser + # uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=prsr.sub['submission_type'])] + # if check_not_nan(prsr.sub[item]): + # logger.debug(f"The extraction kit in parser was: {prsr.sub[item]}") + # uses.insert(0, uses.pop(uses.index(prsr.sub[item]))) + # obj.ext_kit = prsr.sub[item] + # else: + # logger.error(f"Couldn't find {prsr.sub['extraction_kit']}") + # obj.ext_kit = uses[0] + # add_widget.addItems(uses) + # case 'submitted_date': + # # create label + # obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + # # uses base calendar + # add_widget = QDateEdit(calendarPopup=True) + # # sets submitted date based on date found in excel sheet + # try: + # add_widget.setDate(prsr.sub[item]) + # # if not found, use today + # except: + # add_widget.setDate(date.today()) + # case 'reagent': + # # create label + # reg_label = QLabel(item.replace("_", " ").title()) + # reg_label.setObjectName(f"lot_{item}_label") + # obj.table_widget.formlayout.addWidget(reg_label) + # # create reagent choice widget + # add_widget = ImportReagent(ctx=obj.ctx, item=item, prsr=prsr) + # obj.reagents[item] = prsr.sub[item] + # case 'samples': + # # hold samples in 'obj' until form submitted + # logger.debug(f"{item}: {prsr.sub[item]}") + # obj.samples = prsr.sub[item] + # add_widget = None + # case 'csv': + # obj.csv = prsr.sub[item] + # case _: + # # anything else gets added in as a line edit + # obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + # add_widget = QLineEdit() + # logger.debug(f"Setting widget text to {str(prsr.sub[item]).replace('_', ' ')}") + # add_widget.setText(str(prsr.sub[item]).replace("_", " ")) + fields = list(pyd.model_fields.keys()) + fields.remove('filepath') + logger.debug(f"pydantic fields: {fields}") + for field in fields: + value = getattr(pyd, field) + if not check_not_nan(value): + continue + match field: case 'submitting_lab': # create label - obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) - logger.debug(f"{item}: {prsr.sub[item]}") + label = QLabel(field.replace("_", " ").title()) + logger.debug(f"{field}: {value}") # create combobox to hold looked up submitting labs add_widget = QComboBox() labs = [item.__str__() for item in lookup_all_orgs(ctx=obj.ctx)] # try to set closest match to top of list try: - labs = difflib.get_close_matches(prsr.sub[item], labs, len(labs), 0) + labs = difflib.get_close_matches(value, labs, len(labs), 0) except (TypeError, ValueError): pass # set combobox values to lookedup values add_widget.addItems(labs) case 'extraction_kit': # create label - obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + label = QLabel(field.replace("_", " ").title()) # if extraction kit not available, all other values fail - if not check_not_nan(prsr.sub[item]): + if not check_not_nan(value): msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning") msg.exec() # create combobox to hold looked up kits add_widget = QComboBox() # lookup existing kits by 'submission_type' decided on by sheetparser - uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=prsr.sub['submission_type'])] - if check_not_nan(prsr.sub[item]): - logger.debug(f"The extraction kit in parser was: {prsr.sub[item]}") - uses.insert(0, uses.pop(uses.index(prsr.sub[item]))) - obj.ext_kit = prsr.sub[item] + uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=pyd.submission_type)] + if check_not_nan(value): + logger.debug(f"The extraction kit in parser was: {value}") + uses.insert(0, uses.pop(uses.index(value))) + obj.ext_kit = value else: logger.error(f"Couldn't find {prsr.sub['extraction_kit']}") obj.ext_kit = uses[0] add_widget.addItems(uses) case 'submitted_date': # create label - obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + label = QLabel(field.replace("_", " ").title()) # uses base calendar add_widget = QDateEdit(calendarPopup=True) # sets submitted date based on date found in excel sheet try: - add_widget.setDate(prsr.sub[item]) + add_widget.setDate(value) # if not found, use today except: add_widget.setDate(date.today()) - case 'reagent': - # create label - reg_label = QLabel(item.replace("_", " ").title()) - reg_label.setObjectName(f"lot_{item}_label") - obj.table_widget.formlayout.addWidget(reg_label) - # create reagent choice widget - add_widget = ImportReagent(ctx=obj.ctx, item=item, prsr=prsr) - obj.reagents[item] = prsr.sub[item] case 'samples': # hold samples in 'obj' until form submitted - logger.debug(f"{item}: {prsr.sub[item]}") - obj.samples = prsr.sub[item] - add_widget = None + logger.debug(f"{field}:\n\t{value}") + obj.samples = value + continue case 'csv': - obj.csv = prsr.sub[item] + obj.csv = value + continue + case 'reagents': + for reagent in value: + # create label + reg_label = QLabel(reagent['type'].replace("_", " ").title()) + reg_label.setObjectName(f"lot_{reagent['type']}_label") + # obj.table_widget.formlayout.addWidget(reg_label) + # create reagent choice widget + add_widget = ImportReagent(ctx=obj.ctx, reagent=reagent) + add_widget.setObjectName(f"lot_{reagent['type']}") + logger.debug(f"Widget name set to: {add_widget.objectName()}") + obj.table_widget.formlayout.addWidget(reg_label) + obj.table_widget.formlayout.addWidget(add_widget) + obj.reagents[reagent['type']] = reagent['lot'] + continue case _: # anything else gets added in as a line edit - obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + label = QLabel(field.replace("_", " ").title()) add_widget = QLineEdit() - logger.debug(f"Setting widget text to {str(prsr.sub[item]).replace('_', ' ')}") - add_widget.setText(str(prsr.sub[item]).replace("_", " ")) + logger.debug(f"Setting widget text to {str(value).replace('_', ' ')}") + add_widget.setText(str(value).replace("_", " ")) try: - add_widget.setObjectName(item) + add_widget.setObjectName(field) logger.debug(f"Widget name set to: {add_widget.objectName()}") + obj.table_widget.formlayout.addWidget(label) obj.table_widget.formlayout.addWidget(add_widget) except AttributeError as e: logger.error(e) @@ -199,7 +298,7 @@ def kit_integrity_completion_function(obj:QMainWindow) -> QMainWindow: result = dict(message=kit_integrity['message'], status="Warning") for item in kit_integrity['missing']: obj.table_widget.formlayout.addWidget(QLabel(f"Lot {item.replace('_', ' ').title()}")) - add_widget = ImportReagent(ctx=obj.ctx, item=item) + add_widget = ImportReagent(ctx=obj.ctx, reagent=dict(type=item, lot=None, exp=None))#item=item) obj.table_widget.formlayout.addWidget(add_widget) submit_btn = QPushButton("Submit") submit_btn.setObjectName("lot_submit_btn") @@ -208,6 +307,7 @@ def kit_integrity_completion_function(obj:QMainWindow) -> QMainWindow: return obj, result def submit_new_sample_function(obj:QMainWindow) -> QMainWindow: + logger.debug(f"\n\nBeginning Submission\n\n") result = None # extract info from the form widgets info = extract_form_info(obj.table_widget.tab1) @@ -219,7 +319,8 @@ def submit_new_sample_function(obj:QMainWindow) -> QMainWindow: parsed_reagents = [] # compare reagents in form to reagent database for reagent in reagents: - wanted_reagent = lookup_reagent(ctx=obj.ctx, reagent_lot=reagents[reagent]) + # TODO: have this lookup by type and lot + wanted_reagent = lookup_reagent(ctx=obj.ctx, reagent_lot=reagents[reagent], type_name=reagent) logger.debug(f"Looked up reagent: {wanted_reagent}") # if reagent not found offer to add to database if wanted_reagent == None: @@ -431,7 +532,7 @@ def chart_maker_function(obj:QMainWindow) -> QMainWindow: data = [control.convert_by_mode(mode=obj.mode) for control in controls] # flatten data to one dimensional list data = [item for sublist in data for item in sublist] - logger.debug(f"Control objects going into df conversion: {data}") + logger.debug(f"Control objects going into df conversion: {type(data)}") if data == []: return obj, dict(status="Critical", message="No data found for controls in given date range.") # send to dataframe creator diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py index f013ee8..6623115 100644 --- a/src/submissions/tools/__init__.py +++ b/src/submissions/tools/__init__.py @@ -24,6 +24,8 @@ def check_not_nan(cell_contents) -> bool: # check for nan as a string first if cell_contents == 'nan': cell_contents = np.nan + if cell_contents == None: + cell_contents = np.nan try: return not np.isnan(cell_contents) except TypeError: @@ -81,14 +83,15 @@ def check_kit_integrity(sub:BasicSubmission|KitType, reagenttypes:list|None=None # What type is sub? match sub: case BasicSubmission(): - ext_kit_rtypes = [reagenttype.name for reagenttype in sub.extraction_kit.reagent_types] + # very hacky method to ensure interchangeable plates are not + ext_kit_rtypes = [reagenttype.name for reagenttype in sub.extraction_kit.reagent_types if reagenttype.required == 1] # Overwrite function parameter reagenttypes try: reagenttypes = [reagent.type.name for reagent in sub.reagents] except AttributeError as e: logger.error(f"Problem parsing reagents: {[f'{reagent.lot}, {reagent.type}' for reagent in sub.reagents]}") case KitType(): - ext_kit_rtypes = [reagenttype.name for reagenttype in sub.reagent_types] + ext_kit_rtypes = [reagenttype.name for reagenttype in sub.reagent_types if reagenttype.required == 1] logger.debug(f"Kit reagents: {ext_kit_rtypes}") logger.debug(f"Submission reagents: {reagenttypes}") # check if lists are equal @@ -256,7 +259,7 @@ class RSLNamer(object): def massage_common_reagents(reagent_name:str): logger.debug(f"Attempting to massage {reagent_name}") - if reagent_name.endswith("water") or "H2O" in reagent_name: + if reagent_name.endswith("water") or "H2O" in reagent_name.upper(): reagent_name = "molecular_grade_water" reagent_name = reagent_name.replace("ยต", "u") return reagent_name