diff --git a/CHANGELOG.md b/CHANGELOG.md index b9eb88d..b2ca958 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ +## 202304.01 + +- Improved function results output to ui. +- Added Well Call Assessment to PCR scraping. + ## 202303.05 +- Increased robustness of RSL plate number enforcement. - Added in ability to scrape and include PCR results for wastewater. ## 202303.04 diff --git a/README.md b/README.md index 73a796a..2717a48 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,8 @@ a. Both an excel sheet and a pdf should be generated containing summary information for submissions made by each client lab. ## Importing PCR results: + +This is meant to import .xslx files created from the Design & Analysis Software 1. Click on 'File' -> 'Import PCR Results'. 2. Use the file dialog to locate the .xlsx file you want to import. 3. Click 'Okay'. diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..51f2dff --- /dev/null +++ b/TODO.md @@ -0,0 +1 @@ +- [ ] Move bulk of functions from frontend.__init__ to frontend.functions as __init__ is getting bloated. \ No newline at end of file diff --git a/alembic/versions/00de69ad6eab_added_target_status_to_ww_samples.py b/alembic/versions/00de69ad6eab_added_target_status_to_ww_samples.py new file mode 100644 index 0000000..ef592bf --- /dev/null +++ b/alembic/versions/00de69ad6eab_added_target_status_to_ww_samples.py @@ -0,0 +1,57 @@ +"""added target status to ww samples + +Revision ID: 00de69ad6eab +Revises: 8adc85dd9b92 +Create Date: 2023-03-31 14:51:40.705301 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import sqlite + +# revision identifiers, used by Alembic. +revision = '00de69ad6eab' +down_revision = '8adc85dd9b92' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + # op.drop_table('_alembic_tmp__submissions') + with op.batch_alter_table('_ww_samples', schema=None) as batch_op: + batch_op.add_column(sa.Column('n1_status', sa.String(length=32), nullable=True)) + batch_op.add_column(sa.Column('n2_status', sa.String(length=32), nullable=True)) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('_ww_samples', schema=None) as batch_op: + batch_op.drop_column('n2_status') + batch_op.drop_column('n1_status') + + # op.create_table('_alembic_tmp__submissions', + # sa.Column('id', sa.INTEGER(), nullable=False), + # sa.Column('rsl_plate_num', sa.VARCHAR(length=32), nullable=False), + # sa.Column('submitter_plate_num', sa.VARCHAR(length=127), nullable=True), + # sa.Column('submitted_date', sa.TIMESTAMP(), nullable=True), + # sa.Column('submitting_lab_id', sa.INTEGER(), nullable=True), + # sa.Column('sample_count', sa.INTEGER(), nullable=True), + # sa.Column('extraction_kit_id', sa.INTEGER(), nullable=True), + # sa.Column('submission_type', sa.VARCHAR(length=32), nullable=True), + # sa.Column('technician', sa.VARCHAR(length=64), nullable=True), + # sa.Column('reagents_id', sa.VARCHAR(), nullable=True), + # sa.Column('extraction_info', sqlite.JSON(), nullable=True), + # sa.Column('run_cost', sa.FLOAT(), nullable=True), + # sa.Column('uploaded_by', sa.VARCHAR(length=32), nullable=True), + # sa.Column('pcr_info', sqlite.JSON(), nullable=True), + # sa.ForeignKeyConstraint(['extraction_kit_id'], ['_kits.id'], ondelete='SET NULL'), + # sa.ForeignKeyConstraint(['reagents_id'], ['_reagents.id'], ondelete='SET NULL'), + # sa.ForeignKeyConstraint(['submitting_lab_id'], ['_organizations.id'], ondelete='SET NULL'), + # sa.PrimaryKeyConstraint('id'), + # sa.UniqueConstraint('rsl_plate_num'), + # sa.UniqueConstraint('submitter_plate_num') + # ) + # ### end Alembic commands ### diff --git a/src/submissions/__init__.py b/src/submissions/__init__.py index 26d63dc..9d515b6 100644 --- a/src/submissions/__init__.py +++ b/src/submissions/__init__.py @@ -4,7 +4,7 @@ from pathlib import Path # Version of the realpython-reader package __project__ = "submissions" -__version__ = "202303.4b" +__version__ = "202304.1b" __author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"} __copyright__ = "2022-2023, Government of Canada" @@ -20,3 +20,11 @@ class bcolors: ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' + +# Hello Landon, this is your past self here. I'm trying not to screw you over like I usually do, so I will +# set out the workflow I've imagined for creating new submission types. +# First of all, you will need to write new parsing methods in backend.excel.parser to pull information out of the submission form +# for the submission itself as well as for any samples you can pull out of that same sheet. +# Second, you will have to update the model in backend.db.models.submissions and provide a new polymorph to the BasicSubmission object. +# The BSO should hold the majority of the general info. +# You can also update any of the parsers to pull out any custom info you need, like enforcing RSL plate numbers, scraping PCR results, etc. \ No newline at end of file diff --git a/src/submissions/backend/db/functions.py b/src/submissions/backend/db/functions.py index a8f2cb6..54fc09e 100644 --- a/src/submissions/backend/db/functions.py +++ b/src/submissions/backend/db/functions.py @@ -5,7 +5,7 @@ Convenience functions for interacting with the database. from . import models from .models.kits import reagenttypes_kittypes from .models.submissions import reagents_submissions -from .models.samples import WWSample +# from .models.samples import WWSample import pandas as pd import sqlalchemy.exc import sqlite3 @@ -18,7 +18,6 @@ from sqlalchemy.engine import Engine import json from getpass import getuser import numpy as np - import yaml from pathlib import Path @@ -42,8 +41,10 @@ def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|d Returns: None|dict : object that indicates issue raised for reporting in gui """ + from tools import format_rsl_number logger.debug(f"Hello from store_submission") # Add all samples to sample table + base_submission.rsl_plate_num = format_rsl_number(base_submission.rsl_plate_num) for sample in base_submission.samples: sample.rsl_plate = base_submission logger.debug(f"Attempting to add sample: {sample.to_string()}") @@ -60,14 +61,13 @@ def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|d except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e: logger.debug(f"Hit an integrity error : {e}") ctx['database_session'].rollback() - return {"message":"This plate number already exists, so we can't add it."} + return {"message":"This plate number already exists, so we can't add it.", "status":"Critical"} except (sqlite3.OperationalError, sqlalchemy.exc.IntegrityError) as e: logger.debug(f"Hit an operational error: {e}") ctx['database_session'].rollback() - return {"message":"The database is locked for editing."} + return {"message":"The database is locked for editing.", "status":"Critical"} return None - def store_reagent(ctx:dict, reagent:models.Reagent) -> None|dict: """ Inserts a reagent into the database. @@ -87,7 +87,6 @@ def store_reagent(ctx:dict, reagent:models.Reagent) -> None|dict: return {"message":"The database is locked for editing."} return None - def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmission: """ Construct submission object from dictionary @@ -99,14 +98,17 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio Returns: models.BasicSubmission: Constructed submission object """ - from tools import check_not_nan + from tools import check_regex_match, RSLNamer # convert submission type into model name query = info_dict['submission_type'].replace(" ", "") # Ensure an rsl plate number exists for the plate - if info_dict["rsl_plate_num"] == 'nan' or info_dict["rsl_plate_num"] == None or not check_not_nan(info_dict["rsl_plate_num"]): + # if info_dict["rsl_plate_num"] == 'nan' or info_dict["rsl_plate_num"] == None or not check_not_nan(info_dict["rsl_plate_num"]): + if not check_regex_match("^RSL", info_dict["rsl_plate_num"]): instance = None msg = "A proper RSL plate number is required." return instance, {'code': 2, 'message': "A proper RSL plate number is required."} + else: + info_dict['rsl_plate_num'] = RSLNamer(info_dict["rsl_plate_num"]).parsed_name # check database for existing object instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first() # get model based on submission type converted above @@ -171,7 +173,6 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio logger.debug(f"Constructed submissions message: {msg}") return instance, {'code':code, 'message':msg} - def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent: """ Construct reagent object from dictionary @@ -204,7 +205,6 @@ def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent: # pass return reagent - def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent: """ Query db for reagent based on lot number @@ -219,7 +219,6 @@ def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent: lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() return lookedup - def get_all_reagenttype_names(ctx:dict) -> list[str]: """ Lookup all reagent types and get names @@ -233,7 +232,6 @@ def get_all_reagenttype_names(ctx:dict) -> list[str]: lookedup = [item.__str__() for item in ctx['database_session'].query(models.ReagentType).all()] return lookedup - def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType: """ Lookup a single reagent type by name @@ -250,7 +248,6 @@ def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType: logger.debug(f"Found ReagentType: {lookedup}") return lookedup - def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]: """ Lookup kits by a sample type its used for @@ -264,7 +261,6 @@ def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]: """ return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all() - def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType: """ Lookup a kit type by name @@ -279,7 +275,6 @@ def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType: logger.debug(f"Querying kittype: {name}") return ctx['database_session'].query(models.KitType).filter(models.KitType.name==name).first() - def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.Reagent]: """ Lookup reagents by their type name @@ -293,7 +288,6 @@ def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.Reagent]: """ return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all() - def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:str) -> list[models.Reagent]: """ Lookup reagents by their type name and kits they belong to (Broken... maybe cursed, I'm not sure.) @@ -325,7 +319,6 @@ def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:st output = rt_types.instances return output - def lookup_all_submissions_by_type(ctx:dict, sub_type:str|None=None) -> list[models.BasicSubmission]: """ Get all submissions, filtering by type if given @@ -399,7 +392,6 @@ def submissions_to_df(ctx:dict, sub_type:str|None=None) -> pd.DataFrame: except: logger.warning(f"Couldn't drop 'pcr_info' column from submissionsheet df.") return df - def lookup_submission_by_id(ctx:dict, id:int) -> models.BasicSubmission: """ @@ -414,7 +406,6 @@ def lookup_submission_by_id(ctx:dict, id:int) -> models.BasicSubmission: """ return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first() - def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_date:datetime.date) -> list[models.BasicSubmission]: """ Lookup submissions greater than start_date and less than end_date @@ -432,7 +423,6 @@ def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_dat end_date = end_date.strftime("%Y-%m-%d") return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submitted_date.between(start_date, end_date)).all() - def get_all_Control_Types_names(ctx:dict) -> list[str]: """ Grabs all control type names from db. @@ -448,7 +438,6 @@ def get_all_Control_Types_names(ctx:dict) -> list[str]: logger.debug(f"Control Types: {conTypes}") return conTypes - def create_kit_from_yaml(ctx:dict, exp:dict) -> dict: """ Create and store a new kit in the database based on a .yml file @@ -491,7 +480,6 @@ def create_kit_from_yaml(ctx:dict, exp:dict) -> dict: ctx['database_session'].commit() return {'code':0, 'message':'Kit has been added', 'status': 'information'} - def create_org_from_yaml(ctx:dict, org:dict) -> dict: """ Create and store a new organization based on a .yml file @@ -528,7 +516,6 @@ def create_org_from_yaml(ctx:dict, org:dict) -> dict: ctx["database_session"].commit() return {"code":0, "message":"Organization has been added."} - def lookup_all_sample_types(ctx:dict) -> list[str]: """ Lookup all sample types and get names @@ -544,7 +531,6 @@ def lookup_all_sample_types(ctx:dict) -> list[str]: uses = list(set([item for sublist in uses for item in sublist])) return uses - def get_all_available_modes(ctx:dict) -> list[str]: """ Get types of analysis for controls @@ -564,7 +550,6 @@ def get_all_available_modes(ctx:dict) -> list[str]: cols = [] return cols - def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None, end_date:date|None=None) -> list[models.Control]: """ Returns a list of control objects that are instances of the input controltype. @@ -589,7 +574,6 @@ def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None, logger.debug(f"Returned controls between dates: {output}") return output - def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]: """ Get subtypes for a control analysis mode @@ -617,7 +601,6 @@ def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]: subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item] return subtypes - def get_all_controls(ctx:dict) -> list[models.Control]: """ Retrieve a list of all controls from the database @@ -630,7 +613,6 @@ def get_all_controls(ctx:dict) -> list[models.Control]: """ return ctx['database_session'].query(models.Control).all() - def lookup_submission_by_rsl_num(ctx:dict, rsl_num:str) -> models.BasicSubmission: """ Retrieve a submission from the database based on rsl plate number @@ -644,7 +626,6 @@ def lookup_submission_by_rsl_num(ctx:dict, rsl_num:str) -> models.BasicSubmissio """ return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num.startswith(rsl_num)).first() - def lookup_submissions_using_reagent(ctx:dict, reagent:models.Reagent) -> list[models.BasicSubmission]: """ Retrieves each submission using a specified reagent. @@ -658,7 +639,6 @@ def lookup_submissions_using_reagent(ctx:dict, reagent:models.Reagent) -> list[m """ return ctx['database_session'].query(models.BasicSubmission).join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id).all() - def delete_submission_by_id(ctx:dict, id:int) -> None: """ Deletes a submission and its associated samples from the database. @@ -683,13 +663,12 @@ def delete_submission_by_id(ctx:dict, id:int) -> None: ctx["database_session"].delete(sub) ctx["database_session"].commit() - def lookup_ww_sample_by_rsl_sample_number(ctx:dict, rsl_number:str) -> models.WWSample: """ - Retrieves wastewater sampel from database by rsl sample number + Retrieves wastewater sample from database by rsl sample number Args: - ctx (dict): settings passed dwon from gui + ctx (dict): settings passed down from gui rsl_number (str): sample number assigned by robotics lab Returns: @@ -697,6 +676,39 @@ def lookup_ww_sample_by_rsl_sample_number(ctx:dict, rsl_number:str) -> models.WW """ return ctx['database_session'].query(models.WWSample).filter(models.WWSample.rsl_number==rsl_number).first() +def lookup_ww_sample_by_sub_sample_rsl(ctx:dict, sample_rsl:str, plate_rsl:str) -> models.WWSample: + """ + Retrieves a wastewater sample from the database by its rsl sample number and parent rsl plate number. + This will likely replace simply looking up by the sample rsl above cine I need to control for repeats. + + Args: + ctx (dict): settings passed down from the gui + sample_rsl (str): rsl number of the relevant sample + plate_rsl (str): rsl number of the parent plate + + Returns: + models.WWSample: Relevant wastewater object + """ + return ctx['database_session'].query(models.WWSample).join(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==plate_rsl).filter(models.WWSample.rsl_number==sample_rsl).first() + +def lookup_ww_sample_by_sub_sample_well(ctx:dict, sample_rsl:str, well_num:str, plate_rsl:str) -> models.WWSample: + """ + Retrieves a wastewater sample from the database by its rsl sample number and parent rsl plate number. + This will likely replace simply looking up by the sample rsl above cine I need to control for repeats. + + Args: + ctx (dict): settings passed down from the gui + sample_rsl (str): rsl number of the relevant sample + well_num (str): well number of the relevant sample + plate_rsl (str): rsl number of the parent plate + + Returns: + models.WWSample: Relevant wastewater object + """ + return ctx['database_session'].query(models.WWSample).join(models.BasicSubmission) \ + .filter(models.BasicSubmission.rsl_plate_num==plate_rsl) \ + .filter(models.WWSample.rsl_number==sample_rsl) \ + .filter(models.WWSample.well_number==well_num).first() def update_ww_sample(ctx:dict, sample_obj:dict): """ @@ -706,7 +718,10 @@ def update_ww_sample(ctx:dict, sample_obj:dict): ctx (dict): settings passed down from gui sample_obj (dict): dictionary representing new values for database object """ - ww_samp = lookup_ww_sample_by_rsl_sample_number(ctx=ctx, rsl_number=sample_obj['sample']) + # ww_samp = lookup_ww_sample_by_rsl_sample_number(ctx=ctx, rsl_number=sample_obj['sample']) + logger.debug(f"Looking up {sample_obj['sample']} in plate {sample_obj['plate_rsl']}") + ww_samp = lookup_ww_sample_by_sub_sample_rsl(ctx=ctx, sample_rsl=sample_obj['sample'], plate_rsl=sample_obj['plate_rsl']) + # ww_samp = lookup_ww_sample_by_sub_sample_well(ctx=ctx, sample_rsl=sample_obj['sample'], well_num=sample_obj['well_num'], plate_rsl=sample_obj['plate_rsl']) if ww_samp != None: for key, value in sample_obj.items(): logger.debug(f"Setting {key} to {value}") diff --git a/src/submissions/backend/db/models/samples.py b/src/submissions/backend/db/models/samples.py index c458ff3..2620290 100644 --- a/src/submissions/backend/db/models/samples.py +++ b/src/submissions/backend/db/models/samples.py @@ -27,6 +27,8 @@ class WWSample(Base): notes = Column(String(2000)) ct_n1 = Column(FLOAT(2)) #: AKA ct for N1 ct_n2 = Column(FLOAT(2)) #: AKA ct for N2 + n1_status = Column(String(32)) + n2_status = Column(String(32)) seq_submitted = Column(BOOLEAN()) ww_seq_run_id = Column(String(64)) sample_type = Column(String(8)) @@ -50,7 +52,7 @@ class WWSample(Base): dict: well location and id NOTE: keys must sync with BCSample to_sub_dict below """ if self.ct_n1 != None and self.ct_n2 != None: - name = f"{self.ww_sample_full_id}\n\t- ct N1: {'{:.2f}'.format(self.ct_n1)}, ct N2: {'{:.2f}'.format(self.ct_n1)}" + name = f"{self.ww_sample_full_id}\n\t- ct N1: {'{:.2f}'.format(self.ct_n1)} ({self.n1_status})\n\t- ct N2: {'{:.2f}'.format(self.ct_n2)} ({self.n2_status})" else: name = self.ww_sample_full_id return { diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index 1269467..0efae25 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -2,17 +2,18 @@ contains parser object for pulling values from client generated submission sheets. ''' from getpass import getuser +from typing import Tuple import pandas as pd from pathlib import Path from backend.db.models import WWSample, BCSample -from backend.db import lookup_ww_sample_by_rsl_sample_number +# from backend.db import lookup_ww_sample_by_rsl_sample_number import logging from collections import OrderedDict import re import numpy as np from datetime import date import uuid -from tools import check_not_nan, retrieve_rsl_number +from tools import check_not_nan, RSLNamer logger = logging.getLogger(f"submissions.{__name__}") @@ -84,7 +85,7 @@ class SheetParser(object): # self.xl is a pd.ExcelFile so we need to parse it into a df submission_info = self.xl.parse(sheet_name=sheet_name, dtype=object) self.sub['submitter_plate_num'] = submission_info.iloc[0][1] - self.sub['rsl_plate_num'] = submission_info.iloc[10][1] + self.sub['rsl_plate_num'] = RSLNamer(submission_info.iloc[10][1]).parsed_name self.sub['submitted_date'] = submission_info.iloc[1][1] self.sub['submitting_lab'] = submission_info.iloc[0][3] self.sub['sample_count'] = submission_info.iloc[2][3] @@ -202,7 +203,7 @@ class SheetParser(object): parse_reagents(ext_reagent_range) parse_reagents(pcr_reagent_range) # parse samples - sample_parser = SampleParser(submission_info.iloc[16:40]) + sample_parser = SampleParser(submission_info.iloc[16:]) sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type'].lower()}_samples") self.sub['samples'] = sample_parse() self.sub['csv'] = self.xl.parse("Copy to import file", dtype=object) @@ -260,24 +261,20 @@ class SampleParser(object): new_list = [] for sample in self.samples: new = WWSample() + if check_not_nan(sample["Unnamed: 9"]): + new.rsl_number = sample['Unnamed: 9'] + else: + logger.error(f"No RSL sample number found for this sample.") + continue new.ww_processing_num = sample['Unnamed: 2'] # need to ensure we have a sample id for database integrity - try: - not_a_nan = not np.isnan(sample['Unnamed: 3']) - except TypeError: - not_a_nan = True # if we don't have a sample full id, make one up - if not_a_nan: + if check_not_nan(sample['Unnamed: 3']): new.ww_sample_full_id = sample['Unnamed: 3'] else: new.ww_sample_full_id = uuid.uuid4().hex.upper() - new.rsl_number = sample['Unnamed: 9'] # need to ensure we get a collection date - try: - not_a_nan = not np.isnan(sample['Unnamed: 5']) - except TypeError: - not_a_nan = True - if not_a_nan: + if check_not_nan(sample['Unnamed: 5']): new.collection_date = sample['Unnamed: 5'] else: new.collection_date = date.today() @@ -317,7 +314,9 @@ class PCRParser(object): return # self.pcr = OrderedDict() self.pcr = {} - self.plate_num, self.submission_type = retrieve_rsl_number(filepath.__str__()) + namer = RSLNamer(filepath.__str__()) + self.plate_num = namer.parsed_name + self.submission_type = namer.submission_type logger.debug(f"Set plate number to {self.plate_num} and type to {self.submission_type}") self.samples = [] parser = getattr(self, f"parse_{self.submission_type}") @@ -362,14 +361,25 @@ class PCRParser(object): Parse specific to wastewater samples. """ df = self.parse_general(sheet_name="Results") + column_names = ["Well", "Well Position", "Omit","Sample","Target","Task"," Reporter","Quencher","Amp Status","Amp Score","Curve Quality","Result Quality Issues","Cq","Cq Confidence","Cq Mean","Cq SD","Auto Threshold","Threshold", "Auto Baseline", "Baseline Start", "Baseline End"] self.samples_df = df.iloc[23:][0:] + self.samples_df.columns = column_names + logger.debug(f"Samples columns: {self.samples_df.columns}") + well_call_df = self.xl.parse(sheet_name="Well Call").iloc[24:][0:].iloc[:,-1:] + try: + self.samples_df['Assessment'] = well_call_df.values + except ValueError: + logger.error("Well call number doesn't match sample number") + logger.debug(f"Well call dr: {well_call_df}") # iloc is [row][column] for ii, row in self.samples_df.iterrows(): try: sample_obj = [sample for sample in self.samples if sample['sample'] == row[3]][0] except IndexError: sample_obj = dict( - sample = row[3], + sample = row['Sample'], + plate_rsl = self.plate_num, + well_num = row['Well Position'] ) logger.debug(f"Got sample obj: {sample_obj}") # logger.debug(f"row: {row}") @@ -377,22 +387,30 @@ class PCRParser(object): # # logger.debug(f"Looking up: {rsl_num}") # ww_samp = lookup_ww_sample_by_rsl_sample_number(ctx=self.ctx, rsl_number=rsl_num) # logger.debug(f"Got: {ww_samp}") - match row[4]: - case "N1": - if isinstance(row[12], float): - sample_obj['ct_n1'] = row[12] - else: - sample_obj['ct_n1'] = 0.0 - case "N2": - if isinstance(row[12], float): - sample_obj['ct_n2'] = row[12] - else: - sample_obj['ct_n2'] = 0.0 - case _: - logger.warning(f"Unexpected input for row[4]: {row[4]}") + if isinstance(row['Cq'], float): + sample_obj[f"ct_{row['Target'].lower()}"] = row['Cq'] + else: + sample_obj[f"ct_{row['Target'].lower()}"] = 0.0 + try: + sample_obj[f"{row['Target'].lower()}_status"] = row['Assessment'] + except KeyError: + logger.error(f"No assessment for {sample_obj['sample']}") + # match row["Target"]: + # case "N1": + # if isinstance(row['Cq'], float): + # sample_obj['ct_n1'] = row["Cq"] + # else: + # sample_obj['ct_n1'] = 0.0 + # sample_obj['n1_status'] = row['Assessment'] + # case "N2": + # if isinstance(row['Cq'], float): + # sample_obj['ct_n2'] = row['Assessment'] + # else: + # sample_obj['ct_n2'] = 0.0 + # case _: + # logger.warning(f"Unexpected input for row[4]: {row["Target"]}") self.samples.append(sample_obj) - - + diff --git a/src/submissions/frontend/__init__.py b/src/submissions/frontend/__init__.py index f58d2aa..63af3f2 100644 --- a/src/submissions/frontend/__init__.py +++ b/src/submissions/frontend/__init__.py @@ -1,47 +1,26 @@ ''' Operations for all user interactions. ''' -import inspect -import json -import re import sys from PyQt6.QtWidgets import ( - QMainWindow, QLabel, QToolBar, + QMainWindow, QToolBar, QTabWidget, QWidget, QVBoxLayout, - QPushButton, QFileDialog, - QLineEdit, QMessageBox, QComboBox, QDateEdit, QHBoxLayout, + QComboBox, QHBoxLayout, QScrollArea ) from PyQt6.QtGui import QAction -from PyQt6.QtCore import QSignalBlocker from PyQt6.QtWebEngineWidgets import QWebEngineView # import pandas as pd from pathlib import Path -import plotly -import pandas as pd -from openpyxl.utils import get_column_letter -from xhtml2pdf import pisa -# import plotly.express as px -import yaml -import pprint -from backend.excel import convert_data_list_to_df, make_report_xlsx, make_report_html, SheetParser, PCRParser -from backend.db import (construct_submission_info, lookup_reagent, - construct_reagent, store_submission, lookup_kittype_by_use, - lookup_regent_by_type_name, lookup_all_orgs, lookup_submissions_by_date_range, - get_all_Control_Types_names, create_kit_from_yaml, get_all_available_modes, get_all_controls_by_type, - get_control_subtypes, lookup_all_submissions_by_type, get_all_controls, lookup_submission_by_rsl_num, - create_org_from_yaml, store_reagent, lookup_ww_sample_by_rsl_sample_number, lookup_kittype_by_name, - update_ww_sample +from backend.db import ( + construct_reagent, get_all_Control_Types_names, get_all_available_modes, store_reagent ) -from .functions import extract_form_info -from tools import check_not_nan, check_kit_integrity, check_if_app +from .main_window_functions import * +from tools import check_if_app # from backend.excel.reports import -from frontend.custom_widgets import SubmissionsSheet, AlertPop, QuestionAsker, AddReagentForm, ReportDatePicker, KitAdder, ControlsDatePicker, ImportReagent +from frontend.custom_widgets import SubmissionsSheet, AlertPop, AddReagentForm, KitAdder, ControlsDatePicker import logging -import difflib -from getpass import getuser from datetime import date -from frontend.visualizations import create_charts import webbrowser logger = logging.getLogger(f'submissions.{__name__}') @@ -156,147 +135,158 @@ class App(QMainWindow): logger.debug(f"Attempting to open {url}") webbrowser.get('windows-default').open(f"file://{url.__str__()}") + def result_reporter(self, result:dict|None=None): + if result != None: + msg = AlertPop(message=result['message'], status=result['status']) + msg.exec() def importSubmission(self): """ import submission from excel sheet into form """ - logger.debug(self.ctx) - # initialize samples - self.samples = [] - self.reagents = {} - # set file dialog - home_dir = str(Path(self.ctx["directory_path"])) - fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir)[0]) - logger.debug(f"Attempting to parse file: {fname}") - assert fname.exists() - # create sheetparser using excel sheet and context from gui - try: - prsr = SheetParser(fname, **self.ctx) - except PermissionError: - logger.error(f"Couldn't get permission to access file: {fname}") - return - logger.debug(f"prsr.sub = {prsr.sub}") - # destroy any widgets from previous imports - for item in self.table_widget.formlayout.parentWidget().findChildren(QWidget): - item.setParent(None) - # regex to parser out different variable types for decision making - variable_parser = re.compile(r""" - # (?x) - (?P^extraction_kit$) | - (?P^submitted_date$) | - (?P)^submitting_lab$ | - (?P)^samples$ | - (?P^lot_.*$) | - (?P^csv$) - """, re.VERBOSE) - for item in prsr.sub: - logger.debug(f"Item: {item}") - # attempt to match variable name to regex group - try: - mo = variable_parser.fullmatch(item).lastgroup - except AttributeError: - mo = "other" - logger.debug(f"Mo: {mo}") - match mo: - case 'submitting_lab': - # create label - self.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) - logger.debug(f"{item}: {prsr.sub[item]}") - # create combobox to hold looked up submitting labs - add_widget = QComboBox() - labs = [item.__str__() for item in lookup_all_orgs(ctx=self.ctx)] - # try to set closest match to top of list - try: - labs = difflib.get_close_matches(prsr.sub[item], labs, len(labs), 0) - except (TypeError, ValueError): - pass - # set combobox values to lookedup values - add_widget.addItems(labs) - case 'extraction_kit': - # create label - self.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) - # if extraction kit not available, all other values fail - if not check_not_nan(prsr.sub[item]): - msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning") - msg.exec() - # create combobox to hold looked up kits - # add_widget = KitSelector(ctx=self.ctx, submission_type=prsr.sub['submission_type'], parent=self) - add_widget = QComboBox() - # add_widget.currentTextChanged.connect(self.kit_reload) - # lookup existing kits by 'submission_type' decided on by sheetparser - uses = [item.__str__() for item in lookup_kittype_by_use(ctx=self.ctx, used_by=prsr.sub['submission_type'])] - # if len(uses) > 0: - add_widget.addItems(uses) - # else: - # add_widget.addItems(['bacterial_culture']) - if check_not_nan(prsr.sub[item]): - self.ext_kit = prsr.sub[item] - else: - self.ext_kit = add_widget.currentText() - case 'submitted_date': - # create label - self.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) - # uses base calendar - add_widget = QDateEdit(calendarPopup=True) - # sets submitted date based on date found in excel sheet - try: - add_widget.setDate(prsr.sub[item]) - # if not found, use today - except: - add_widget.setDate(date.today()) - case 'reagent': - # create label - reg_label = QLabel(item.replace("_", " ").title()) - reg_label.setObjectName(f"lot_{item}_label") - self.table_widget.formlayout.addWidget(reg_label) - # create reagent choice widget - add_widget = ImportReagent(ctx=self.ctx, item=item, prsr=prsr) - self.reagents[item] = prsr.sub[item] - case 'samples': - # hold samples in 'self' until form submitted - logger.debug(f"{item}: {prsr.sub[item]}") - self.samples = prsr.sub[item] - add_widget = None - case 'csv': - self.csv = prsr.sub[item] - case _: - # anything else gets added in as a line edit - self.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) - add_widget = QLineEdit() - logger.debug(f"Setting widget text to {str(prsr.sub[item]).replace('_', ' ')}") - add_widget.setText(str(prsr.sub[item]).replace("_", " ")) - try: - add_widget.setObjectName(item) - logger.debug(f"Widget name set to: {add_widget.objectName()}") - self.table_widget.formlayout.addWidget(add_widget) - except AttributeError as e: - logger.error(e) - # compare self.reagents with expected reagents in kit - if hasattr(self, 'ext_kit'): - self.kit_integrity_completion() - # create submission button - # submit_btn = QPushButton("Submit") - # submit_btn.setObjectName("submit_btn") - # self.table_widget.formlayout.addWidget(submit_btn) - # submit_btn.clicked.connect(self.submit_new_sample) - logger.debug(f"Imported reagents: {self.reagents}") + self, result = import_submission_function(self) + logger.debug(f"Import result: {result}") + self.result_reporter(result) + # logger.debug(self.ctx) + # # initialize samples + # self.samples = [] + # self.reagents = {} + # # set file dialog + # home_dir = str(Path(self.ctx["directory_path"])) + # fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir)[0]) + # logger.debug(f"Attempting to parse file: {fname}") + # assert fname.exists() + # # create sheetparser using excel sheet and context from gui + # try: + # prsr = SheetParser(fname, **self.ctx) + # except PermissionError: + # logger.error(f"Couldn't get permission to access file: {fname}") + # return + # if prsr.sub['rsl_plate_num'] == None: + # prsr.sub['rsl_plate_num'] = RSLNamer(fname.__str__()).parsed_name + # logger.debug(f"prsr.sub = {prsr.sub}") + # # destroy any widgets from previous imports + # for item in self.table_widget.formlayout.parentWidget().findChildren(QWidget): + # item.setParent(None) + # # regex to parser out different variable types for decision making + # variable_parser = re.compile(r""" + # # (?x) + # (?P^extraction_kit$) | + # (?P^submitted_date$) | + # (?P)^submitting_lab$ | + # (?P)^samples$ | + # (?P^lot_.*$) | + # (?P^csv$) + # """, re.VERBOSE) + # for item in prsr.sub: + # logger.debug(f"Item: {item}") + # # attempt to match variable name to regex group + # try: + # mo = variable_parser.fullmatch(item).lastgroup + # except AttributeError: + # mo = "other" + # logger.debug(f"Mo: {mo}") + # match mo: + # case 'submitting_lab': + # # create label + # self.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + # logger.debug(f"{item}: {prsr.sub[item]}") + # # create combobox to hold looked up submitting labs + # add_widget = QComboBox() + # labs = [item.__str__() for item in lookup_all_orgs(ctx=self.ctx)] + # # try to set closest match to top of list + # try: + # labs = difflib.get_close_matches(prsr.sub[item], labs, len(labs), 0) + # except (TypeError, ValueError): + # pass + # # set combobox values to lookedup values + # add_widget.addItems(labs) + # case 'extraction_kit': + # # create label + # self.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + # # if extraction kit not available, all other values fail + # if not check_not_nan(prsr.sub[item]): + # msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning") + # msg.exec() + # # create combobox to hold looked up kits + # # add_widget = KitSelector(ctx=self.ctx, submission_type=prsr.sub['submission_type'], parent=self) + # add_widget = QComboBox() + # # add_widget.currentTextChanged.connect(self.kit_reload) + # # lookup existing kits by 'submission_type' decided on by sheetparser + # uses = [item.__str__() for item in lookup_kittype_by_use(ctx=self.ctx, used_by=prsr.sub['submission_type'])] + # # if len(uses) > 0: + # add_widget.addItems(uses) + # # else: + # # add_widget.addItems(['bacterial_culture']) + # if check_not_nan(prsr.sub[item]): + # self.ext_kit = prsr.sub[item] + # else: + # self.ext_kit = add_widget.currentText() + # case 'submitted_date': + # # create label + # self.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + # # uses base calendar + # add_widget = QDateEdit(calendarPopup=True) + # # sets submitted date based on date found in excel sheet + # try: + # add_widget.setDate(prsr.sub[item]) + # # if not found, use today + # except: + # add_widget.setDate(date.today()) + # case 'reagent': + # # create label + # reg_label = QLabel(item.replace("_", " ").title()) + # reg_label.setObjectName(f"lot_{item}_label") + # self.table_widget.formlayout.addWidget(reg_label) + # # create reagent choice widget + # add_widget = ImportReagent(ctx=self.ctx, item=item, prsr=prsr) + # self.reagents[item] = prsr.sub[item] + # case 'samples': + # # hold samples in 'self' until form submitted + # logger.debug(f"{item}: {prsr.sub[item]}") + # self.samples = prsr.sub[item] + # add_widget = None + # case 'csv': + # self.csv = prsr.sub[item] + # case _: + # # anything else gets added in as a line edit + # self.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + # add_widget = QLineEdit() + # logger.debug(f"Setting widget text to {str(prsr.sub[item]).replace('_', ' ')}") + # add_widget.setText(str(prsr.sub[item]).replace("_", " ")) + # try: + # add_widget.setObjectName(item) + # logger.debug(f"Widget name set to: {add_widget.objectName()}") + # self.table_widget.formlayout.addWidget(add_widget) + # except AttributeError as e: + # logger.error(e) + # # compare self.reagents with expected reagents in kit + # if hasattr(self, 'ext_kit'): + # self.kit_integrity_completion() + # # create submission button + # # submit_btn = QPushButton("Submit") + # # submit_btn.setObjectName("submit_btn") + # # self.table_widget.formlayout.addWidget(submit_btn) + # # submit_btn.clicked.connect(self.submit_new_sample) + # logger.debug(f"Imported reagents: {self.reagents}") def kit_reload(self): """ Removes all reagents from form before running kit integrity completion. """ - for item in self.table_widget.formlayout.parentWidget().findChildren(QWidget): - # item.setParent(None) - if isinstance(item, QLabel): - if item.text().startswith("Lot"): - item.setParent(None) - else: - logger.debug(f"Type of {item.objectName()} is {type(item)}") - if item.objectName().startswith("lot_"): - item.setParent(None) - self.kit_integrity_completion() + self, result = kit_reload_function(self) + self.result_reporter(result) + # for item in self.table_widget.formlayout.parentWidget().findChildren(QWidget): + # # item.setParent(None) + # if isinstance(item, QLabel): + # if item.text().startswith("Lot"): + # item.setParent(None) + # else: + # logger.debug(f"Type of {item.objectName()} is {type(item)}") + # if item.objectName().startswith("lot_"): + # item.setParent(None) + # self.kit_integrity_completion() def kit_integrity_completion(self): @@ -305,107 +295,111 @@ class App(QMainWindow): NOTE: this will not change self.reagents which should be fine since it's only used when looking up """ - logger.debug(inspect.currentframe().f_back.f_code.co_name) - kit_widget = self.table_widget.formlayout.parentWidget().findChild(QComboBox, 'extraction_kit') - logger.debug(f"Kit selector: {kit_widget}") - self.ext_kit = kit_widget.currentText() - logger.debug(f"Checking integrity of {self.ext_kit}") - kit = lookup_kittype_by_name(ctx=self.ctx, name=self.ext_kit) - reagents_to_lookup = [item.replace("lot_", "") for item in self.reagents] - logger.debug(f"Reagents for lookup for {kit.name}: {reagents_to_lookup}") - kit_integrity = check_kit_integrity(kit, reagents_to_lookup) - if kit_integrity != None: - msg = AlertPop(message=kit_integrity['message'], status="critical") - msg.exec() - for item in kit_integrity['missing']: - self.table_widget.formlayout.addWidget(QLabel(f"Lot {item.replace('_', ' ').title()}")) - add_widget = ImportReagent(ctx=self.ctx, item=item) - self.table_widget.formlayout.addWidget(add_widget) - submit_btn = QPushButton("Submit") - submit_btn.setObjectName("lot_submit_btn") - self.table_widget.formlayout.addWidget(submit_btn) - submit_btn.clicked.connect(self.submit_new_sample) + self, result = kit_integrity_completion_function(self) + self.result_reporter(result) + # logger.debug(inspect.currentframe().f_back.f_code.co_name) + # kit_widget = self.table_widget.formlayout.parentWidget().findChild(QComboBox, 'extraction_kit') + # logger.debug(f"Kit selector: {kit_widget}") + # self.ext_kit = kit_widget.currentText() + # logger.debug(f"Checking integrity of {self.ext_kit}") + # kit = lookup_kittype_by_name(ctx=self.ctx, name=self.ext_kit) + # reagents_to_lookup = [item.replace("lot_", "") for item in self.reagents] + # logger.debug(f"Reagents for lookup for {kit.name}: {reagents_to_lookup}") + # kit_integrity = check_kit_integrity(kit, reagents_to_lookup) + # if kit_integrity != None: + # msg = AlertPop(message=kit_integrity['message'], status="critical") + # msg.exec() + # for item in kit_integrity['missing']: + # self.table_widget.formlayout.addWidget(QLabel(f"Lot {item.replace('_', ' ').title()}")) + # add_widget = ImportReagent(ctx=self.ctx, item=item) + # self.table_widget.formlayout.addWidget(add_widget) + # submit_btn = QPushButton("Submit") + # submit_btn.setObjectName("lot_submit_btn") + # self.table_widget.formlayout.addWidget(submit_btn) + # submit_btn.clicked.connect(self.submit_new_sample) def submit_new_sample(self): """ Attempt to add sample to database when 'submit' button clicked """ + self, result = submit_new_sample_function(self) + self.result_reporter(result) # get info from form - info = extract_form_info(self.table_widget.tab1) - reagents = {k:v for k,v in info.items() if k.startswith("lot_")} - info = {k:v for k,v in info.items() if not k.startswith("lot_")} - logger.debug(f"Info: {info}") - logger.debug(f"Reagents: {reagents}") - parsed_reagents = [] - # compare reagents in form to reagent database - for reagent in reagents: - wanted_reagent = lookup_reagent(ctx=self.ctx, reagent_lot=reagents[reagent]) - logger.debug(f"Looked up reagent: {wanted_reagent}") - # if reagent not found offer to add to database - if wanted_reagent == None: - r_lot = reagents[reagent] - dlg = QuestionAsker(title=f"Add {r_lot}?", message=f"Couldn't find reagent type {reagent.replace('_', ' ').title().strip('Lot')}: {r_lot} in the database.\n\nWould you like to add it?") - if dlg.exec(): - logger.debug(f"checking reagent: {reagent} in self.reagents. Result: {self.reagents[reagent]}") - expiry_date = self.reagents[reagent]['exp'] - wanted_reagent = self.add_reagent(reagent_lot=r_lot, reagent_type=reagent.replace("lot_", ""), expiry=expiry_date) - else: - # In this case we will have an empty reagent and the submission will fail kit integrity check - logger.debug("Will not add reagent.") - if wanted_reagent != None: - parsed_reagents.append(wanted_reagent) - # move samples into preliminary submission dict - info['samples'] = self.samples - info['uploaded_by'] = getuser() - # construct submission object - logger.debug(f"Here is the info_dict: {pprint.pformat(info)}") - base_submission, result = construct_submission_info(ctx=self.ctx, info_dict=info) - # check output message for issues - match result['code']: - # code 1: ask for overwrite - case 1: - dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_num}?", message=result['message']) - if dlg.exec(): - base_submission.reagents = [] - else: - return - # code 2: No RSL plate number given - case 2: - dlg = AlertPop(message=result['message'], status='critical') - dlg.exec() - return - case _: - pass - # add reagents to submission object - for reagent in parsed_reagents: - base_submission.reagents.append(reagent) - logger.debug("Checking kit integrity...") - kit_integrity = check_kit_integrity(base_submission) - if kit_integrity != None: - msg = AlertPop(message=kit_integrity['message'], status="critical") - msg.exec() - return - logger.debug(f"Sending submission: {base_submission.rsl_plate_num} to database.") - result = store_submission(ctx=self.ctx, base_submission=base_submission) - # check result of storing for issues - if result != None: - msg = AlertPop(result['message']) - msg.exec() - # update summary sheet - self.table_widget.sub_wid.setData() - # reset form - for item in self.table_widget.formlayout.parentWidget().findChildren(QWidget): - item.setParent(None) - if hasattr(self, 'csv'): - dlg = QuestionAsker("Export CSV?", "Would you like to export the csv file?") - if dlg.exec(): - home_dir = Path(self.ctx["directory_path"]).joinpath(f"{base_submission.rsl_plate_num}.csv").resolve().__str__() - fname = Path(QFileDialog.getSaveFileName(self, "Save File", home_dir, filter=".csv")[0]) - try: - self.csv.to_csv(fname.__str__(), index=False) - except PermissionError: - logger.debug(f"Could not get permissions to {fname}. Possibly the request was cancelled.") + # info = extract_form_info(self.table_widget.tab1) + # reagents = {k:v for k,v in info.items() if k.startswith("lot_")} + # info = {k:v for k,v in info.items() if not k.startswith("lot_")} + # logger.debug(f"Info: {info}") + # logger.debug(f"Reagents: {reagents}") + # parsed_reagents = [] + # # compare reagents in form to reagent database + # for reagent in reagents: + # wanted_reagent = lookup_reagent(ctx=self.ctx, reagent_lot=reagents[reagent]) + # logger.debug(f"Looked up reagent: {wanted_reagent}") + # # if reagent not found offer to add to database + # if wanted_reagent == None: + # r_lot = reagents[reagent] + # dlg = QuestionAsker(title=f"Add {r_lot}?", message=f"Couldn't find reagent type {reagent.replace('_', ' ').title().strip('Lot')}: {r_lot} in the database.\n\nWould you like to add it?") + # if dlg.exec(): + # logger.debug(f"checking reagent: {reagent} in self.reagents. Result: {self.reagents[reagent]}") + # expiry_date = self.reagents[reagent]['exp'] + # wanted_reagent = self.add_reagent(reagent_lot=r_lot, reagent_type=reagent.replace("lot_", ""), expiry=expiry_date) + # else: + # # In this case we will have an empty reagent and the submission will fail kit integrity check + # logger.debug("Will not add reagent.") + # if wanted_reagent != None: + # parsed_reagents.append(wanted_reagent) + # # move samples into preliminary submission dict + # info['samples'] = self.samples + # info['uploaded_by'] = getuser() + # # construct submission object + # logger.debug(f"Here is the info_dict: {pprint.pformat(info)}") + # base_submission, result = construct_submission_info(ctx=self.ctx, info_dict=info) + # # check output message for issues + # match result['code']: + # # code 1: ask for overwrite + # case 1: + # dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_num}?", message=result['message']) + # if dlg.exec(): + # base_submission.reagents = [] + # else: + # return + # # code 2: No RSL plate number given + # case 2: + # dlg = AlertPop(message=result['message'], status='critical') + # dlg.exec() + # return + # case _: + # pass + # # add reagents to submission object + # for reagent in parsed_reagents: + # base_submission.reagents.append(reagent) + # logger.debug("Checking kit integrity...") + # kit_integrity = check_kit_integrity(base_submission) + # if kit_integrity != None: + # msg = AlertPop(message=kit_integrity['message'], status="critical") + # msg.exec() + # return + # logger.debug(f"Sending submission: {base_submission.rsl_plate_num} to database.") + # result = store_submission(ctx=self.ctx, base_submission=base_submission) + # # check result of storing for issues + # if result != None: + # msg = AlertPop(result['message']) + # msg.exec() + # # update summary sheet + # self.table_widget.sub_wid.setData() + # # reset form + # for item in self.table_widget.formlayout.parentWidget().findChildren(QWidget): + # item.setParent(None) + # if hasattr(self, 'csv'): + # dlg = QuestionAsker("Export CSV?", "Would you like to export the csv file?") + # if dlg.exec(): + # home_dir = Path(self.ctx["directory_path"]).joinpath(f"{base_submission.rsl_plate_num}.csv").resolve().__str__() + # fname = Path(QFileDialog.getSaveFileName(self, "Save File", home_dir, filter=".csv")[0]) + # try: + # self.csv.to_csv(fname.__str__(), index=False) + # except PermissionError: + # logger.debug(f"Could not get permissions to {fname}. Possibly the request was cancelled.") def add_reagent(self, reagent_lot:str|None=None, reagent_type:str|None=None, expiry:date|None=None): @@ -440,382 +434,407 @@ class App(QMainWindow): """ Action to create a summary of sheet data per client """ + self, result = generate_report_function(self) + self.result_reporter(result) # Custom two date picker for start & end dates - dlg = ReportDatePicker() - if dlg.exec(): - info = extract_form_info(dlg) - logger.debug(f"Report info: {info}") - # find submissions based on date range - subs = lookup_submissions_by_date_range(ctx=self.ctx, start_date=info['start_date'], end_date=info['end_date']) - # convert each object to dict - records = [item.report_dict() for item in subs] - # make dataframe from record dictionaries - df = make_report_xlsx(records=records) - html = make_report_html(df=df, start_date=info['start_date'], end_date=info['end_date']) - # setup filedialog to handle save location of report - home_dir = Path(self.ctx["directory_path"]).joinpath(f"Submissions_Report_{info['start_date']}-{info['end_date']}.pdf").resolve().__str__() - fname = Path(QFileDialog.getSaveFileName(self, "Save File", home_dir, filter=".pdf")[0]) - # logger.debug(f"report output name: {fname}") - with open(fname, "w+b") as f: - pisa.CreatePDF(html, dest=f) - writer = pd.ExcelWriter(fname.with_suffix(".xlsx"), engine='openpyxl') - df.to_excel(writer, sheet_name="Report") - worksheet = writer.sheets['Report'] - for idx, col in enumerate(df): # loop through all columns - series = df[col] - max_len = max(( - series.astype(str).map(len).max(), # len of largest item - len(str(series.name)) # len of column name/header - )) + 20 # adding a little extra space - try: - worksheet.column_dimensions[get_column_letter(idx)].width = max_len - except ValueError: - pass - for cell in worksheet['D']: - if cell.row > 1: - cell.style = 'Currency' - writer.close() + # dlg = ReportDatePicker() + # if dlg.exec(): + # info = extract_form_info(dlg) + # logger.debug(f"Report info: {info}") + # # find submissions based on date range + # subs = lookup_submissions_by_date_range(ctx=self.ctx, start_date=info['start_date'], end_date=info['end_date']) + # # convert each object to dict + # records = [item.report_dict() for item in subs] + # # make dataframe from record dictionaries + # df = make_report_xlsx(records=records) + # html = make_report_html(df=df, start_date=info['start_date'], end_date=info['end_date']) + # # setup filedialog to handle save location of report + # home_dir = Path(self.ctx["directory_path"]).joinpath(f"Submissions_Report_{info['start_date']}-{info['end_date']}.pdf").resolve().__str__() + # fname = Path(QFileDialog.getSaveFileName(self, "Save File", home_dir, filter=".pdf")[0]) + # # logger.debug(f"report output name: {fname}") + # with open(fname, "w+b") as f: + # pisa.CreatePDF(html, dest=f) + # writer = pd.ExcelWriter(fname.with_suffix(".xlsx"), engine='openpyxl') + # df.to_excel(writer, sheet_name="Report") + # worksheet = writer.sheets['Report'] + # for idx, col in enumerate(df): # loop through all columns + # series = df[col] + # max_len = max(( + # series.astype(str).map(len).max(), # len of largest item + # len(str(series.name)) # len of column name/header + # )) + 20 # adding a little extra space + # try: + # worksheet.column_dimensions[get_column_letter(idx)].width = max_len + # except ValueError: + # pass + # for cell in worksheet['D']: + # if cell.row > 1: + # cell.style = 'Currency' + # writer.close() def add_kit(self): """ Constructs new kit from yaml and adds to DB. """ + self, result = add_kit_function(self) + self.result_reporter(result) # setup file dialog to find yaml flie - home_dir = str(Path(self.ctx["directory_path"])) - fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "yml(*.yml)")[0]) - assert fname.exists() - # read yaml file - try: - with open(fname.__str__(), "r") as stream: - try: - exp = yaml.load(stream, Loader=yaml.Loader) - except yaml.YAMLError as exc: - logger.error(f'Error reading yaml file {fname}: {exc}') - return {} - except PermissionError: - return - # send to kit creator function - result = create_kit_from_yaml(ctx=self.ctx, exp=exp) - match result['code']: - case 0: - msg = AlertPop(message=result['message'], status='info') - case 1: - msg = AlertPop(message=result['message'], status='critical') - msg.exec() + # home_dir = str(Path(self.ctx["directory_path"])) + # fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "yml(*.yml)")[0]) + # assert fname.exists() + # # read yaml file + # try: + # with open(fname.__str__(), "r") as stream: + # try: + # exp = yaml.load(stream, Loader=yaml.Loader) + # except yaml.YAMLError as exc: + # logger.error(f'Error reading yaml file {fname}: {exc}') + # return {} + # except PermissionError: + # return + # # send to kit creator function + # result = create_kit_from_yaml(ctx=self.ctx, exp=exp) + # match result['code']: + # case 0: + # msg = AlertPop(message=result['message'], status='info') + # case 1: + # msg = AlertPop(message=result['message'], status='critical') + # msg.exec() def add_org(self): """ Constructs new kit from yaml and adds to DB. """ - # setup file dialog to find yaml flie - home_dir = str(Path(self.ctx["directory_path"])) - fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "yml(*.yml)")[0]) - assert fname.exists() - # read yaml file - try: - with open(fname.__str__(), "r") as stream: - try: - org = yaml.load(stream, Loader=yaml.Loader) - except yaml.YAMLError as exc: - logger.error(f'Error reading yaml file {fname}: {exc}') - return {} - except PermissionError: - return - # send to kit creator function - result = create_org_from_yaml(ctx=self.ctx, org=org) - match result['code']: - case 0: - msg = AlertPop(message=result['message'], status='information') - case 1: - msg = AlertPop(message=result['message'], status='critical') - msg.exec() + self, result = add_org_function(self) + self.result_reporter(result) + # # setup file dialog to find yaml flie + # home_dir = str(Path(self.ctx["directory_path"])) + # fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "yml(*.yml)")[0]) + # assert fname.exists() + # # read yaml file + # try: + # with open(fname.__str__(), "r") as stream: + # try: + # org = yaml.load(stream, Loader=yaml.Loader) + # except yaml.YAMLError as exc: + # logger.error(f'Error reading yaml file {fname}: {exc}') + # return {} + # except PermissionError: + # return + # # send to kit creator function + # result = create_org_from_yaml(ctx=self.ctx, org=org) + # match result['code']: + # case 0: + # msg = AlertPop(message=result['message'], status='information') + # case 1: + # msg = AlertPop(message=result['message'], status='critical') + # msg.exec() def _controls_getter(self): """ Lookup controls from database and send to chartmaker - """ - # subtype defaults to disabled - try: - self.table_widget.sub_typer.disconnect() - except TypeError: - pass - # correct start date being more recent than end date and rerun - if self.table_widget.datepicker.start_date.date() > self.table_widget.datepicker.end_date.date(): - logger.warning("Start date after end date is not allowed!") - threemonthsago = self.table_widget.datepicker.end_date.date().addDays(-60) - # block signal that will rerun controls getter and set start date - with QSignalBlocker(self.table_widget.datepicker.start_date) as blocker: - self.table_widget.datepicker.start_date.setDate(threemonthsago) - self._controls_getter() - return - # convert to python useable date object - self.start_date = self.table_widget.datepicker.start_date.date().toPyDate() - self.end_date = self.table_widget.datepicker.end_date.date().toPyDate() - self.con_type = self.table_widget.control_typer.currentText() - self.mode = self.table_widget.mode_typer.currentText() - self.table_widget.sub_typer.clear() - # lookup subtypes - sub_types = get_control_subtypes(ctx=self.ctx, type=self.con_type, mode=self.mode) - if sub_types != []: - # block signal that will rerun controls getter and update sub_typer - with QSignalBlocker(self.table_widget.sub_typer) as blocker: - self.table_widget.sub_typer.addItems(sub_types) - self.table_widget.sub_typer.setEnabled(True) - self.table_widget.sub_typer.currentTextChanged.connect(self._chart_maker) - else: - self.table_widget.sub_typer.clear() - self.table_widget.sub_typer.setEnabled(False) - self._chart_maker() + """ + self, result = controls_getter_function(self) + self.result_reporter(result) + # # subtype defaults to disabled + # try: + # self.table_widget.sub_typer.disconnect() + # except TypeError: + # pass + # # correct start date being more recent than end date and rerun + # if self.table_widget.datepicker.start_date.date() > self.table_widget.datepicker.end_date.date(): + # logger.warning("Start date after end date is not allowed!") + # threemonthsago = self.table_widget.datepicker.end_date.date().addDays(-60) + # # block signal that will rerun controls getter and set start date + # with QSignalBlocker(self.table_widget.datepicker.start_date) as blocker: + # self.table_widget.datepicker.start_date.setDate(threemonthsago) + # self._controls_getter() + # return + # # convert to python useable date object + # self.start_date = self.table_widget.datepicker.start_date.date().toPyDate() + # self.end_date = self.table_widget.datepicker.end_date.date().toPyDate() + # self.con_type = self.table_widget.control_typer.currentText() + # self.mode = self.table_widget.mode_typer.currentText() + # self.table_widget.sub_typer.clear() + # # lookup subtypes + # sub_types = get_control_subtypes(ctx=self.ctx, type=self.con_type, mode=self.mode) + # if sub_types != []: + # # block signal that will rerun controls getter and update sub_typer + # with QSignalBlocker(self.table_widget.sub_typer) as blocker: + # self.table_widget.sub_typer.addItems(sub_types) + # self.table_widget.sub_typer.setEnabled(True) + # self.table_widget.sub_typer.currentTextChanged.connect(self._chart_maker) + # else: + # self.table_widget.sub_typer.clear() + # self.table_widget.sub_typer.setEnabled(False) + # self._chart_maker() def _chart_maker(self): """ Creates plotly charts for webview - """ - logger.debug(f"Control getter context: \n\tControl type: {self.con_type}\n\tMode: {self.mode}\n\tStart Date: {self.start_date}\n\tEnd Date: {self.end_date}") - if self.table_widget.sub_typer.currentText() == "": - self.subtype = None - else: - self.subtype = self.table_widget.sub_typer.currentText() - logger.debug(f"Subtype: {self.subtype}") - # query all controls using the type/start and end dates from the gui - controls = get_all_controls_by_type(ctx=self.ctx, con_type=self.con_type, start_date=self.start_date, end_date=self.end_date) - # if no data found from query set fig to none for reporting in webview - if controls == None: - fig = None - else: - # change each control to list of dicts - data = [control.convert_by_mode(mode=self.mode) for control in controls] - # flatten data to one dimensional list - data = [item for sublist in data for item in sublist] - # send to dataframe creator - df = convert_data_list_to_df(ctx=self.ctx, input=data, subtype=self.subtype) - if self.subtype == None: - title = self.mode - else: - title = f"{self.mode} - {self.subtype}" - # send dataframe to chart maker - fig = create_charts(ctx=self.ctx, df=df, ytitle=title) - logger.debug(f"Updating figure...") - # construct html for webview - html = '' - if fig != None: - html += plotly.offline.plot(fig, output_type='div', include_plotlyjs='cdn')#, image = 'png', auto_open=True, image_filename='plot_image') - else: - html += "

No data was retrieved for the given parameters.

" - html += '' - self.table_widget.webengineview.setHtml(html) - self.table_widget.webengineview.update() - logger.debug("Figure updated... I hope.") + """ + self, result = chart_maker_function(self) + self.result_reporter(result) + # logger.debug(f"Control getter context: \n\tControl type: {self.con_type}\n\tMode: {self.mode}\n\tStart Date: {self.start_date}\n\tEnd Date: {self.end_date}") + # if self.table_widget.sub_typer.currentText() == "": + # self.subtype = None + # else: + # self.subtype = self.table_widget.sub_typer.currentText() + # logger.debug(f"Subtype: {self.subtype}") + # # query all controls using the type/start and end dates from the gui + # controls = get_all_controls_by_type(ctx=self.ctx, con_type=self.con_type, start_date=self.start_date, end_date=self.end_date) + # # if no data found from query set fig to none for reporting in webview + # if controls == None: + # fig = None + # else: + # # change each control to list of dicts + # data = [control.convert_by_mode(mode=self.mode) for control in controls] + # # flatten data to one dimensional list + # data = [item for sublist in data for item in sublist] + # # send to dataframe creator + # df = convert_data_list_to_df(ctx=self.ctx, input=data, subtype=self.subtype) + # if self.subtype == None: + # title = self.mode + # else: + # title = f"{self.mode} - {self.subtype}" + # # send dataframe to chart maker + # fig = create_charts(ctx=self.ctx, df=df, ytitle=title) + # logger.debug(f"Updating figure...") + # # construct html for webview + # html = '' + # if fig != None: + # html += plotly.offline.plot(fig, output_type='div', include_plotlyjs='cdn')#, image = 'png', auto_open=True, image_filename='plot_image') + # else: + # html += "

No data was retrieved for the given parameters.

" + # html += '' + # self.table_widget.webengineview.setHtml(html) + # self.table_widget.webengineview.update() + # logger.debug("Figure updated... I hope.") def linkControls(self): """ Adds controls pulled from irida to relevant submissions - """ - all_bcs = lookup_all_submissions_by_type(self.ctx, "Bacterial Culture") - logger.debug(all_bcs) - all_controls = get_all_controls(self.ctx) - ac_list = [control.name for control in all_controls] - count = 0 - for bcs in all_bcs: - logger.debug(f"Running for {bcs.rsl_plate_num}") - logger.debug(f"Here is the current control: {[control.name for control in bcs.controls]}") - samples = [sample.sample_id for sample in bcs.samples] - logger.debug(bcs.controls) - for sample in samples: - # replace below is a stopgap method because some dingus decided to add spaces in some of the ATCC49... so it looks like "ATCC 49"... - if " " in sample: - logger.warning(f"There is not supposed to be a space in the sample name!!!") - sample = sample.replace(" ", "") - # if sample not in ac_list: - if not any([ac.startswith(sample) for ac in ac_list]): - continue - else: - for control in all_controls: - diff = difflib.SequenceMatcher(a=sample, b=control.name).ratio() - if control.name.startswith(sample): - logger.debug(f"Checking {sample} against {control.name}... {diff}") - logger.debug(f"Found match:\n\tSample: {sample}\n\tControl: {control.name}\n\tDifference: {diff}") - if control in bcs.controls: - logger.debug(f"{control.name} already in {bcs.rsl_plate_num}, skipping") - continue - else: - logger.debug(f"Adding {control.name} to {bcs.rsl_plate_num} as control") - bcs.controls.append(control) - # bcs.control_id.append(control.id) - control.submission = bcs - control.submission_id = bcs.id - self.ctx["database_session"].add(control) - count += 1 - self.ctx["database_session"].add(bcs) - logger.debug(f"Here is the new control: {[control.name for control in bcs.controls]}") - result = f"We added {count} controls to bacterial cultures." - logger.debug(result) - self.ctx['database_session'].commit() - msg = QMessageBox() - msg.setText("Controls added") - msg.setInformativeText(result) - msg.setWindowTitle("Controls added") - msg.exec() + """ + self, result = link_controls_function(self) + self.result_reporter(result) + # all_bcs = lookup_all_submissions_by_type(self.ctx, "Bacterial Culture") + # logger.debug(all_bcs) + # all_controls = get_all_controls(self.ctx) + # ac_list = [control.name for control in all_controls] + # count = 0 + # for bcs in all_bcs: + # logger.debug(f"Running for {bcs.rsl_plate_num}") + # logger.debug(f"Here is the current control: {[control.name for control in bcs.controls]}") + # samples = [sample.sample_id for sample in bcs.samples] + # logger.debug(bcs.controls) + # for sample in samples: + # # replace below is a stopgap method because some dingus decided to add spaces in some of the ATCC49... so it looks like "ATCC 49"... + # if " " in sample: + # logger.warning(f"There is not supposed to be a space in the sample name!!!") + # sample = sample.replace(" ", "") + # # if sample not in ac_list: + # if not any([ac.startswith(sample) for ac in ac_list]): + # continue + # else: + # for control in all_controls: + # diff = difflib.SequenceMatcher(a=sample, b=control.name).ratio() + # if control.name.startswith(sample): + # logger.debug(f"Checking {sample} against {control.name}... {diff}") + # logger.debug(f"Found match:\n\tSample: {sample}\n\tControl: {control.name}\n\tDifference: {diff}") + # if control in bcs.controls: + # logger.debug(f"{control.name} already in {bcs.rsl_plate_num}, skipping") + # continue + # else: + # logger.debug(f"Adding {control.name} to {bcs.rsl_plate_num} as control") + # bcs.controls.append(control) + # # bcs.control_id.append(control.id) + # control.submission = bcs + # control.submission_id = bcs.id + # self.ctx["database_session"].add(control) + # count += 1 + # self.ctx["database_session"].add(bcs) + # logger.debug(f"Here is the new control: {[control.name for control in bcs.controls]}") + # result = f"We added {count} controls to bacterial cultures." + # logger.debug(result) + # self.ctx['database_session'].commit() + # msg = QMessageBox() + # msg.setText("Controls added") + # msg.setInformativeText(result) + # msg.setWindowTitle("Controls added") + # msg.exec() def linkExtractions(self): """ Links extraction logs from .csv files to relevant submissions. - """ - home_dir = str(Path(self.ctx["directory_path"])) - fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "csv(*.csv)")[0]) - with open(fname.__str__(), 'r') as f: - runs = [col.strip().split(",") for col in f.readlines()] - count = 0 - for run in runs: - obj = dict( - start_time=run[0].strip(), - rsl_plate_num=run[1].strip(), - sample_count=run[2].strip(), - status=run[3].strip(), - experiment_name=run[4].strip(), - end_time=run[5].strip() - ) - for ii in range(6, len(run)): - obj[f"column{str(ii-5)}_vol"] = run[ii] - sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=obj['rsl_plate_num']) - try: - logger.debug(f"Found submission: {sub.rsl_plate_num}") - count += 1 - except AttributeError: - continue - if sub.extraction_info != None: - existing = json.loads(sub.extraction_info) - else: - existing = None - try: - if json.dumps(obj) in sub.extraction_info: - logger.debug(f"Looks like we already have that info.") - continue - except TypeError: - pass - if existing != None: - try: - logger.debug(f"Updating {type(existing)}: {existing} with {type(obj)}: {obj}") - existing.append(obj) - logger.debug(f"Setting: {existing}") - sub.extraction_info = json.dumps(existing) - except TypeError: - logger.error(f"Error updating!") - sub.extraction_info = json.dumps([obj]) - logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.extraction_info}") - else: - sub.extraction_info = json.dumps([obj]) - self.ctx['database_session'].add(sub) - self.ctx["database_session"].commit() - dlg = AlertPop(message=f"We added {count} logs to the database.", status='information') - dlg.exec() + """ + self, result = link_extractions_function(self) + self.result_reporter(result) + # home_dir = str(Path(self.ctx["directory_path"])) + # fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "csv(*.csv)")[0]) + # with open(fname.__str__(), 'r') as f: + # runs = [col.strip().split(",") for col in f.readlines()] + # count = 0 + # for run in runs: + # obj = dict( + # start_time=run[0].strip(), + # rsl_plate_num=run[1].strip(), + # sample_count=run[2].strip(), + # status=run[3].strip(), + # experiment_name=run[4].strip(), + # end_time=run[5].strip() + # ) + # for ii in range(6, len(run)): + # obj[f"column{str(ii-5)}_vol"] = run[ii] + # sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=obj['rsl_plate_num']) + # try: + # logger.debug(f"Found submission: {sub.rsl_plate_num}") + # count += 1 + # except AttributeError: + # continue + # if sub.extraction_info != None: + # existing = json.loads(sub.extraction_info) + # else: + # existing = None + # try: + # if json.dumps(obj) in sub.extraction_info: + # logger.debug(f"Looks like we already have that info.") + # continue + # except TypeError: + # pass + # if existing != None: + # try: + # logger.debug(f"Updating {type(existing)}: {existing} with {type(obj)}: {obj}") + # existing.append(obj) + # logger.debug(f"Setting: {existing}") + # sub.extraction_info = json.dumps(existing) + # except TypeError: + # logger.error(f"Error updating!") + # sub.extraction_info = json.dumps([obj]) + # logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.extraction_info}") + # else: + # sub.extraction_info = json.dumps([obj]) + # self.ctx['database_session'].add(sub) + # self.ctx["database_session"].commit() + # dlg = AlertPop(message=f"We added {count} logs to the database.", status='information') + # dlg.exec() def linkPCR(self): """ Links PCR logs from .csv files to relevant submissions. """ - home_dir = str(Path(self.ctx["directory_path"])) - fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "csv(*.csv)")[0]) - with open(fname.__str__(), 'r') as f: - runs = [col.strip().split(",") for col in f.readlines()] - count = 0 - for run in runs: - obj = dict( - start_time=run[0].strip(), - rsl_plate_num=run[1].strip(), - biomek_status=run[2].strip(), - quant_status=run[3].strip(), - experiment_name=run[4].strip(), - end_time=run[5].strip() - ) - # for ii in range(6, len(run)): - # obj[f"column{str(ii-5)}_vol"] = run[ii] - sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=obj['rsl_plate_num']) - try: - logger.debug(f"Found submission: {sub.rsl_plate_num}") - except AttributeError: - continue - if hasattr(sub, 'pcr_info') and sub.pcr_info != None: - existing = json.loads(sub.pcr_info) - else: - existing = None - try: - if json.dumps(obj) in sub.pcr_info: - logger.debug(f"Looks like we already have that info.") - continue - else: - count += 1 - except TypeError: - logger.error(f"No json to dump") - if existing != None: - try: - logger.debug(f"Updating {type(existing)}: {existing} with {type(obj)}: {obj}") - existing.append(obj) - logger.debug(f"Setting: {existing}") - sub.pcr_info = json.dumps(existing) - except TypeError: - logger.error(f"Error updating!") - sub.pcr_info = json.dumps([obj]) - logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.pcr_info}") - else: - sub.pcr_info = json.dumps([obj]) - self.ctx['database_session'].add(sub) - self.ctx["database_session"].commit() - dlg = AlertPop(message=f"We added {count} logs to the database.", status='information') - dlg.exec() + self, result = link_pcr_function(self) + self.result_reporter(result) + # home_dir = str(Path(self.ctx["directory_path"])) + # fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "csv(*.csv)")[0]) + # with open(fname.__str__(), 'r') as f: + # runs = [col.strip().split(",") for col in f.readlines()] + # count = 0 + # for run in runs: + # obj = dict( + # start_time=run[0].strip(), + # rsl_plate_num=run[1].strip(), + # biomek_status=run[2].strip(), + # quant_status=run[3].strip(), + # experiment_name=run[4].strip(), + # end_time=run[5].strip() + # ) + # # for ii in range(6, len(run)): + # # obj[f"column{str(ii-5)}_vol"] = run[ii] + # sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=obj['rsl_plate_num']) + # try: + # logger.debug(f"Found submission: {sub.rsl_plate_num}") + # except AttributeError: + # continue + # if hasattr(sub, 'pcr_info') and sub.pcr_info != None: + # existing = json.loads(sub.pcr_info) + # else: + # existing = None + # try: + # if json.dumps(obj) in sub.pcr_info: + # logger.debug(f"Looks like we already have that info.") + # continue + # else: + # count += 1 + # except TypeError: + # logger.error(f"No json to dump") + # if existing != None: + # try: + # logger.debug(f"Updating {type(existing)}: {existing} with {type(obj)}: {obj}") + # existing.append(obj) + # logger.debug(f"Setting: {existing}") + # sub.pcr_info = json.dumps(existing) + # except TypeError: + # logger.error(f"Error updating!") + # sub.pcr_info = json.dumps([obj]) + # logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.pcr_info}") + # else: + # sub.pcr_info = json.dumps([obj]) + # self.ctx['database_session'].add(sub) + # self.ctx["database_session"].commit() + # dlg = AlertPop(message=f"We added {count} logs to the database.", status='information') + # dlg.exec() def importPCRResults(self): """ Imports results exported from Design and Analysis .eds files """ - home_dir = str(Path(self.ctx["directory_path"])) - fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "xlsx(*.xlsx)")[0]) - parser = PCRParser(ctx=self.ctx, filepath=fname) - logger.debug(f"Attempting lookup for {parser.plate_num}") - sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=parser.plate_num) - try: - logger.debug(f"Found submission: {sub.rsl_plate_num}") - except AttributeError: - logger.error(f"Submission of number {parser.plate_num} not found.") - return - # jout = json.dumps(parser.pcr) - count = 0 - if hasattr(sub, 'pcr_info') and sub.pcr_info != None: - existing = json.loads(sub.pcr_info) - else: - # jout = None - existing = None - if existing != None: - try: - logger.debug(f"Updating {type(existing)}: {existing} with {type(parser.pcr)}: {parser.pcr}") - if json.dumps(parser.pcr) not in sub.pcr_info: - existing.append(parser.pcr) - logger.debug(f"Setting: {existing}") - sub.pcr_info = json.dumps(existing) - except TypeError: - logger.error(f"Error updating!") - sub.pcr_info = json.dumps([parser.pcr]) - logger.debug(f"Final pcr info for {sub.rsl_plate_num}: {sub.pcr_info}") - else: - sub.pcr_info = json.dumps([parser.pcr]) - self.ctx['database_session'].add(sub) - logger.debug(f"Existing {type(sub.pcr_info)}: {sub.pcr_info}") - logger.debug(f"Inserting {type(json.dumps(parser.pcr))}: {json.dumps(parser.pcr)}") - self.ctx["database_session"].commit() - logger.debug(f"Got {len(parser.samples)} to update!") - for sample in parser.samples: - logger.debug(f"Running update on: {sample['sample']}") - update_ww_sample(ctx=self.ctx, sample_obj=sample) - dlg = AlertPop(message=f"We added PCR info to {sub.rsl_plate_num}.", status='information') - dlg.exec() + self, result = import_pcr_results_function(self) + self.result_reporter(result) + # home_dir = str(Path(self.ctx["directory_path"])) + # fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "xlsx(*.xlsx)")[0]) + # parser = PCRParser(ctx=self.ctx, filepath=fname) + # logger.debug(f"Attempting lookup for {parser.plate_num}") + # sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=parser.plate_num) + # try: + # logger.debug(f"Found submission: {sub.rsl_plate_num}") + # except AttributeError: + # logger.error(f"Submission of number {parser.plate_num} not found. Attempting rescue of plate repeat.") + # parser.plate_num = "-".join(parser.plate_num.split("-")[:-1]) + # sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=parser.plate_num) + # try: + # logger.debug(f"Found submission: {sub.rsl_plate_num}") + # except AttributeError: + # logger.error(f"Rescue of {parser.plate_num} failed.") + # return + # # jout = json.dumps(parser.pcr) + # count = 0 + # if hasattr(sub, 'pcr_info') and sub.pcr_info != None: + # existing = json.loads(sub.pcr_info) + # else: + # # jout = None + # existing = None + # if existing != None: + # try: + # logger.debug(f"Updating {type(existing)}: {existing} with {type(parser.pcr)}: {parser.pcr}") + # if json.dumps(parser.pcr) not in sub.pcr_info: + # existing.append(parser.pcr) + # logger.debug(f"Setting: {existing}") + # sub.pcr_info = json.dumps(existing) + # except TypeError: + # logger.error(f"Error updating!") + # sub.pcr_info = json.dumps([parser.pcr]) + # logger.debug(f"Final pcr info for {sub.rsl_plate_num}: {sub.pcr_info}") + # else: + # sub.pcr_info = json.dumps([parser.pcr]) + # self.ctx['database_session'].add(sub) + # logger.debug(f"Existing {type(sub.pcr_info)}: {sub.pcr_info}") + # logger.debug(f"Inserting {type(json.dumps(parser.pcr))}: {json.dumps(parser.pcr)}") + # self.ctx["database_session"].commit() + # logger.debug(f"Got {len(parser.samples)} to update!") + # for sample in parser.samples: + # logger.debug(f"Running update on: {sample['sample']}") + # sample['plate_rsl'] = sub.rsl_plate_num + # update_ww_sample(ctx=self.ctx, sample_obj=sample) + # dlg = AlertPop(message=f"We added PCR info to {sub.rsl_plate_num}.", status='information') + # dlg.exec() class AddSubForm(QWidget): diff --git a/src/submissions/frontend/functions.py b/src/submissions/frontend/all_window_functions.py similarity index 73% rename from src/submissions/frontend/functions.py rename to src/submissions/frontend/all_window_functions.py index 321c1a0..4e24c1c 100644 --- a/src/submissions/frontend/functions.py +++ b/src/submissions/frontend/all_window_functions.py @@ -1,7 +1,4 @@ -''' -contains operations used by multiple widgets. -''' -from backend.db.models import * +from pathlib import Path import logging from PyQt6.QtWidgets import ( QMainWindow, QLabel, QToolBar, @@ -11,9 +8,18 @@ from PyQt6.QtWidgets import ( QSpinBox, QScrollArea ) - logger = logging.getLogger(f"submissions.{__name__}") +def select_open_file(obj:QMainWindow, extension:str) -> Path: + home_dir = str(Path(obj.ctx["directory_path"])) + fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = f"{extension}(*.{extension})")[0]) + return fname + +def select_save_file(obj:QMainWindow, default_name:str, extension:str) -> Path: + home_dir = Path(obj.ctx["directory_path"]).joinpath(default_name).resolve().__str__() + fname = Path(QFileDialog.getSaveFileName(obj, "Save File", home_dir, filter = f"{extension}(*.{extension})")[0]) + return fname + def extract_form_info(object) -> dict: """ retrieves object names and values from form @@ -24,6 +30,7 @@ def extract_form_info(object) -> dict: Returns: dict: dictionary of objectName:text items """ + from frontend.custom_widgets import ReagentTypeForm dicto = {} reagents = {} @@ -51,4 +58,4 @@ def extract_form_info(object) -> dict: # value for ad hoc check above if reagents != {}: return dicto, reagents - return dicto \ No newline at end of file + return dicto diff --git a/src/submissions/frontend/custom_widgets/misc.py b/src/submissions/frontend/custom_widgets/misc.py index b49d4cc..109f081 100644 --- a/src/submissions/frontend/custom_widgets/misc.py +++ b/src/submissions/frontend/custom_widgets/misc.py @@ -14,7 +14,7 @@ from PyQt6.QtCore import Qt, QDate, QSize # from submissions.backend.db.functions import lookup_kittype_by_use # from submissions.backend.db import lookup_regent_by_type_name_and_kit_name from tools import check_not_nan -from ..functions import extract_form_info +from ..all_window_functions import extract_form_info from backend.db import get_all_reagenttype_names, lookup_all_sample_types, create_kit_from_yaml, lookup_regent_by_type_name, lookup_kittype_by_use#, lookup_regent_by_type_name_and_kit_name from backend.excel.parser import SheetParser from jinja2 import Environment, FileSystemLoader diff --git a/src/submissions/frontend/main_window_functions.py b/src/submissions/frontend/main_window_functions.py new file mode 100644 index 0000000..84763f4 --- /dev/null +++ b/src/submissions/frontend/main_window_functions.py @@ -0,0 +1,658 @@ +''' +contains operations used by multiple widgets. +''' +from datetime import date +import difflib +from getpass import getuser +import inspect +from pathlib import Path +import pprint +import re +import yaml +import json +from typing import Tuple +from openpyxl.utils import get_column_letter +from xhtml2pdf import pisa +import pandas as pd +from backend.db.models import * +import logging +from PyQt6.QtWidgets import ( + QMainWindow, QLabel, QWidget, QPushButton, QFileDialog, + QLineEdit, QMessageBox, QComboBox, QDateEdit +) +from .all_window_functions import extract_form_info, select_open_file, select_save_file +from PyQt6.QtCore import QSignalBlocker +from backend.db.functions import ( + lookup_all_orgs, lookup_kittype_by_use, lookup_kittype_by_name, + construct_submission_info, lookup_reagent, store_submission, lookup_submissions_by_date_range, + create_kit_from_yaml, create_org_from_yaml, get_control_subtypes, get_all_controls_by_type, + lookup_all_submissions_by_type, get_all_controls, lookup_submission_by_rsl_num, update_ww_sample +) +from backend.excel.parser import SheetParser, PCRParser +from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df +from tools import RSLNamer, check_not_nan, check_kit_integrity +from .custom_widgets.pop_ups import AlertPop, QuestionAsker +from .custom_widgets import ReportDatePicker, ReagentTypeForm +from .custom_widgets.misc import ImportReagent +from .visualizations.control_charts import create_charts, construct_html + + +logger = logging.getLogger(f"submissions.{__name__}") + +def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]: + result = None + # from .custom_widgets.misc import ImportReagent + # from .custom_widgets.pop_ups import AlertPop + logger.debug(obj.ctx) + # initialize samples + obj.samples = [] + obj.reagents = {} + # set file dialog + # home_dir = str(Path(obj.ctx["directory_path"])) + # fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir)[0]) + fname = select_open_file(obj, extension="xlsx") + logger.debug(f"Attempting to parse file: {fname}") + if not fname.exists(): + result = dict(message=f"File {fname.__str__()} not found.", status="critical") + return obj, result + # create sheetparser using excel sheet and context from gui + try: + prsr = SheetParser(fname, **obj.ctx) + except PermissionError: + logger.error(f"Couldn't get permission to access file: {fname}") + return + if prsr.sub['rsl_plate_num'] == None: + prsr.sub['rsl_plate_num'] = RSLNamer(fname.__str__()).parsed_name + logger.debug(f"prsr.sub = {prsr.sub}") + # destroy any widgets from previous imports + for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget): + item.setParent(None) + # regex to parser out different variable types for decision making + variable_parser = re.compile(r""" + # (?x) + (?P^extraction_kit$) | + (?P^submitted_date$) | + (?P)^submitting_lab$ | + (?P)^samples$ | + (?P^lot_.*$) | + (?P^csv$) + """, re.VERBOSE) + for item in prsr.sub: + logger.debug(f"Item: {item}") + # attempt to match variable name to regex group + try: + mo = variable_parser.fullmatch(item).lastgroup + except AttributeError: + mo = "other" + logger.debug(f"Mo: {mo}") + match mo: + case 'submitting_lab': + # create label + obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + logger.debug(f"{item}: {prsr.sub[item]}") + # create combobox to hold looked up submitting labs + add_widget = QComboBox() + labs = [item.__str__() for item in lookup_all_orgs(ctx=obj.ctx)] + # try to set closest match to top of list + try: + labs = difflib.get_close_matches(prsr.sub[item], labs, len(labs), 0) + except (TypeError, ValueError): + pass + # set combobox values to lookedup values + add_widget.addItems(labs) + case 'extraction_kit': + # create label + obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + # if extraction kit not available, all other values fail + if not check_not_nan(prsr.sub[item]): + msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning") + msg.exec() + # create combobox to hold looked up kits + add_widget = QComboBox() + # lookup existing kits by 'submission_type' decided on by sheetparser + uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=prsr.sub['submission_type'])] + add_widget.addItems(uses) + if check_not_nan(prsr.sub[item]): + obj.ext_kit = prsr.sub[item] + else: + obj.ext_kit = add_widget.currentText() + case 'submitted_date': + # create label + obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + # uses base calendar + add_widget = QDateEdit(calendarPopup=True) + # sets submitted date based on date found in excel sheet + try: + add_widget.setDate(prsr.sub[item]) + # if not found, use today + except: + add_widget.setDate(date.today()) + case 'reagent': + # create label + reg_label = QLabel(item.replace("_", " ").title()) + reg_label.setObjectName(f"lot_{item}_label") + obj.table_widget.formlayout.addWidget(reg_label) + # create reagent choice widget + add_widget = ImportReagent(ctx=obj.ctx, item=item, prsr=prsr) + obj.reagents[item] = prsr.sub[item] + case 'samples': + # hold samples in 'obj' until form submitted + logger.debug(f"{item}: {prsr.sub[item]}") + obj.samples = prsr.sub[item] + add_widget = None + case 'csv': + obj.csv = prsr.sub[item] + case _: + # anything else gets added in as a line edit + obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title())) + add_widget = QLineEdit() + logger.debug(f"Setting widget text to {str(prsr.sub[item]).replace('_', ' ')}") + add_widget.setText(str(prsr.sub[item]).replace("_", " ")) + try: + add_widget.setObjectName(item) + logger.debug(f"Widget name set to: {add_widget.objectName()}") + obj.table_widget.formlayout.addWidget(add_widget) + except AttributeError as e: + logger.error(e) + # compare obj.reagents with expected reagents in kit + if hasattr(obj, 'ext_kit'): + obj.kit_integrity_completion() + logger.debug(f"Imported reagents: {obj.reagents}") + + return obj, result + +def kit_reload_function(obj:QMainWindow) -> QMainWindow: + result = None + for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget): + # item.setParent(None) + if isinstance(item, QLabel): + if item.text().startswith("Lot"): + item.setParent(None) + else: + logger.debug(f"Type of {item.objectName()} is {type(item)}") + if item.objectName().startswith("lot_"): + item.setParent(None) + obj.kit_integrity_completion_function() + return obj, result + +def kit_integrity_completion_function(obj:QMainWindow) -> QMainWindow: + result = None + # from .custom_widgets.misc import ImportReagent + # from .custom_widgets.pop_ups import AlertPop + logger.debug(inspect.currentframe().f_back.f_code.co_name) + kit_widget = obj.table_widget.formlayout.parentWidget().findChild(QComboBox, 'extraction_kit') + logger.debug(f"Kit selector: {kit_widget}") + obj.ext_kit = kit_widget.currentText() + logger.debug(f"Checking integrity of {obj.ext_kit}") + kit = lookup_kittype_by_name(ctx=obj.ctx, name=obj.ext_kit) + reagents_to_lookup = [item.replace("lot_", "") for item in obj.reagents] + logger.debug(f"Reagents for lookup for {kit.name}: {reagents_to_lookup}") + kit_integrity = check_kit_integrity(kit, reagents_to_lookup) + if kit_integrity != None: + # msg = AlertPop(message=kit_integrity['message'], status="critical") + # msg.exec() + result = dict(message=kit_integrity['message'], status="Warning") + for item in kit_integrity['missing']: + obj.table_widget.formlayout.addWidget(QLabel(f"Lot {item.replace('_', ' ').title()}")) + add_widget = ImportReagent(ctx=obj.ctx, item=item) + obj.table_widget.formlayout.addWidget(add_widget) + submit_btn = QPushButton("Submit") + submit_btn.setObjectName("lot_submit_btn") + obj.table_widget.formlayout.addWidget(submit_btn) + submit_btn.clicked.connect(obj.submit_new_sample) + return obj, result + +def submit_new_sample_function(obj:QMainWindow) -> QMainWindow: + result = None + # from .custom_widgets.misc import ImportReagent + # from .custom_widgets.pop_ups import AlertPop, QuestionAsker + info = extract_form_info(obj.table_widget.tab1) + reagents = {k:v for k,v in info.items() if k.startswith("lot_")} + info = {k:v for k,v in info.items() if not k.startswith("lot_")} + logger.debug(f"Info: {info}") + logger.debug(f"Reagents: {reagents}") + parsed_reagents = [] + # compare reagents in form to reagent database + for reagent in reagents: + wanted_reagent = lookup_reagent(ctx=obj.ctx, reagent_lot=reagents[reagent]) + logger.debug(f"Looked up reagent: {wanted_reagent}") + # if reagent not found offer to add to database + if wanted_reagent == None: + r_lot = reagents[reagent] + dlg = QuestionAsker(title=f"Add {r_lot}?", message=f"Couldn't find reagent type {reagent.replace('_', ' ').title().strip('Lot')}: {r_lot} in the database.\n\nWould you like to add it?") + if dlg.exec(): + logger.debug(f"checking reagent: {reagent} in obj.reagents. Result: {obj.reagents[reagent]}") + expiry_date = obj.reagents[reagent]['exp'] + wanted_reagent = obj.add_reagent(reagent_lot=r_lot, reagent_type=reagent.replace("lot_", ""), expiry=expiry_date) + else: + # In this case we will have an empty reagent and the submission will fail kit integrity check + logger.debug("Will not add reagent.") + if wanted_reagent != None: + parsed_reagents.append(wanted_reagent) + # move samples into preliminary submission dict + info['samples'] = obj.samples + info['uploaded_by'] = getuser() + # construct submission object + logger.debug(f"Here is the info_dict: {pprint.pformat(info)}") + base_submission, result = construct_submission_info(ctx=obj.ctx, info_dict=info) + # check output message for issues + match result['code']: + # code 1: ask for overwrite + case 1: + dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_num}?", message=result['message']) + if dlg.exec(): + # Do not add duplicate reagents. + base_submission.reagents = [] + else: + return obj, dict(message="Overwrite cancelled", status="Information") + # code 2: No RSL plate number given + case 2: + return obj, dict(message=result['message'], status='critical') + case _: + pass + # add reagents to submission object + for reagent in parsed_reagents: + base_submission.reagents.append(reagent) + logger.debug("Checking kit integrity...") + kit_integrity = check_kit_integrity(base_submission) + if kit_integrity != None: + return obj, dict(message=kit_integrity['message'], status="critical") + logger.debug(f"Sending submission: {base_submission.rsl_plate_num} to database.") + result = store_submission(ctx=obj.ctx, base_submission=base_submission) + # check result of storing for issues + # update summary sheet + obj.table_widget.sub_wid.setData() + # reset form + for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget): + item.setParent(None) + if hasattr(obj, 'csv'): + dlg = QuestionAsker("Export CSV?", "Would you like to export the csv file?") + if dlg.exec(): + # home_dir = Path(obj.ctx["directory_path"]).joinpath(f"{base_submission.rsl_plate_num}.csv").resolve().__str__() + # fname = Path(QFileDialog.getSaveFileName(obj, "Save File", home_dir, filter=".csv")[0]) + fname = select_save_file(obj, f"{base_submission.rsl_plate_num}.csv", extension="csv") + try: + obj.csv.to_csv(fname.__str__(), index=False) + except PermissionError: + logger.debug(f"Could not get permissions to {fname}. Possibly the request was cancelled.") + return obj, result + +def generate_report_function(obj:QMainWindow) -> QMainWindow: + # from .custom_widgets import ReportDatePicker + result = None + dlg = ReportDatePicker() + if dlg.exec(): + info = extract_form_info(dlg) + logger.debug(f"Report info: {info}") + # find submissions based on date range + subs = lookup_submissions_by_date_range(ctx=obj.ctx, start_date=info['start_date'], end_date=info['end_date']) + # convert each object to dict + records = [item.report_dict() for item in subs] + # make dataframe from record dictionaries + df = make_report_xlsx(records=records) + html = make_report_html(df=df, start_date=info['start_date'], end_date=info['end_date']) + # setup filedialog to handle save location of report + home_dir = Path(obj.ctx["directory_path"]).joinpath(f"Submissions_Report_{info['start_date']}-{info['end_date']}.pdf").resolve().__str__() + fname = Path(QFileDialog.getSaveFileName(obj, "Save File", home_dir, filter=".pdf")[0]) + # logger.debug(f"report output name: {fname}") + with open(fname, "w+b") as f: + pisa.CreatePDF(html, dest=f) + writer = pd.ExcelWriter(fname.with_suffix(".xlsx"), engine='openpyxl') + df.to_excel(writer, sheet_name="Report") + worksheet = writer.sheets['Report'] + for idx, col in enumerate(df): # loop through all columns + series = df[col] + max_len = max(( + series.astype(str).map(len).max(), # len of largest item + len(str(series.name)) # len of column name/header + )) + 20 # adding a little extra space + try: + worksheet.column_dimensions[get_column_letter(idx)].width = max_len + except ValueError: + pass + for cell in worksheet['D']: + if cell.row > 1: + cell.style = 'Currency' + writer.close() + return obj, result + +def add_kit_function(obj:QMainWindow) -> QMainWindow: + result = None + # setup file dialog to find yaml flie + # home_dir = str(Path(obj.ctx["directory_path"])) + # fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "yml(*.yml)")[0]) + fname = select_open_file(obj, extension="yml") + assert fname.exists() + # read yaml file + try: + with open(fname.__str__(), "r") as stream: + try: + exp = yaml.load(stream, Loader=yaml.Loader) + except yaml.YAMLError as exc: + logger.error(f'Error reading yaml file {fname}: {exc}') + return {} + except PermissionError: + return + # send to kit creator function + result = create_kit_from_yaml(ctx=obj.ctx, exp=exp) + # match result['code']: + # case 0: + # msg = AlertPop(message=result['message'], status='info') + # case 1: + # msg = AlertPop(message=result['message'], status='critical') + # msg.exec() + return obj, result + +def add_org_function(obj:QMainWindow) -> QMainWindow: + result = None + # setup file dialog to find yaml flie + # home_dir = str(Path(obj.ctx["directory_path"])) + # fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "yml(*.yml)")[0]) + fname = select_open_file(obj, extension="yml") + assert fname.exists() + # read yaml file + try: + with open(fname.__str__(), "r") as stream: + try: + org = yaml.load(stream, Loader=yaml.Loader) + except yaml.YAMLError as exc: + logger.error(f'Error reading yaml file {fname}: {exc}') + return obj, dict(message=f"There was a problem reading yaml file {fname.__str__()}", status="critical") + except PermissionError: + return obj, result + # send to kit creator function + result = create_org_from_yaml(ctx=obj.ctx, org=org) + # match result['code']: + # case 0: + # msg = AlertPop(message=result['message'], status='information') + # case 1: + # msg = AlertPop(message=result['message'], status='critical') + # msg.exec() + return obj, result + +def controls_getter_function(obj:QMainWindow) -> QMainWindow: + result = None + # subtype defaults to disabled + try: + obj.table_widget.sub_typer.disconnect() + except TypeError: + pass + # correct start date being more recent than end date and rerun + if obj.table_widget.datepicker.start_date.date() > obj.table_widget.datepicker.end_date.date(): + logger.warning("Start date after end date is not allowed!") + threemonthsago = obj.table_widget.datepicker.end_date.date().addDays(-60) + # block signal that will rerun controls getter and set start date + with QSignalBlocker(obj.table_widget.datepicker.start_date) as blocker: + obj.table_widget.datepicker.start_date.setDate(threemonthsago) + obj._controls_getter() + return obj, result + # convert to python useable date object + obj.start_date = obj.table_widget.datepicker.start_date.date().toPyDate() + obj.end_date = obj.table_widget.datepicker.end_date.date().toPyDate() + obj.con_type = obj.table_widget.control_typer.currentText() + obj.mode = obj.table_widget.mode_typer.currentText() + obj.table_widget.sub_typer.clear() + # lookup subtypes + sub_types = get_control_subtypes(ctx=obj.ctx, type=obj.con_type, mode=obj.mode) + if sub_types != []: + # block signal that will rerun controls getter and update sub_typer + with QSignalBlocker(obj.table_widget.sub_typer) as blocker: + obj.table_widget.sub_typer.addItems(sub_types) + obj.table_widget.sub_typer.setEnabled(True) + obj.table_widget.sub_typer.currentTextChanged.connect(obj._chart_maker) + else: + obj.table_widget.sub_typer.clear() + obj.table_widget.sub_typer.setEnabled(False) + obj._chart_maker() + return obj, result + +def chart_maker_function(obj:QMainWindow) -> QMainWindow: + result = None + logger.debug(f"Control getter context: \n\tControl type: {obj.con_type}\n\tMode: {obj.mode}\n\tStart Date: {obj.start_date}\n\tEnd Date: {obj.end_date}") + if obj.table_widget.sub_typer.currentText() == "": + obj.subtype = None + else: + obj.subtype = obj.table_widget.sub_typer.currentText() + logger.debug(f"Subtype: {obj.subtype}") + # query all controls using the type/start and end dates from the gui + controls = get_all_controls_by_type(ctx=obj.ctx, con_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date) + # if no data found from query set fig to none for reporting in webview + if controls == None: + fig = None + else: + # change each control to list of dicts + data = [control.convert_by_mode(mode=obj.mode) for control in controls] + # flatten data to one dimensional list + data = [item for sublist in data for item in sublist] + logger.debug(f"Control objects going into df conversion: {data}") + # send to dataframe creator + df = convert_data_list_to_df(ctx=obj.ctx, input=data, subtype=obj.subtype) + if obj.subtype == None: + title = obj.mode + else: + title = f"{obj.mode} - {obj.subtype}" + # send dataframe to chart maker + fig = create_charts(ctx=obj.ctx, df=df, ytitle=title) + logger.debug(f"Updating figure...") + # construct html for webview + html = construct_html(figure=fig) + logger.debug(f"The length of html code is: {len(html)}") + obj.table_widget.webengineview.setHtml(html) + obj.table_widget.webengineview.update() + logger.debug("Figure updated... I hope.") + return obj, result + +def link_controls_function(obj:QMainWindow) -> QMainWindow: + result = None + all_bcs = lookup_all_submissions_by_type(obj.ctx, "Bacterial Culture") + logger.debug(all_bcs) + all_controls = get_all_controls(obj.ctx) + ac_list = [control.name for control in all_controls] + count = 0 + for bcs in all_bcs: + logger.debug(f"Running for {bcs.rsl_plate_num}") + logger.debug(f"Here is the current control: {[control.name for control in bcs.controls]}") + samples = [sample.sample_id for sample in bcs.samples] + logger.debug(bcs.controls) + for sample in samples: + # replace below is a stopgap method because some dingus decided to add spaces in some of the ATCC49... so it looks like "ATCC 49"... + if " " in sample: + logger.warning(f"There is not supposed to be a space in the sample name!!!") + sample = sample.replace(" ", "") + # if sample not in ac_list: + if not any([ac.startswith(sample) for ac in ac_list]): + continue + else: + for control in all_controls: + diff = difflib.SequenceMatcher(a=sample, b=control.name).ratio() + if control.name.startswith(sample): + logger.debug(f"Checking {sample} against {control.name}... {diff}") + logger.debug(f"Found match:\n\tSample: {sample}\n\tControl: {control.name}\n\tDifference: {diff}") + if control in bcs.controls: + logger.debug(f"{control.name} already in {bcs.rsl_plate_num}, skipping") + continue + else: + logger.debug(f"Adding {control.name} to {bcs.rsl_plate_num} as control") + bcs.controls.append(control) + # bcs.control_id.append(control.id) + control.submission = bcs + control.submission_id = bcs.id + obj.ctx["database_session"].add(control) + count += 1 + obj.ctx["database_session"].add(bcs) + logger.debug(f"Here is the new control: {[control.name for control in bcs.controls]}") + result = dict(message=f"We added {count} controls to bacterial cultures.", status="information") + logger.debug(result) + obj.ctx['database_session'].commit() + # msg = QMessageBox() + # msg.setText("Controls added") + # msg.setInformativeText(result) + # msg.setWindowTitle("Controls added") + # msg.exec() + return obj, result + +def link_extractions_function(obj:QMainWindow) -> QMainWindow: + result = None + # home_dir = str(Path(obj.ctx["directory_path"])) + # fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "csv(*.csv)")[0]) + fname = select_open_file(obj, extension="csv") + with open(fname.__str__(), 'r') as f: + runs = [col.strip().split(",") for col in f.readlines()] + count = 0 + for run in runs: + new_run = dict( + start_time=run[0].strip(), + rsl_plate_num=run[1].strip(), + sample_count=run[2].strip(), + status=run[3].strip(), + experiment_name=run[4].strip(), + end_time=run[5].strip() + ) + for ii in range(6, len(run)): + new_run[f"column{str(ii-5)}_vol"] = run[ii] + sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num']) + try: + logger.debug(f"Found submission: {sub.rsl_plate_num}") + count += 1 + except AttributeError: + continue + if sub.extraction_info != None: + existing = json.loads(sub.extraction_info) + else: + existing = None + try: + if json.dumps(new_run) in sub.extraction_info: + logger.debug(f"Looks like we already have that info.") + continue + except TypeError: + pass + if existing != None: + try: + logger.debug(f"Updating {type(existing)}: {existing} with {type(new_run)}: {new_run}") + existing.append(new_run) + logger.debug(f"Setting: {existing}") + sub.extraction_info = json.dumps(existing) + except TypeError: + logger.error(f"Error updating!") + sub.extraction_info = json.dumps([new_run]) + logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.extraction_info}") + else: + sub.extraction_info = json.dumps([new_run]) + obj.ctx['database_session'].add(sub) + obj.ctx["database_session"].commit() + result = dict(message=f"We added {count} logs to the database.", status='information') + return obj, result + +def link_pcr_function(obj:QMainWindow) -> QMainWindow: + result = None + # home_dir = str(Path(obj.ctx["directory_path"])) + # fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "csv(*.csv)")[0]) + fname = select_open_file(obj, extension="csv") + with open(fname.__str__(), 'r') as f: + runs = [col.strip().split(",") for col in f.readlines()] + count = 0 + for run in runs: + new_run = dict( + start_time=run[0].strip(), + rsl_plate_num=run[1].strip(), + biomek_status=run[2].strip(), + quant_status=run[3].strip(), + experiment_name=run[4].strip(), + end_time=run[5].strip() + ) + # for ii in range(6, len(run)): + # obj[f"column{str(ii-5)}_vol"] = run[ii] + sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num']) + try: + logger.debug(f"Found submission: {sub.rsl_plate_num}") + except AttributeError: + continue + if hasattr(sub, 'pcr_info') and sub.pcr_info != None: + existing = json.loads(sub.pcr_info) + else: + existing = None + try: + if json.dumps(new_run) in sub.pcr_info: + logger.debug(f"Looks like we already have that info.") + continue + else: + count += 1 + except TypeError: + logger.error(f"No json to dump") + if existing != None: + try: + logger.debug(f"Updating {type(existing)}: {existing} with {type(new_run)}: {new_run}") + existing.append(new_run) + logger.debug(f"Setting: {existing}") + sub.pcr_info = json.dumps(existing) + except TypeError: + logger.error(f"Error updating!") + sub.pcr_info = json.dumps([new_run]) + logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.pcr_info}") + else: + sub.pcr_info = json.dumps([new_run]) + obj.ctx['database_session'].add(sub) + obj.ctx["database_session"].commit() + result = dict(message=f"We added {count} logs to the database.", status='information') + return obj, result + +def import_pcr_results_function(obj:QMainWindow) -> QMainWindow: + result = None + # home_dir = str(Path(obj.ctx["directory_path"])) + # fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "xlsx(*.xlsx)")[0]) + fname = select_open_file(obj, extension="xlsx") + parser = PCRParser(ctx=obj.ctx, filepath=fname) + logger.debug(f"Attempting lookup for {parser.plate_num}") + sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num) + try: + logger.debug(f"Found submission: {sub.rsl_plate_num}") + except AttributeError: + logger.error(f"Submission of number {parser.plate_num} not found. Attempting rescue of plate repeat.") + parser.plate_num = "-".join(parser.plate_num.split("-")[:-1]) + sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num) + try: + logger.debug(f"Found submission: {sub.rsl_plate_num}") + except AttributeError: + logger.error(f"Rescue of {parser.plate_num} failed.") + return obj, dict(message="Couldn't find a submission with that RSL number.", status="warning") + # jout = json.dumps(parser.pcr) + count = 0 + if hasattr(sub, 'pcr_info') and sub.pcr_info != None: + existing = json.loads(sub.pcr_info) + else: + # jout = None + existing = None + if existing != None: + try: + logger.debug(f"Updating {type(existing)}: {existing} with {type(parser.pcr)}: {parser.pcr}") + if json.dumps(parser.pcr) not in sub.pcr_info: + existing.append(parser.pcr) + logger.debug(f"Setting: {existing}") + sub.pcr_info = json.dumps(existing) + except TypeError: + logger.error(f"Error updating!") + sub.pcr_info = json.dumps([parser.pcr]) + logger.debug(f"Final pcr info for {sub.rsl_plate_num}: {sub.pcr_info}") + else: + sub.pcr_info = json.dumps([parser.pcr]) + obj.ctx['database_session'].add(sub) + logger.debug(f"Existing {type(sub.pcr_info)}: {sub.pcr_info}") + logger.debug(f"Inserting {type(json.dumps(parser.pcr))}: {json.dumps(parser.pcr)}") + obj.ctx["database_session"].commit() + logger.debug(f"Got {len(parser.samples)} to update!") + for sample in parser.samples: + logger.debug(f"Running update on: {sample['sample']}") + sample['plate_rsl'] = sub.rsl_plate_num + update_ww_sample(ctx=obj.ctx, sample_obj=sample) + result = dict(message=f"We added PCR info to {sub.rsl_plate_num}.", status='information') + return obj, result + # dlg.exec() + + + + + + + + + diff --git a/src/submissions/frontend/visualizations/control_charts.py b/src/submissions/frontend/visualizations/control_charts.py index b52b14f..7a41637 100644 --- a/src/submissions/frontend/visualizations/control_charts.py +++ b/src/submissions/frontend/visualizations/control_charts.py @@ -1,6 +1,7 @@ ''' Functions for constructing controls graphs using plotly. ''' +import plotly import plotly.express as px import pandas as pd from pathlib import Path @@ -276,3 +277,13 @@ def divide_chunks(input_list:list, chunk_count:int): """ k, m = divmod(len(input_list), chunk_count) return (input_list[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(chunk_count)) + + +def construct_html(figure:Figure) -> str: + html = '' + if figure != None: + html += plotly.offline.plot(figure, output_type='div', include_plotlyjs='cdn')#, image = 'png', auto_open=True, image_filename='plot_image') + else: + html += "

No data was retrieved for the given parameters.

" + html += '' + return html \ No newline at end of file diff --git a/src/submissions/templates/submission_details.html b/src/submissions/templates/submission_details.html index 3e84286..4a9c55d 100644 --- a/src/submissions/templates/submission_details.html +++ b/src/submissions/templates/submission_details.html @@ -8,26 +8,26 @@

Submission Details for {{ sub['Plate Number'] }}

{% for key, value in sub.items() if key not in excluded %} {% if loop.index == 1 %} -    {% if key=='Cost' %}{{ key }}: {{ "${:,.2f}".format(value) }}{% else %}{{ key }}: {{ value }}{% endif %}
+    {{ key }}: {% if key=='Cost' %}{{ "${:,.2f}".format(value) }}{% else %}{{ value }}{% endif %}
{% else %} -     {% if key=='Cost' %}{{ key }}: {{ "${:,.2f}".format(value) }}{% else %}{{ key }}: {{ value }}{% endif %}
+     {{ key }}: {% if key=='Cost' %} {{ "${:,.2f}".format(value) }}{% else %}{{ value }}{% endif %}
{% endif %} {% endfor %}

Reagents:

{% for item in sub['reagents'] %} {% if loop.index == 1%} -    {{ item['type'] }}: {{ item['lot'] }} (EXP: {{ item['expiry'] }})
+    {{ item['type'] }}: {{ item['lot'] }} (EXP: {{ item['expiry'] }})
{% else %} -     {{ item['type'] }}: {{ item['lot'] }} (EXP: {{ item['expiry'] }})
+     {{ item['type'] }}: {{ item['lot'] }} (EXP: {{ item['expiry'] }})
{% endif %} {% endfor %}

{% if sub['samples'] %}

Samples:

{% for item in sub['samples'] %} {% if loop.index == 1 %} -    {{ item['well'] }}: {{ item['name'] }}
+    {{ item['well'] }}: {{ item['name']|replace('\n\t', '
        ') }}
{% else %} -     {{ item['well'] }}: {{ item['name'] }}
+     {{ item['well'] }}: {{ item['name']|replace('\n\t', '
        ') }}
{% endif %} {% endfor %}

{% endif %} @@ -52,12 +52,12 @@

Extraction Status:

{% for key, value in entry.items() %} {% if loop.index == 1%} -    {{ key|replace('_', ' ')|title() }}: {{ value }}
+    {{ key|replace('_', ' ')|title() }}: {{ value }}
{% else %} {% if "column" in key %} -     {{ key|replace('_', ' ')|title() }}: {{ value }}uL
+     {{ key|replace('_', ' ')|title() }}: {{ value }}uL
{% else %} -     {{ key|replace('_', ' ')|title() }}: {{ value }}
+     {{ key|replace('_', ' ')|title() }}: {{ value }}
{% endif %} {% endif %} {% endfor %}

@@ -72,12 +72,12 @@ {% endif %}

{% for key, value in entry.items() if key != 'imported_by'%} {% if loop.index == 1%} -    {{ key|replace('_', ' ')|title() }}: {{ value }}
+    {{ key|replace('_', ' ')|title() }}: {{ value }}
{% else %} {% if "column" in key %} -     {{ key|replace('_', ' ')|title() }}: {{ value }}uL
+     {{ key|replace('_', ' ')|title() }}: {{ value }}uL
{% else %} -     {{ key|replace('_', ' ')|title() }}: {{ value }}
+     {{ key|replace('_', ' ')|title() }}: {{ value }}
{% endif %} {% endif %} {% endfor %}

diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py index 2e2613e..60b3dca 100644 --- a/src/submissions/tools/__init__.py +++ b/src/submissions/tools/__init__.py @@ -121,11 +121,115 @@ def check_if_app(ctx:dict=None) -> bool: def retrieve_rsl_number(in_str:str) -> Tuple[str, str]: + """ + Uses regex to retrieve the plate number and submission type from an input string + + Args: + in_str (str): string to be parsed + + Returns: + Tuple[str, str]: tuple of (output rsl number, submission_type) + """ in_str = in_str.split("\\")[-1] logger.debug(f"Attempting match of {in_str}") regex = re.compile(r""" - (?PRSL-WW-20\d{6})|(?PRSL-\d{2}-\d{4}) + (?PRSL-?WW(?:-|_)20\d{6}(?:(?:_|-)\d(?!\d))?)|(?PRSL-\d{2}-\d{4}) """, re.VERBOSE) m = regex.search(in_str) - return (m.group(), m.lastgroup) - \ No newline at end of file + parsed = m.group().replace("_", "-") + return (parsed, m.lastgroup) + + +def format_rsl_number(instr:str) -> str: + """ + Enforces proper formatting on a plate number + Depreciated, replaced by RSLNamer class + + Args: + instr (str): input plate number + + Returns: + str: _description_ + """ + output = instr.upper() + output = output.replace("_", "-") + return output + + +def check_regex_match(pattern:str, check:str) -> bool: + try: + return bool(re.match(fr"{pattern}", check)) + except TypeError: + return False + + +class RSLNamer(object): + """ + Object that will enforce proper formatting on RSL plate names. + """ + def __init__(self, instr:str): + # self.parsed_name, self.submission_type = self.retrieve_rsl_number(instr) + self.retrieve_rsl_number(in_str=instr) + if self.submission_type != None: + parser = getattr(self, f"enforce_{self.submission_type}") + parser() + self.parsed_name = self.parsed_name.replace("_", "-") + + + def retrieve_rsl_number(self, in_str:str) -> Tuple[str, str]: + """ + Uses regex to retrieve the plate number and submission type from an input string + + Args: + in_str (str): string to be parsed + + Returns: + Tuple[str, str]: tuple of (output rsl number, submission_type) + """ + logger.debug(f"Attempting split of {in_str}") + try: + in_str = in_str.split("\\")[-1] + except AttributeError: + self.parsed_name = None + self.submission_type = None + return + logger.debug(f"Attempting match of {in_str}") + regex = re.compile(r""" + (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d(?!\d))?)| + (?PRSL-?\d{2}-?\d{4}) + """, flags = re.IGNORECASE | re.VERBOSE) + m = regex.search(in_str) + try: + self.parsed_name = m.group().upper() + self.submission_type = m.lastgroup + except AttributeError as e: + logger.critical("No RSL plate number found or submission type found!") + logger.debug(f"The cause of the above error was: {e}") + + def enforce_wastewater(self): + """ + Uses regex to enforce proper formatting of wastewater samples + """ + # self.parsed_name = re.sub(r"(\d)-(\d)", "\1\2", self.parsed_name) + # year = str(date.today().year)[:2] + self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name) + self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW") + # .replace(f"WW{year}", f"WW-{year}") + self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE) + self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name) + + def enforce_bacterial_culture(self): + """ + Uses regex to enforce proper formatting of bacterial culture samples + """ + # year = str(date.today().year)[2:] + # self.parsed_name = self.parsed_name.replace(f"RSL{year}", f"RSL-{year}") + # reg_year = re.compile(fr"{year}(?P\d\d\d\d)") + self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE) + self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE) + # year = regex.group('year') + # rsl = regex.group('rsl') + # self.parsed_name = re.sub(fr"{year}(\d\d\d\d)", fr"{year}-\1", self.parsed_name) + # plate_search = reg_year.search(self.parsed_name) + # if plate_search != None: + # self.parsed_name = re.sub(reg_year, f"{year}-{plate_search.group('rsl')}", self.parsed_name) \ No newline at end of file