Refactored to increase ui robustness.

This commit is contained in:
Landon Wark
2023-04-04 09:31:37 -05:00
parent 3c9f095937
commit 4398517a3e
15 changed files with 1580 additions and 672 deletions

View File

@@ -1,5 +1,11 @@
## 202304.01
- Improved function results output to ui.
- Added Well Call Assessment to PCR scraping.
## 202303.05 ## 202303.05
- Increased robustness of RSL plate number enforcement.
- Added in ability to scrape and include PCR results for wastewater. - Added in ability to scrape and include PCR results for wastewater.
## 202303.04 ## 202303.04

View File

@@ -31,6 +31,8 @@
a. Both an excel sheet and a pdf should be generated containing summary information for submissions made by each client lab. a. Both an excel sheet and a pdf should be generated containing summary information for submissions made by each client lab.
## Importing PCR results: ## Importing PCR results:
This is meant to import .xslx files created from the Design & Analysis Software
1. Click on 'File' -> 'Import PCR Results'. 1. Click on 'File' -> 'Import PCR Results'.
2. Use the file dialog to locate the .xlsx file you want to import. 2. Use the file dialog to locate the .xlsx file you want to import.
3. Click 'Okay'. 3. Click 'Okay'.

1
TODO.md Normal file
View File

@@ -0,0 +1 @@
- [ ] Move bulk of functions from frontend.__init__ to frontend.functions as __init__ is getting bloated.

View File

@@ -0,0 +1,57 @@
"""added target status to ww samples
Revision ID: 00de69ad6eab
Revises: 8adc85dd9b92
Create Date: 2023-03-31 14:51:40.705301
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import sqlite
# revision identifiers, used by Alembic.
revision = '00de69ad6eab'
down_revision = '8adc85dd9b92'
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# op.drop_table('_alembic_tmp__submissions')
with op.batch_alter_table('_ww_samples', schema=None) as batch_op:
batch_op.add_column(sa.Column('n1_status', sa.String(length=32), nullable=True))
batch_op.add_column(sa.Column('n2_status', sa.String(length=32), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('_ww_samples', schema=None) as batch_op:
batch_op.drop_column('n2_status')
batch_op.drop_column('n1_status')
# op.create_table('_alembic_tmp__submissions',
# sa.Column('id', sa.INTEGER(), nullable=False),
# sa.Column('rsl_plate_num', sa.VARCHAR(length=32), nullable=False),
# sa.Column('submitter_plate_num', sa.VARCHAR(length=127), nullable=True),
# sa.Column('submitted_date', sa.TIMESTAMP(), nullable=True),
# sa.Column('submitting_lab_id', sa.INTEGER(), nullable=True),
# sa.Column('sample_count', sa.INTEGER(), nullable=True),
# sa.Column('extraction_kit_id', sa.INTEGER(), nullable=True),
# sa.Column('submission_type', sa.VARCHAR(length=32), nullable=True),
# sa.Column('technician', sa.VARCHAR(length=64), nullable=True),
# sa.Column('reagents_id', sa.VARCHAR(), nullable=True),
# sa.Column('extraction_info', sqlite.JSON(), nullable=True),
# sa.Column('run_cost', sa.FLOAT(), nullable=True),
# sa.Column('uploaded_by', sa.VARCHAR(length=32), nullable=True),
# sa.Column('pcr_info', sqlite.JSON(), nullable=True),
# sa.ForeignKeyConstraint(['extraction_kit_id'], ['_kits.id'], ondelete='SET NULL'),
# sa.ForeignKeyConstraint(['reagents_id'], ['_reagents.id'], ondelete='SET NULL'),
# sa.ForeignKeyConstraint(['submitting_lab_id'], ['_organizations.id'], ondelete='SET NULL'),
# sa.PrimaryKeyConstraint('id'),
# sa.UniqueConstraint('rsl_plate_num'),
# sa.UniqueConstraint('submitter_plate_num')
# )
# ### end Alembic commands ###

View File

@@ -4,7 +4,7 @@ from pathlib import Path
# Version of the realpython-reader package # Version of the realpython-reader package
__project__ = "submissions" __project__ = "submissions"
__version__ = "202303.4b" __version__ = "202304.1b"
__author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"} __author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"}
__copyright__ = "2022-2023, Government of Canada" __copyright__ = "2022-2023, Government of Canada"
@@ -20,3 +20,11 @@ class bcolors:
ENDC = '\033[0m' ENDC = '\033[0m'
BOLD = '\033[1m' BOLD = '\033[1m'
UNDERLINE = '\033[4m' UNDERLINE = '\033[4m'
# Hello Landon, this is your past self here. I'm trying not to screw you over like I usually do, so I will
# set out the workflow I've imagined for creating new submission types.
# First of all, you will need to write new parsing methods in backend.excel.parser to pull information out of the submission form
# for the submission itself as well as for any samples you can pull out of that same sheet.
# Second, you will have to update the model in backend.db.models.submissions and provide a new polymorph to the BasicSubmission object.
# The BSO should hold the majority of the general info.
# You can also update any of the parsers to pull out any custom info you need, like enforcing RSL plate numbers, scraping PCR results, etc.

View File

@@ -5,7 +5,7 @@ Convenience functions for interacting with the database.
from . import models from . import models
from .models.kits import reagenttypes_kittypes from .models.kits import reagenttypes_kittypes
from .models.submissions import reagents_submissions from .models.submissions import reagents_submissions
from .models.samples import WWSample # from .models.samples import WWSample
import pandas as pd import pandas as pd
import sqlalchemy.exc import sqlalchemy.exc
import sqlite3 import sqlite3
@@ -18,7 +18,6 @@ from sqlalchemy.engine import Engine
import json import json
from getpass import getuser from getpass import getuser
import numpy as np import numpy as np
import yaml import yaml
from pathlib import Path from pathlib import Path
@@ -42,8 +41,10 @@ def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|d
Returns: Returns:
None|dict : object that indicates issue raised for reporting in gui None|dict : object that indicates issue raised for reporting in gui
""" """
from tools import format_rsl_number
logger.debug(f"Hello from store_submission") logger.debug(f"Hello from store_submission")
# Add all samples to sample table # Add all samples to sample table
base_submission.rsl_plate_num = format_rsl_number(base_submission.rsl_plate_num)
for sample in base_submission.samples: for sample in base_submission.samples:
sample.rsl_plate = base_submission sample.rsl_plate = base_submission
logger.debug(f"Attempting to add sample: {sample.to_string()}") logger.debug(f"Attempting to add sample: {sample.to_string()}")
@@ -60,14 +61,13 @@ def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|d
except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e: except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
logger.debug(f"Hit an integrity error : {e}") logger.debug(f"Hit an integrity error : {e}")
ctx['database_session'].rollback() ctx['database_session'].rollback()
return {"message":"This plate number already exists, so we can't add it."} return {"message":"This plate number already exists, so we can't add it.", "status":"Critical"}
except (sqlite3.OperationalError, sqlalchemy.exc.IntegrityError) as e: except (sqlite3.OperationalError, sqlalchemy.exc.IntegrityError) as e:
logger.debug(f"Hit an operational error: {e}") logger.debug(f"Hit an operational error: {e}")
ctx['database_session'].rollback() ctx['database_session'].rollback()
return {"message":"The database is locked for editing."} return {"message":"The database is locked for editing.", "status":"Critical"}
return None return None
def store_reagent(ctx:dict, reagent:models.Reagent) -> None|dict: def store_reagent(ctx:dict, reagent:models.Reagent) -> None|dict:
""" """
Inserts a reagent into the database. Inserts a reagent into the database.
@@ -87,7 +87,6 @@ def store_reagent(ctx:dict, reagent:models.Reagent) -> None|dict:
return {"message":"The database is locked for editing."} return {"message":"The database is locked for editing."}
return None return None
def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmission: def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmission:
""" """
Construct submission object from dictionary Construct submission object from dictionary
@@ -99,14 +98,17 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio
Returns: Returns:
models.BasicSubmission: Constructed submission object models.BasicSubmission: Constructed submission object
""" """
from tools import check_not_nan from tools import check_regex_match, RSLNamer
# convert submission type into model name # convert submission type into model name
query = info_dict['submission_type'].replace(" ", "") query = info_dict['submission_type'].replace(" ", "")
# Ensure an rsl plate number exists for the plate # Ensure an rsl plate number exists for the plate
if info_dict["rsl_plate_num"] == 'nan' or info_dict["rsl_plate_num"] == None or not check_not_nan(info_dict["rsl_plate_num"]): # if info_dict["rsl_plate_num"] == 'nan' or info_dict["rsl_plate_num"] == None or not check_not_nan(info_dict["rsl_plate_num"]):
if not check_regex_match("^RSL", info_dict["rsl_plate_num"]):
instance = None instance = None
msg = "A proper RSL plate number is required." msg = "A proper RSL plate number is required."
return instance, {'code': 2, 'message': "A proper RSL plate number is required."} return instance, {'code': 2, 'message': "A proper RSL plate number is required."}
else:
info_dict['rsl_plate_num'] = RSLNamer(info_dict["rsl_plate_num"]).parsed_name
# check database for existing object # check database for existing object
instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first() instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first()
# get model based on submission type converted above # get model based on submission type converted above
@@ -171,7 +173,6 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio
logger.debug(f"Constructed submissions message: {msg}") logger.debug(f"Constructed submissions message: {msg}")
return instance, {'code':code, 'message':msg} return instance, {'code':code, 'message':msg}
def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent: def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent:
""" """
Construct reagent object from dictionary Construct reagent object from dictionary
@@ -204,7 +205,6 @@ def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent:
# pass # pass
return reagent return reagent
def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent: def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent:
""" """
Query db for reagent based on lot number Query db for reagent based on lot number
@@ -219,7 +219,6 @@ def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent:
lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
return lookedup return lookedup
def get_all_reagenttype_names(ctx:dict) -> list[str]: def get_all_reagenttype_names(ctx:dict) -> list[str]:
""" """
Lookup all reagent types and get names Lookup all reagent types and get names
@@ -233,7 +232,6 @@ def get_all_reagenttype_names(ctx:dict) -> list[str]:
lookedup = [item.__str__() for item in ctx['database_session'].query(models.ReagentType).all()] lookedup = [item.__str__() for item in ctx['database_session'].query(models.ReagentType).all()]
return lookedup return lookedup
def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType: def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType:
""" """
Lookup a single reagent type by name Lookup a single reagent type by name
@@ -250,7 +248,6 @@ def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType:
logger.debug(f"Found ReagentType: {lookedup}") logger.debug(f"Found ReagentType: {lookedup}")
return lookedup return lookedup
def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]: def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]:
""" """
Lookup kits by a sample type its used for Lookup kits by a sample type its used for
@@ -264,7 +261,6 @@ def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]:
""" """
return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all() return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all()
def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType: def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType:
""" """
Lookup a kit type by name Lookup a kit type by name
@@ -279,7 +275,6 @@ def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType:
logger.debug(f"Querying kittype: {name}") logger.debug(f"Querying kittype: {name}")
return ctx['database_session'].query(models.KitType).filter(models.KitType.name==name).first() return ctx['database_session'].query(models.KitType).filter(models.KitType.name==name).first()
def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.Reagent]: def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.Reagent]:
""" """
Lookup reagents by their type name Lookup reagents by their type name
@@ -293,7 +288,6 @@ def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.Reagent]:
""" """
return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all() return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all()
def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:str) -> list[models.Reagent]: def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:str) -> list[models.Reagent]:
""" """
Lookup reagents by their type name and kits they belong to (Broken... maybe cursed, I'm not sure.) Lookup reagents by their type name and kits they belong to (Broken... maybe cursed, I'm not sure.)
@@ -325,7 +319,6 @@ def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:st
output = rt_types.instances output = rt_types.instances
return output return output
def lookup_all_submissions_by_type(ctx:dict, sub_type:str|None=None) -> list[models.BasicSubmission]: def lookup_all_submissions_by_type(ctx:dict, sub_type:str|None=None) -> list[models.BasicSubmission]:
""" """
Get all submissions, filtering by type if given Get all submissions, filtering by type if given
@@ -400,7 +393,6 @@ def submissions_to_df(ctx:dict, sub_type:str|None=None) -> pd.DataFrame:
logger.warning(f"Couldn't drop 'pcr_info' column from submissionsheet df.") logger.warning(f"Couldn't drop 'pcr_info' column from submissionsheet df.")
return df return df
def lookup_submission_by_id(ctx:dict, id:int) -> models.BasicSubmission: def lookup_submission_by_id(ctx:dict, id:int) -> models.BasicSubmission:
""" """
Lookup submission by id number Lookup submission by id number
@@ -414,7 +406,6 @@ def lookup_submission_by_id(ctx:dict, id:int) -> models.BasicSubmission:
""" """
return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first() return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first()
def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_date:datetime.date) -> list[models.BasicSubmission]: def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_date:datetime.date) -> list[models.BasicSubmission]:
""" """
Lookup submissions greater than start_date and less than end_date Lookup submissions greater than start_date and less than end_date
@@ -432,7 +423,6 @@ def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_dat
end_date = end_date.strftime("%Y-%m-%d") end_date = end_date.strftime("%Y-%m-%d")
return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submitted_date.between(start_date, end_date)).all() return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submitted_date.between(start_date, end_date)).all()
def get_all_Control_Types_names(ctx:dict) -> list[str]: def get_all_Control_Types_names(ctx:dict) -> list[str]:
""" """
Grabs all control type names from db. Grabs all control type names from db.
@@ -448,7 +438,6 @@ def get_all_Control_Types_names(ctx:dict) -> list[str]:
logger.debug(f"Control Types: {conTypes}") logger.debug(f"Control Types: {conTypes}")
return conTypes return conTypes
def create_kit_from_yaml(ctx:dict, exp:dict) -> dict: def create_kit_from_yaml(ctx:dict, exp:dict) -> dict:
""" """
Create and store a new kit in the database based on a .yml file Create and store a new kit in the database based on a .yml file
@@ -491,7 +480,6 @@ def create_kit_from_yaml(ctx:dict, exp:dict) -> dict:
ctx['database_session'].commit() ctx['database_session'].commit()
return {'code':0, 'message':'Kit has been added', 'status': 'information'} return {'code':0, 'message':'Kit has been added', 'status': 'information'}
def create_org_from_yaml(ctx:dict, org:dict) -> dict: def create_org_from_yaml(ctx:dict, org:dict) -> dict:
""" """
Create and store a new organization based on a .yml file Create and store a new organization based on a .yml file
@@ -528,7 +516,6 @@ def create_org_from_yaml(ctx:dict, org:dict) -> dict:
ctx["database_session"].commit() ctx["database_session"].commit()
return {"code":0, "message":"Organization has been added."} return {"code":0, "message":"Organization has been added."}
def lookup_all_sample_types(ctx:dict) -> list[str]: def lookup_all_sample_types(ctx:dict) -> list[str]:
""" """
Lookup all sample types and get names Lookup all sample types and get names
@@ -544,7 +531,6 @@ def lookup_all_sample_types(ctx:dict) -> list[str]:
uses = list(set([item for sublist in uses for item in sublist])) uses = list(set([item for sublist in uses for item in sublist]))
return uses return uses
def get_all_available_modes(ctx:dict) -> list[str]: def get_all_available_modes(ctx:dict) -> list[str]:
""" """
Get types of analysis for controls Get types of analysis for controls
@@ -564,7 +550,6 @@ def get_all_available_modes(ctx:dict) -> list[str]:
cols = [] cols = []
return cols return cols
def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None, end_date:date|None=None) -> list[models.Control]: def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None, end_date:date|None=None) -> list[models.Control]:
""" """
Returns a list of control objects that are instances of the input controltype. Returns a list of control objects that are instances of the input controltype.
@@ -589,7 +574,6 @@ def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None,
logger.debug(f"Returned controls between dates: {output}") logger.debug(f"Returned controls between dates: {output}")
return output return output
def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]: def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]:
""" """
Get subtypes for a control analysis mode Get subtypes for a control analysis mode
@@ -617,7 +601,6 @@ def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]:
subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item] subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item]
return subtypes return subtypes
def get_all_controls(ctx:dict) -> list[models.Control]: def get_all_controls(ctx:dict) -> list[models.Control]:
""" """
Retrieve a list of all controls from the database Retrieve a list of all controls from the database
@@ -630,7 +613,6 @@ def get_all_controls(ctx:dict) -> list[models.Control]:
""" """
return ctx['database_session'].query(models.Control).all() return ctx['database_session'].query(models.Control).all()
def lookup_submission_by_rsl_num(ctx:dict, rsl_num:str) -> models.BasicSubmission: def lookup_submission_by_rsl_num(ctx:dict, rsl_num:str) -> models.BasicSubmission:
""" """
Retrieve a submission from the database based on rsl plate number Retrieve a submission from the database based on rsl plate number
@@ -644,7 +626,6 @@ def lookup_submission_by_rsl_num(ctx:dict, rsl_num:str) -> models.BasicSubmissio
""" """
return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num.startswith(rsl_num)).first() return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num.startswith(rsl_num)).first()
def lookup_submissions_using_reagent(ctx:dict, reagent:models.Reagent) -> list[models.BasicSubmission]: def lookup_submissions_using_reagent(ctx:dict, reagent:models.Reagent) -> list[models.BasicSubmission]:
""" """
Retrieves each submission using a specified reagent. Retrieves each submission using a specified reagent.
@@ -658,7 +639,6 @@ def lookup_submissions_using_reagent(ctx:dict, reagent:models.Reagent) -> list[m
""" """
return ctx['database_session'].query(models.BasicSubmission).join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id).all() return ctx['database_session'].query(models.BasicSubmission).join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id).all()
def delete_submission_by_id(ctx:dict, id:int) -> None: def delete_submission_by_id(ctx:dict, id:int) -> None:
""" """
Deletes a submission and its associated samples from the database. Deletes a submission and its associated samples from the database.
@@ -683,13 +663,12 @@ def delete_submission_by_id(ctx:dict, id:int) -> None:
ctx["database_session"].delete(sub) ctx["database_session"].delete(sub)
ctx["database_session"].commit() ctx["database_session"].commit()
def lookup_ww_sample_by_rsl_sample_number(ctx:dict, rsl_number:str) -> models.WWSample: def lookup_ww_sample_by_rsl_sample_number(ctx:dict, rsl_number:str) -> models.WWSample:
""" """
Retrieves wastewater sampel from database by rsl sample number Retrieves wastewater sample from database by rsl sample number
Args: Args:
ctx (dict): settings passed dwon from gui ctx (dict): settings passed down from gui
rsl_number (str): sample number assigned by robotics lab rsl_number (str): sample number assigned by robotics lab
Returns: Returns:
@@ -697,6 +676,39 @@ def lookup_ww_sample_by_rsl_sample_number(ctx:dict, rsl_number:str) -> models.WW
""" """
return ctx['database_session'].query(models.WWSample).filter(models.WWSample.rsl_number==rsl_number).first() return ctx['database_session'].query(models.WWSample).filter(models.WWSample.rsl_number==rsl_number).first()
def lookup_ww_sample_by_sub_sample_rsl(ctx:dict, sample_rsl:str, plate_rsl:str) -> models.WWSample:
"""
Retrieves a wastewater sample from the database by its rsl sample number and parent rsl plate number.
This will likely replace simply looking up by the sample rsl above cine I need to control for repeats.
Args:
ctx (dict): settings passed down from the gui
sample_rsl (str): rsl number of the relevant sample
plate_rsl (str): rsl number of the parent plate
Returns:
models.WWSample: Relevant wastewater object
"""
return ctx['database_session'].query(models.WWSample).join(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==plate_rsl).filter(models.WWSample.rsl_number==sample_rsl).first()
def lookup_ww_sample_by_sub_sample_well(ctx:dict, sample_rsl:str, well_num:str, plate_rsl:str) -> models.WWSample:
"""
Retrieves a wastewater sample from the database by its rsl sample number and parent rsl plate number.
This will likely replace simply looking up by the sample rsl above cine I need to control for repeats.
Args:
ctx (dict): settings passed down from the gui
sample_rsl (str): rsl number of the relevant sample
well_num (str): well number of the relevant sample
plate_rsl (str): rsl number of the parent plate
Returns:
models.WWSample: Relevant wastewater object
"""
return ctx['database_session'].query(models.WWSample).join(models.BasicSubmission) \
.filter(models.BasicSubmission.rsl_plate_num==plate_rsl) \
.filter(models.WWSample.rsl_number==sample_rsl) \
.filter(models.WWSample.well_number==well_num).first()
def update_ww_sample(ctx:dict, sample_obj:dict): def update_ww_sample(ctx:dict, sample_obj:dict):
""" """
@@ -706,7 +718,10 @@ def update_ww_sample(ctx:dict, sample_obj:dict):
ctx (dict): settings passed down from gui ctx (dict): settings passed down from gui
sample_obj (dict): dictionary representing new values for database object sample_obj (dict): dictionary representing new values for database object
""" """
ww_samp = lookup_ww_sample_by_rsl_sample_number(ctx=ctx, rsl_number=sample_obj['sample']) # ww_samp = lookup_ww_sample_by_rsl_sample_number(ctx=ctx, rsl_number=sample_obj['sample'])
logger.debug(f"Looking up {sample_obj['sample']} in plate {sample_obj['plate_rsl']}")
ww_samp = lookup_ww_sample_by_sub_sample_rsl(ctx=ctx, sample_rsl=sample_obj['sample'], plate_rsl=sample_obj['plate_rsl'])
# ww_samp = lookup_ww_sample_by_sub_sample_well(ctx=ctx, sample_rsl=sample_obj['sample'], well_num=sample_obj['well_num'], plate_rsl=sample_obj['plate_rsl'])
if ww_samp != None: if ww_samp != None:
for key, value in sample_obj.items(): for key, value in sample_obj.items():
logger.debug(f"Setting {key} to {value}") logger.debug(f"Setting {key} to {value}")

View File

@@ -27,6 +27,8 @@ class WWSample(Base):
notes = Column(String(2000)) notes = Column(String(2000))
ct_n1 = Column(FLOAT(2)) #: AKA ct for N1 ct_n1 = Column(FLOAT(2)) #: AKA ct for N1
ct_n2 = Column(FLOAT(2)) #: AKA ct for N2 ct_n2 = Column(FLOAT(2)) #: AKA ct for N2
n1_status = Column(String(32))
n2_status = Column(String(32))
seq_submitted = Column(BOOLEAN()) seq_submitted = Column(BOOLEAN())
ww_seq_run_id = Column(String(64)) ww_seq_run_id = Column(String(64))
sample_type = Column(String(8)) sample_type = Column(String(8))
@@ -50,7 +52,7 @@ class WWSample(Base):
dict: well location and id NOTE: keys must sync with BCSample to_sub_dict below dict: well location and id NOTE: keys must sync with BCSample to_sub_dict below
""" """
if self.ct_n1 != None and self.ct_n2 != None: if self.ct_n1 != None and self.ct_n2 != None:
name = f"{self.ww_sample_full_id}\n\t- ct N1: {'{:.2f}'.format(self.ct_n1)}, ct N2: {'{:.2f}'.format(self.ct_n1)}" name = f"{self.ww_sample_full_id}\n\t- ct N1: {'{:.2f}'.format(self.ct_n1)} ({self.n1_status})\n\t- ct N2: {'{:.2f}'.format(self.ct_n2)} ({self.n2_status})"
else: else:
name = self.ww_sample_full_id name = self.ww_sample_full_id
return { return {

View File

@@ -2,17 +2,18 @@
contains parser object for pulling values from client generated submission sheets. contains parser object for pulling values from client generated submission sheets.
''' '''
from getpass import getuser from getpass import getuser
from typing import Tuple
import pandas as pd import pandas as pd
from pathlib import Path from pathlib import Path
from backend.db.models import WWSample, BCSample from backend.db.models import WWSample, BCSample
from backend.db import lookup_ww_sample_by_rsl_sample_number # from backend.db import lookup_ww_sample_by_rsl_sample_number
import logging import logging
from collections import OrderedDict from collections import OrderedDict
import re import re
import numpy as np import numpy as np
from datetime import date from datetime import date
import uuid import uuid
from tools import check_not_nan, retrieve_rsl_number from tools import check_not_nan, RSLNamer
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
@@ -84,7 +85,7 @@ class SheetParser(object):
# self.xl is a pd.ExcelFile so we need to parse it into a df # self.xl is a pd.ExcelFile so we need to parse it into a df
submission_info = self.xl.parse(sheet_name=sheet_name, dtype=object) submission_info = self.xl.parse(sheet_name=sheet_name, dtype=object)
self.sub['submitter_plate_num'] = submission_info.iloc[0][1] self.sub['submitter_plate_num'] = submission_info.iloc[0][1]
self.sub['rsl_plate_num'] = submission_info.iloc[10][1] self.sub['rsl_plate_num'] = RSLNamer(submission_info.iloc[10][1]).parsed_name
self.sub['submitted_date'] = submission_info.iloc[1][1] self.sub['submitted_date'] = submission_info.iloc[1][1]
self.sub['submitting_lab'] = submission_info.iloc[0][3] self.sub['submitting_lab'] = submission_info.iloc[0][3]
self.sub['sample_count'] = submission_info.iloc[2][3] self.sub['sample_count'] = submission_info.iloc[2][3]
@@ -202,7 +203,7 @@ class SheetParser(object):
parse_reagents(ext_reagent_range) parse_reagents(ext_reagent_range)
parse_reagents(pcr_reagent_range) parse_reagents(pcr_reagent_range)
# parse samples # parse samples
sample_parser = SampleParser(submission_info.iloc[16:40]) sample_parser = SampleParser(submission_info.iloc[16:])
sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type'].lower()}_samples") sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type'].lower()}_samples")
self.sub['samples'] = sample_parse() self.sub['samples'] = sample_parse()
self.sub['csv'] = self.xl.parse("Copy to import file", dtype=object) self.sub['csv'] = self.xl.parse("Copy to import file", dtype=object)
@@ -260,24 +261,20 @@ class SampleParser(object):
new_list = [] new_list = []
for sample in self.samples: for sample in self.samples:
new = WWSample() new = WWSample()
if check_not_nan(sample["Unnamed: 9"]):
new.rsl_number = sample['Unnamed: 9']
else:
logger.error(f"No RSL sample number found for this sample.")
continue
new.ww_processing_num = sample['Unnamed: 2'] new.ww_processing_num = sample['Unnamed: 2']
# need to ensure we have a sample id for database integrity # need to ensure we have a sample id for database integrity
try:
not_a_nan = not np.isnan(sample['Unnamed: 3'])
except TypeError:
not_a_nan = True
# if we don't have a sample full id, make one up # if we don't have a sample full id, make one up
if not_a_nan: if check_not_nan(sample['Unnamed: 3']):
new.ww_sample_full_id = sample['Unnamed: 3'] new.ww_sample_full_id = sample['Unnamed: 3']
else: else:
new.ww_sample_full_id = uuid.uuid4().hex.upper() new.ww_sample_full_id = uuid.uuid4().hex.upper()
new.rsl_number = sample['Unnamed: 9']
# need to ensure we get a collection date # need to ensure we get a collection date
try: if check_not_nan(sample['Unnamed: 5']):
not_a_nan = not np.isnan(sample['Unnamed: 5'])
except TypeError:
not_a_nan = True
if not_a_nan:
new.collection_date = sample['Unnamed: 5'] new.collection_date = sample['Unnamed: 5']
else: else:
new.collection_date = date.today() new.collection_date = date.today()
@@ -317,7 +314,9 @@ class PCRParser(object):
return return
# self.pcr = OrderedDict() # self.pcr = OrderedDict()
self.pcr = {} self.pcr = {}
self.plate_num, self.submission_type = retrieve_rsl_number(filepath.__str__()) namer = RSLNamer(filepath.__str__())
self.plate_num = namer.parsed_name
self.submission_type = namer.submission_type
logger.debug(f"Set plate number to {self.plate_num} and type to {self.submission_type}") logger.debug(f"Set plate number to {self.plate_num} and type to {self.submission_type}")
self.samples = [] self.samples = []
parser = getattr(self, f"parse_{self.submission_type}") parser = getattr(self, f"parse_{self.submission_type}")
@@ -362,14 +361,25 @@ class PCRParser(object):
Parse specific to wastewater samples. Parse specific to wastewater samples.
""" """
df = self.parse_general(sheet_name="Results") df = self.parse_general(sheet_name="Results")
column_names = ["Well", "Well Position", "Omit","Sample","Target","Task"," Reporter","Quencher","Amp Status","Amp Score","Curve Quality","Result Quality Issues","Cq","Cq Confidence","Cq Mean","Cq SD","Auto Threshold","Threshold", "Auto Baseline", "Baseline Start", "Baseline End"]
self.samples_df = df.iloc[23:][0:] self.samples_df = df.iloc[23:][0:]
self.samples_df.columns = column_names
logger.debug(f"Samples columns: {self.samples_df.columns}")
well_call_df = self.xl.parse(sheet_name="Well Call").iloc[24:][0:].iloc[:,-1:]
try:
self.samples_df['Assessment'] = well_call_df.values
except ValueError:
logger.error("Well call number doesn't match sample number")
logger.debug(f"Well call dr: {well_call_df}")
# iloc is [row][column] # iloc is [row][column]
for ii, row in self.samples_df.iterrows(): for ii, row in self.samples_df.iterrows():
try: try:
sample_obj = [sample for sample in self.samples if sample['sample'] == row[3]][0] sample_obj = [sample for sample in self.samples if sample['sample'] == row[3]][0]
except IndexError: except IndexError:
sample_obj = dict( sample_obj = dict(
sample = row[3], sample = row['Sample'],
plate_rsl = self.plate_num,
well_num = row['Well Position']
) )
logger.debug(f"Got sample obj: {sample_obj}") logger.debug(f"Got sample obj: {sample_obj}")
# logger.debug(f"row: {row}") # logger.debug(f"row: {row}")
@@ -377,22 +387,30 @@ class PCRParser(object):
# # logger.debug(f"Looking up: {rsl_num}") # # logger.debug(f"Looking up: {rsl_num}")
# ww_samp = lookup_ww_sample_by_rsl_sample_number(ctx=self.ctx, rsl_number=rsl_num) # ww_samp = lookup_ww_sample_by_rsl_sample_number(ctx=self.ctx, rsl_number=rsl_num)
# logger.debug(f"Got: {ww_samp}") # logger.debug(f"Got: {ww_samp}")
match row[4]: if isinstance(row['Cq'], float):
case "N1": sample_obj[f"ct_{row['Target'].lower()}"] = row['Cq']
if isinstance(row[12], float):
sample_obj['ct_n1'] = row[12]
else: else:
sample_obj['ct_n1'] = 0.0 sample_obj[f"ct_{row['Target'].lower()}"] = 0.0
case "N2": try:
if isinstance(row[12], float): sample_obj[f"{row['Target'].lower()}_status"] = row['Assessment']
sample_obj['ct_n2'] = row[12] except KeyError:
else: logger.error(f"No assessment for {sample_obj['sample']}")
sample_obj['ct_n2'] = 0.0 # match row["Target"]:
case _: # case "N1":
logger.warning(f"Unexpected input for row[4]: {row[4]}") # if isinstance(row['Cq'], float):
# sample_obj['ct_n1'] = row["Cq"]
# else:
# sample_obj['ct_n1'] = 0.0
# sample_obj['n1_status'] = row['Assessment']
# case "N2":
# if isinstance(row['Cq'], float):
# sample_obj['ct_n2'] = row['Assessment']
# else:
# sample_obj['ct_n2'] = 0.0
# case _:
# logger.warning(f"Unexpected input for row[4]: {row["Target"]}")
self.samples.append(sample_obj) self.samples.append(sample_obj)

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,4 @@
''' from pathlib import Path
contains operations used by multiple widgets.
'''
from backend.db.models import *
import logging import logging
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
QMainWindow, QLabel, QToolBar, QMainWindow, QLabel, QToolBar,
@@ -11,9 +8,18 @@ from PyQt6.QtWidgets import (
QSpinBox, QScrollArea QSpinBox, QScrollArea
) )
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
def select_open_file(obj:QMainWindow, extension:str) -> Path:
home_dir = str(Path(obj.ctx["directory_path"]))
fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = f"{extension}(*.{extension})")[0])
return fname
def select_save_file(obj:QMainWindow, default_name:str, extension:str) -> Path:
home_dir = Path(obj.ctx["directory_path"]).joinpath(default_name).resolve().__str__()
fname = Path(QFileDialog.getSaveFileName(obj, "Save File", home_dir, filter = f"{extension}(*.{extension})")[0])
return fname
def extract_form_info(object) -> dict: def extract_form_info(object) -> dict:
""" """
retrieves object names and values from form retrieves object names and values from form
@@ -24,6 +30,7 @@ def extract_form_info(object) -> dict:
Returns: Returns:
dict: dictionary of objectName:text items dict: dictionary of objectName:text items
""" """
from frontend.custom_widgets import ReagentTypeForm from frontend.custom_widgets import ReagentTypeForm
dicto = {} dicto = {}
reagents = {} reagents = {}

View File

@@ -14,7 +14,7 @@ from PyQt6.QtCore import Qt, QDate, QSize
# from submissions.backend.db.functions import lookup_kittype_by_use # from submissions.backend.db.functions import lookup_kittype_by_use
# from submissions.backend.db import lookup_regent_by_type_name_and_kit_name # from submissions.backend.db import lookup_regent_by_type_name_and_kit_name
from tools import check_not_nan from tools import check_not_nan
from ..functions import extract_form_info from ..all_window_functions import extract_form_info
from backend.db import get_all_reagenttype_names, lookup_all_sample_types, create_kit_from_yaml, lookup_regent_by_type_name, lookup_kittype_by_use#, lookup_regent_by_type_name_and_kit_name from backend.db import get_all_reagenttype_names, lookup_all_sample_types, create_kit_from_yaml, lookup_regent_by_type_name, lookup_kittype_by_use#, lookup_regent_by_type_name_and_kit_name
from backend.excel.parser import SheetParser from backend.excel.parser import SheetParser
from jinja2 import Environment, FileSystemLoader from jinja2 import Environment, FileSystemLoader

View File

@@ -0,0 +1,658 @@
'''
contains operations used by multiple widgets.
'''
from datetime import date
import difflib
from getpass import getuser
import inspect
from pathlib import Path
import pprint
import re
import yaml
import json
from typing import Tuple
from openpyxl.utils import get_column_letter
from xhtml2pdf import pisa
import pandas as pd
from backend.db.models import *
import logging
from PyQt6.QtWidgets import (
QMainWindow, QLabel, QWidget, QPushButton, QFileDialog,
QLineEdit, QMessageBox, QComboBox, QDateEdit
)
from .all_window_functions import extract_form_info, select_open_file, select_save_file
from PyQt6.QtCore import QSignalBlocker
from backend.db.functions import (
lookup_all_orgs, lookup_kittype_by_use, lookup_kittype_by_name,
construct_submission_info, lookup_reagent, store_submission, lookup_submissions_by_date_range,
create_kit_from_yaml, create_org_from_yaml, get_control_subtypes, get_all_controls_by_type,
lookup_all_submissions_by_type, get_all_controls, lookup_submission_by_rsl_num, update_ww_sample
)
from backend.excel.parser import SheetParser, PCRParser
from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df
from tools import RSLNamer, check_not_nan, check_kit_integrity
from .custom_widgets.pop_ups import AlertPop, QuestionAsker
from .custom_widgets import ReportDatePicker, ReagentTypeForm
from .custom_widgets.misc import ImportReagent
from .visualizations.control_charts import create_charts, construct_html
logger = logging.getLogger(f"submissions.{__name__}")
def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]:
result = None
# from .custom_widgets.misc import ImportReagent
# from .custom_widgets.pop_ups import AlertPop
logger.debug(obj.ctx)
# initialize samples
obj.samples = []
obj.reagents = {}
# set file dialog
# home_dir = str(Path(obj.ctx["directory_path"]))
# fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir)[0])
fname = select_open_file(obj, extension="xlsx")
logger.debug(f"Attempting to parse file: {fname}")
if not fname.exists():
result = dict(message=f"File {fname.__str__()} not found.", status="critical")
return obj, result
# create sheetparser using excel sheet and context from gui
try:
prsr = SheetParser(fname, **obj.ctx)
except PermissionError:
logger.error(f"Couldn't get permission to access file: {fname}")
return
if prsr.sub['rsl_plate_num'] == None:
prsr.sub['rsl_plate_num'] = RSLNamer(fname.__str__()).parsed_name
logger.debug(f"prsr.sub = {prsr.sub}")
# destroy any widgets from previous imports
for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget):
item.setParent(None)
# regex to parser out different variable types for decision making
variable_parser = re.compile(r"""
# (?x)
(?P<extraction_kit>^extraction_kit$) |
(?P<submitted_date>^submitted_date$) |
(?P<submitting_lab>)^submitting_lab$ |
(?P<samples>)^samples$ |
(?P<reagent>^lot_.*$) |
(?P<csv>^csv$)
""", re.VERBOSE)
for item in prsr.sub:
logger.debug(f"Item: {item}")
# attempt to match variable name to regex group
try:
mo = variable_parser.fullmatch(item).lastgroup
except AttributeError:
mo = "other"
logger.debug(f"Mo: {mo}")
match mo:
case 'submitting_lab':
# create label
obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
logger.debug(f"{item}: {prsr.sub[item]}")
# create combobox to hold looked up submitting labs
add_widget = QComboBox()
labs = [item.__str__() for item in lookup_all_orgs(ctx=obj.ctx)]
# try to set closest match to top of list
try:
labs = difflib.get_close_matches(prsr.sub[item], labs, len(labs), 0)
except (TypeError, ValueError):
pass
# set combobox values to lookedup values
add_widget.addItems(labs)
case 'extraction_kit':
# create label
obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
# if extraction kit not available, all other values fail
if not check_not_nan(prsr.sub[item]):
msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning")
msg.exec()
# create combobox to hold looked up kits
add_widget = QComboBox()
# lookup existing kits by 'submission_type' decided on by sheetparser
uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=prsr.sub['submission_type'])]
add_widget.addItems(uses)
if check_not_nan(prsr.sub[item]):
obj.ext_kit = prsr.sub[item]
else:
obj.ext_kit = add_widget.currentText()
case 'submitted_date':
# create label
obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
# uses base calendar
add_widget = QDateEdit(calendarPopup=True)
# sets submitted date based on date found in excel sheet
try:
add_widget.setDate(prsr.sub[item])
# if not found, use today
except:
add_widget.setDate(date.today())
case 'reagent':
# create label
reg_label = QLabel(item.replace("_", " ").title())
reg_label.setObjectName(f"lot_{item}_label")
obj.table_widget.formlayout.addWidget(reg_label)
# create reagent choice widget
add_widget = ImportReagent(ctx=obj.ctx, item=item, prsr=prsr)
obj.reagents[item] = prsr.sub[item]
case 'samples':
# hold samples in 'obj' until form submitted
logger.debug(f"{item}: {prsr.sub[item]}")
obj.samples = prsr.sub[item]
add_widget = None
case 'csv':
obj.csv = prsr.sub[item]
case _:
# anything else gets added in as a line edit
obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
add_widget = QLineEdit()
logger.debug(f"Setting widget text to {str(prsr.sub[item]).replace('_', ' ')}")
add_widget.setText(str(prsr.sub[item]).replace("_", " "))
try:
add_widget.setObjectName(item)
logger.debug(f"Widget name set to: {add_widget.objectName()}")
obj.table_widget.formlayout.addWidget(add_widget)
except AttributeError as e:
logger.error(e)
# compare obj.reagents with expected reagents in kit
if hasattr(obj, 'ext_kit'):
obj.kit_integrity_completion()
logger.debug(f"Imported reagents: {obj.reagents}")
return obj, result
def kit_reload_function(obj:QMainWindow) -> QMainWindow:
result = None
for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget):
# item.setParent(None)
if isinstance(item, QLabel):
if item.text().startswith("Lot"):
item.setParent(None)
else:
logger.debug(f"Type of {item.objectName()} is {type(item)}")
if item.objectName().startswith("lot_"):
item.setParent(None)
obj.kit_integrity_completion_function()
return obj, result
def kit_integrity_completion_function(obj:QMainWindow) -> QMainWindow:
result = None
# from .custom_widgets.misc import ImportReagent
# from .custom_widgets.pop_ups import AlertPop
logger.debug(inspect.currentframe().f_back.f_code.co_name)
kit_widget = obj.table_widget.formlayout.parentWidget().findChild(QComboBox, 'extraction_kit')
logger.debug(f"Kit selector: {kit_widget}")
obj.ext_kit = kit_widget.currentText()
logger.debug(f"Checking integrity of {obj.ext_kit}")
kit = lookup_kittype_by_name(ctx=obj.ctx, name=obj.ext_kit)
reagents_to_lookup = [item.replace("lot_", "") for item in obj.reagents]
logger.debug(f"Reagents for lookup for {kit.name}: {reagents_to_lookup}")
kit_integrity = check_kit_integrity(kit, reagents_to_lookup)
if kit_integrity != None:
# msg = AlertPop(message=kit_integrity['message'], status="critical")
# msg.exec()
result = dict(message=kit_integrity['message'], status="Warning")
for item in kit_integrity['missing']:
obj.table_widget.formlayout.addWidget(QLabel(f"Lot {item.replace('_', ' ').title()}"))
add_widget = ImportReagent(ctx=obj.ctx, item=item)
obj.table_widget.formlayout.addWidget(add_widget)
submit_btn = QPushButton("Submit")
submit_btn.setObjectName("lot_submit_btn")
obj.table_widget.formlayout.addWidget(submit_btn)
submit_btn.clicked.connect(obj.submit_new_sample)
return obj, result
def submit_new_sample_function(obj:QMainWindow) -> QMainWindow:
result = None
# from .custom_widgets.misc import ImportReagent
# from .custom_widgets.pop_ups import AlertPop, QuestionAsker
info = extract_form_info(obj.table_widget.tab1)
reagents = {k:v for k,v in info.items() if k.startswith("lot_")}
info = {k:v for k,v in info.items() if not k.startswith("lot_")}
logger.debug(f"Info: {info}")
logger.debug(f"Reagents: {reagents}")
parsed_reagents = []
# compare reagents in form to reagent database
for reagent in reagents:
wanted_reagent = lookup_reagent(ctx=obj.ctx, reagent_lot=reagents[reagent])
logger.debug(f"Looked up reagent: {wanted_reagent}")
# if reagent not found offer to add to database
if wanted_reagent == None:
r_lot = reagents[reagent]
dlg = QuestionAsker(title=f"Add {r_lot}?", message=f"Couldn't find reagent type {reagent.replace('_', ' ').title().strip('Lot')}: {r_lot} in the database.\n\nWould you like to add it?")
if dlg.exec():
logger.debug(f"checking reagent: {reagent} in obj.reagents. Result: {obj.reagents[reagent]}")
expiry_date = obj.reagents[reagent]['exp']
wanted_reagent = obj.add_reagent(reagent_lot=r_lot, reagent_type=reagent.replace("lot_", ""), expiry=expiry_date)
else:
# In this case we will have an empty reagent and the submission will fail kit integrity check
logger.debug("Will not add reagent.")
if wanted_reagent != None:
parsed_reagents.append(wanted_reagent)
# move samples into preliminary submission dict
info['samples'] = obj.samples
info['uploaded_by'] = getuser()
# construct submission object
logger.debug(f"Here is the info_dict: {pprint.pformat(info)}")
base_submission, result = construct_submission_info(ctx=obj.ctx, info_dict=info)
# check output message for issues
match result['code']:
# code 1: ask for overwrite
case 1:
dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_num}?", message=result['message'])
if dlg.exec():
# Do not add duplicate reagents.
base_submission.reagents = []
else:
return obj, dict(message="Overwrite cancelled", status="Information")
# code 2: No RSL plate number given
case 2:
return obj, dict(message=result['message'], status='critical')
case _:
pass
# add reagents to submission object
for reagent in parsed_reagents:
base_submission.reagents.append(reagent)
logger.debug("Checking kit integrity...")
kit_integrity = check_kit_integrity(base_submission)
if kit_integrity != None:
return obj, dict(message=kit_integrity['message'], status="critical")
logger.debug(f"Sending submission: {base_submission.rsl_plate_num} to database.")
result = store_submission(ctx=obj.ctx, base_submission=base_submission)
# check result of storing for issues
# update summary sheet
obj.table_widget.sub_wid.setData()
# reset form
for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget):
item.setParent(None)
if hasattr(obj, 'csv'):
dlg = QuestionAsker("Export CSV?", "Would you like to export the csv file?")
if dlg.exec():
# home_dir = Path(obj.ctx["directory_path"]).joinpath(f"{base_submission.rsl_plate_num}.csv").resolve().__str__()
# fname = Path(QFileDialog.getSaveFileName(obj, "Save File", home_dir, filter=".csv")[0])
fname = select_save_file(obj, f"{base_submission.rsl_plate_num}.csv", extension="csv")
try:
obj.csv.to_csv(fname.__str__(), index=False)
except PermissionError:
logger.debug(f"Could not get permissions to {fname}. Possibly the request was cancelled.")
return obj, result
def generate_report_function(obj:QMainWindow) -> QMainWindow:
# from .custom_widgets import ReportDatePicker
result = None
dlg = ReportDatePicker()
if dlg.exec():
info = extract_form_info(dlg)
logger.debug(f"Report info: {info}")
# find submissions based on date range
subs = lookup_submissions_by_date_range(ctx=obj.ctx, start_date=info['start_date'], end_date=info['end_date'])
# convert each object to dict
records = [item.report_dict() for item in subs]
# make dataframe from record dictionaries
df = make_report_xlsx(records=records)
html = make_report_html(df=df, start_date=info['start_date'], end_date=info['end_date'])
# setup filedialog to handle save location of report
home_dir = Path(obj.ctx["directory_path"]).joinpath(f"Submissions_Report_{info['start_date']}-{info['end_date']}.pdf").resolve().__str__()
fname = Path(QFileDialog.getSaveFileName(obj, "Save File", home_dir, filter=".pdf")[0])
# logger.debug(f"report output name: {fname}")
with open(fname, "w+b") as f:
pisa.CreatePDF(html, dest=f)
writer = pd.ExcelWriter(fname.with_suffix(".xlsx"), engine='openpyxl')
df.to_excel(writer, sheet_name="Report")
worksheet = writer.sheets['Report']
for idx, col in enumerate(df): # loop through all columns
series = df[col]
max_len = max((
series.astype(str).map(len).max(), # len of largest item
len(str(series.name)) # len of column name/header
)) + 20 # adding a little extra space
try:
worksheet.column_dimensions[get_column_letter(idx)].width = max_len
except ValueError:
pass
for cell in worksheet['D']:
if cell.row > 1:
cell.style = 'Currency'
writer.close()
return obj, result
def add_kit_function(obj:QMainWindow) -> QMainWindow:
result = None
# setup file dialog to find yaml flie
# home_dir = str(Path(obj.ctx["directory_path"]))
# fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "yml(*.yml)")[0])
fname = select_open_file(obj, extension="yml")
assert fname.exists()
# read yaml file
try:
with open(fname.__str__(), "r") as stream:
try:
exp = yaml.load(stream, Loader=yaml.Loader)
except yaml.YAMLError as exc:
logger.error(f'Error reading yaml file {fname}: {exc}')
return {}
except PermissionError:
return
# send to kit creator function
result = create_kit_from_yaml(ctx=obj.ctx, exp=exp)
# match result['code']:
# case 0:
# msg = AlertPop(message=result['message'], status='info')
# case 1:
# msg = AlertPop(message=result['message'], status='critical')
# msg.exec()
return obj, result
def add_org_function(obj:QMainWindow) -> QMainWindow:
result = None
# setup file dialog to find yaml flie
# home_dir = str(Path(obj.ctx["directory_path"]))
# fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "yml(*.yml)")[0])
fname = select_open_file(obj, extension="yml")
assert fname.exists()
# read yaml file
try:
with open(fname.__str__(), "r") as stream:
try:
org = yaml.load(stream, Loader=yaml.Loader)
except yaml.YAMLError as exc:
logger.error(f'Error reading yaml file {fname}: {exc}')
return obj, dict(message=f"There was a problem reading yaml file {fname.__str__()}", status="critical")
except PermissionError:
return obj, result
# send to kit creator function
result = create_org_from_yaml(ctx=obj.ctx, org=org)
# match result['code']:
# case 0:
# msg = AlertPop(message=result['message'], status='information')
# case 1:
# msg = AlertPop(message=result['message'], status='critical')
# msg.exec()
return obj, result
def controls_getter_function(obj:QMainWindow) -> QMainWindow:
result = None
# subtype defaults to disabled
try:
obj.table_widget.sub_typer.disconnect()
except TypeError:
pass
# correct start date being more recent than end date and rerun
if obj.table_widget.datepicker.start_date.date() > obj.table_widget.datepicker.end_date.date():
logger.warning("Start date after end date is not allowed!")
threemonthsago = obj.table_widget.datepicker.end_date.date().addDays(-60)
# block signal that will rerun controls getter and set start date
with QSignalBlocker(obj.table_widget.datepicker.start_date) as blocker:
obj.table_widget.datepicker.start_date.setDate(threemonthsago)
obj._controls_getter()
return obj, result
# convert to python useable date object
obj.start_date = obj.table_widget.datepicker.start_date.date().toPyDate()
obj.end_date = obj.table_widget.datepicker.end_date.date().toPyDate()
obj.con_type = obj.table_widget.control_typer.currentText()
obj.mode = obj.table_widget.mode_typer.currentText()
obj.table_widget.sub_typer.clear()
# lookup subtypes
sub_types = get_control_subtypes(ctx=obj.ctx, type=obj.con_type, mode=obj.mode)
if sub_types != []:
# block signal that will rerun controls getter and update sub_typer
with QSignalBlocker(obj.table_widget.sub_typer) as blocker:
obj.table_widget.sub_typer.addItems(sub_types)
obj.table_widget.sub_typer.setEnabled(True)
obj.table_widget.sub_typer.currentTextChanged.connect(obj._chart_maker)
else:
obj.table_widget.sub_typer.clear()
obj.table_widget.sub_typer.setEnabled(False)
obj._chart_maker()
return obj, result
def chart_maker_function(obj:QMainWindow) -> QMainWindow:
result = None
logger.debug(f"Control getter context: \n\tControl type: {obj.con_type}\n\tMode: {obj.mode}\n\tStart Date: {obj.start_date}\n\tEnd Date: {obj.end_date}")
if obj.table_widget.sub_typer.currentText() == "":
obj.subtype = None
else:
obj.subtype = obj.table_widget.sub_typer.currentText()
logger.debug(f"Subtype: {obj.subtype}")
# query all controls using the type/start and end dates from the gui
controls = get_all_controls_by_type(ctx=obj.ctx, con_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date)
# if no data found from query set fig to none for reporting in webview
if controls == None:
fig = None
else:
# change each control to list of dicts
data = [control.convert_by_mode(mode=obj.mode) for control in controls]
# flatten data to one dimensional list
data = [item for sublist in data for item in sublist]
logger.debug(f"Control objects going into df conversion: {data}")
# send to dataframe creator
df = convert_data_list_to_df(ctx=obj.ctx, input=data, subtype=obj.subtype)
if obj.subtype == None:
title = obj.mode
else:
title = f"{obj.mode} - {obj.subtype}"
# send dataframe to chart maker
fig = create_charts(ctx=obj.ctx, df=df, ytitle=title)
logger.debug(f"Updating figure...")
# construct html for webview
html = construct_html(figure=fig)
logger.debug(f"The length of html code is: {len(html)}")
obj.table_widget.webengineview.setHtml(html)
obj.table_widget.webengineview.update()
logger.debug("Figure updated... I hope.")
return obj, result
def link_controls_function(obj:QMainWindow) -> QMainWindow:
result = None
all_bcs = lookup_all_submissions_by_type(obj.ctx, "Bacterial Culture")
logger.debug(all_bcs)
all_controls = get_all_controls(obj.ctx)
ac_list = [control.name for control in all_controls]
count = 0
for bcs in all_bcs:
logger.debug(f"Running for {bcs.rsl_plate_num}")
logger.debug(f"Here is the current control: {[control.name for control in bcs.controls]}")
samples = [sample.sample_id for sample in bcs.samples]
logger.debug(bcs.controls)
for sample in samples:
# replace below is a stopgap method because some dingus decided to add spaces in some of the ATCC49... so it looks like "ATCC 49"...
if " " in sample:
logger.warning(f"There is not supposed to be a space in the sample name!!!")
sample = sample.replace(" ", "")
# if sample not in ac_list:
if not any([ac.startswith(sample) for ac in ac_list]):
continue
else:
for control in all_controls:
diff = difflib.SequenceMatcher(a=sample, b=control.name).ratio()
if control.name.startswith(sample):
logger.debug(f"Checking {sample} against {control.name}... {diff}")
logger.debug(f"Found match:\n\tSample: {sample}\n\tControl: {control.name}\n\tDifference: {diff}")
if control in bcs.controls:
logger.debug(f"{control.name} already in {bcs.rsl_plate_num}, skipping")
continue
else:
logger.debug(f"Adding {control.name} to {bcs.rsl_plate_num} as control")
bcs.controls.append(control)
# bcs.control_id.append(control.id)
control.submission = bcs
control.submission_id = bcs.id
obj.ctx["database_session"].add(control)
count += 1
obj.ctx["database_session"].add(bcs)
logger.debug(f"Here is the new control: {[control.name for control in bcs.controls]}")
result = dict(message=f"We added {count} controls to bacterial cultures.", status="information")
logger.debug(result)
obj.ctx['database_session'].commit()
# msg = QMessageBox()
# msg.setText("Controls added")
# msg.setInformativeText(result)
# msg.setWindowTitle("Controls added")
# msg.exec()
return obj, result
def link_extractions_function(obj:QMainWindow) -> QMainWindow:
result = None
# home_dir = str(Path(obj.ctx["directory_path"]))
# fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "csv(*.csv)")[0])
fname = select_open_file(obj, extension="csv")
with open(fname.__str__(), 'r') as f:
runs = [col.strip().split(",") for col in f.readlines()]
count = 0
for run in runs:
new_run = dict(
start_time=run[0].strip(),
rsl_plate_num=run[1].strip(),
sample_count=run[2].strip(),
status=run[3].strip(),
experiment_name=run[4].strip(),
end_time=run[5].strip()
)
for ii in range(6, len(run)):
new_run[f"column{str(ii-5)}_vol"] = run[ii]
sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num'])
try:
logger.debug(f"Found submission: {sub.rsl_plate_num}")
count += 1
except AttributeError:
continue
if sub.extraction_info != None:
existing = json.loads(sub.extraction_info)
else:
existing = None
try:
if json.dumps(new_run) in sub.extraction_info:
logger.debug(f"Looks like we already have that info.")
continue
except TypeError:
pass
if existing != None:
try:
logger.debug(f"Updating {type(existing)}: {existing} with {type(new_run)}: {new_run}")
existing.append(new_run)
logger.debug(f"Setting: {existing}")
sub.extraction_info = json.dumps(existing)
except TypeError:
logger.error(f"Error updating!")
sub.extraction_info = json.dumps([new_run])
logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.extraction_info}")
else:
sub.extraction_info = json.dumps([new_run])
obj.ctx['database_session'].add(sub)
obj.ctx["database_session"].commit()
result = dict(message=f"We added {count} logs to the database.", status='information')
return obj, result
def link_pcr_function(obj:QMainWindow) -> QMainWindow:
result = None
# home_dir = str(Path(obj.ctx["directory_path"]))
# fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "csv(*.csv)")[0])
fname = select_open_file(obj, extension="csv")
with open(fname.__str__(), 'r') as f:
runs = [col.strip().split(",") for col in f.readlines()]
count = 0
for run in runs:
new_run = dict(
start_time=run[0].strip(),
rsl_plate_num=run[1].strip(),
biomek_status=run[2].strip(),
quant_status=run[3].strip(),
experiment_name=run[4].strip(),
end_time=run[5].strip()
)
# for ii in range(6, len(run)):
# obj[f"column{str(ii-5)}_vol"] = run[ii]
sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num'])
try:
logger.debug(f"Found submission: {sub.rsl_plate_num}")
except AttributeError:
continue
if hasattr(sub, 'pcr_info') and sub.pcr_info != None:
existing = json.loads(sub.pcr_info)
else:
existing = None
try:
if json.dumps(new_run) in sub.pcr_info:
logger.debug(f"Looks like we already have that info.")
continue
else:
count += 1
except TypeError:
logger.error(f"No json to dump")
if existing != None:
try:
logger.debug(f"Updating {type(existing)}: {existing} with {type(new_run)}: {new_run}")
existing.append(new_run)
logger.debug(f"Setting: {existing}")
sub.pcr_info = json.dumps(existing)
except TypeError:
logger.error(f"Error updating!")
sub.pcr_info = json.dumps([new_run])
logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.pcr_info}")
else:
sub.pcr_info = json.dumps([new_run])
obj.ctx['database_session'].add(sub)
obj.ctx["database_session"].commit()
result = dict(message=f"We added {count} logs to the database.", status='information')
return obj, result
def import_pcr_results_function(obj:QMainWindow) -> QMainWindow:
result = None
# home_dir = str(Path(obj.ctx["directory_path"]))
# fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "xlsx(*.xlsx)")[0])
fname = select_open_file(obj, extension="xlsx")
parser = PCRParser(ctx=obj.ctx, filepath=fname)
logger.debug(f"Attempting lookup for {parser.plate_num}")
sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num)
try:
logger.debug(f"Found submission: {sub.rsl_plate_num}")
except AttributeError:
logger.error(f"Submission of number {parser.plate_num} not found. Attempting rescue of plate repeat.")
parser.plate_num = "-".join(parser.plate_num.split("-")[:-1])
sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num)
try:
logger.debug(f"Found submission: {sub.rsl_plate_num}")
except AttributeError:
logger.error(f"Rescue of {parser.plate_num} failed.")
return obj, dict(message="Couldn't find a submission with that RSL number.", status="warning")
# jout = json.dumps(parser.pcr)
count = 0
if hasattr(sub, 'pcr_info') and sub.pcr_info != None:
existing = json.loads(sub.pcr_info)
else:
# jout = None
existing = None
if existing != None:
try:
logger.debug(f"Updating {type(existing)}: {existing} with {type(parser.pcr)}: {parser.pcr}")
if json.dumps(parser.pcr) not in sub.pcr_info:
existing.append(parser.pcr)
logger.debug(f"Setting: {existing}")
sub.pcr_info = json.dumps(existing)
except TypeError:
logger.error(f"Error updating!")
sub.pcr_info = json.dumps([parser.pcr])
logger.debug(f"Final pcr info for {sub.rsl_plate_num}: {sub.pcr_info}")
else:
sub.pcr_info = json.dumps([parser.pcr])
obj.ctx['database_session'].add(sub)
logger.debug(f"Existing {type(sub.pcr_info)}: {sub.pcr_info}")
logger.debug(f"Inserting {type(json.dumps(parser.pcr))}: {json.dumps(parser.pcr)}")
obj.ctx["database_session"].commit()
logger.debug(f"Got {len(parser.samples)} to update!")
for sample in parser.samples:
logger.debug(f"Running update on: {sample['sample']}")
sample['plate_rsl'] = sub.rsl_plate_num
update_ww_sample(ctx=obj.ctx, sample_obj=sample)
result = dict(message=f"We added PCR info to {sub.rsl_plate_num}.", status='information')
return obj, result
# dlg.exec()

View File

@@ -1,6 +1,7 @@
''' '''
Functions for constructing controls graphs using plotly. Functions for constructing controls graphs using plotly.
''' '''
import plotly
import plotly.express as px import plotly.express as px
import pandas as pd import pandas as pd
from pathlib import Path from pathlib import Path
@@ -276,3 +277,13 @@ def divide_chunks(input_list:list, chunk_count:int):
""" """
k, m = divmod(len(input_list), chunk_count) k, m = divmod(len(input_list), chunk_count)
return (input_list[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(chunk_count)) return (input_list[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(chunk_count))
def construct_html(figure:Figure) -> str:
html = '<html><body>'
if figure != None:
html += plotly.offline.plot(figure, output_type='div', include_plotlyjs='cdn')#, image = 'png', auto_open=True, image_filename='plot_image')
else:
html += "<h1>No data was retrieved for the given parameters.</h1>"
html += '</body></html>'
return html

View File

@@ -8,26 +8,26 @@
<h2><u>Submission Details for {{ sub['Plate Number'] }}</u></h2> <h2><u>Submission Details for {{ sub['Plate Number'] }}</u></h2>
<p>{% for key, value in sub.items() if key not in excluded %} <p>{% for key, value in sub.items() if key not in excluded %}
{% if loop.index == 1 %} {% if loop.index == 1 %}
&nbsp;&nbsp;&nbsp;{% if key=='Cost' %}{{ key }}: {{ "${:,.2f}".format(value) }}{% else %}{{ key }}: {{ value }}{% endif %}<br> &nbsp;&nbsp;&nbsp;<b>{{ key }}:</b> {% if key=='Cost' %}{{ "${:,.2f}".format(value) }}{% else %}{{ value }}{% endif %}<br>
{% else %} {% else %}
&nbsp;&nbsp;&nbsp;&nbsp;{% if key=='Cost' %}{{ key }}: {{ "${:,.2f}".format(value) }}{% else %}{{ key }}: {{ value }}{% endif %}<br> &nbsp;&nbsp;&nbsp;&nbsp;<b>{{ key }}: </b>{% if key=='Cost' %} {{ "${:,.2f}".format(value) }}{% else %}{{ value }}{% endif %}<br>
{% endif %} {% endif %}
{% endfor %}</p> {% endfor %}</p>
<h3><u>Reagents:</u></h3> <h3><u>Reagents:</u></h3>
<p>{% for item in sub['reagents'] %} <p>{% for item in sub['reagents'] %}
{% if loop.index == 1%} {% if loop.index == 1%}
&nbsp;&nbsp;&nbsp;{{ item['type'] }}: {{ item['lot'] }} (EXP: {{ item['expiry'] }})<br> &nbsp;&nbsp;&nbsp;<b>{{ item['type'] }}:</b> {{ item['lot'] }} (EXP: {{ item['expiry'] }})<br>
{% else %} {% else %}
&nbsp;&nbsp;&nbsp;&nbsp;{{ item['type'] }}: {{ item['lot'] }} (EXP: {{ item['expiry'] }})<br> &nbsp;&nbsp;&nbsp;&nbsp;<b>{{ item['type'] }}</b>: {{ item['lot'] }} (EXP: {{ item['expiry'] }})<br>
{% endif %} {% endif %}
{% endfor %}</p> {% endfor %}</p>
{% if sub['samples'] %} {% if sub['samples'] %}
<h3><u>Samples:</u></h3> <h3><u>Samples:</u></h3>
<p>{% for item in sub['samples'] %} <p>{% for item in sub['samples'] %}
{% if loop.index == 1 %} {% if loop.index == 1 %}
&nbsp;&nbsp;&nbsp;{{ item['well'] }}: {{ item['name'] }}<br> &nbsp;&nbsp;&nbsp;<b>{{ item['well'] }}:</b> {{ item['name']|replace('\n\t', '<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;') }}<br>
{% else %} {% else %}
&nbsp;&nbsp;&nbsp;&nbsp;{{ item['well'] }}: {{ item['name'] }}<br> &nbsp;&nbsp;&nbsp;&nbsp;<b>{{ item['well'] }}:</b> {{ item['name']|replace('\n\t', '<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;') }}<br>
{% endif %} {% endif %}
{% endfor %}</p> {% endfor %}</p>
{% endif %} {% endif %}
@@ -52,12 +52,12 @@
<h3><u>Extraction Status:</u></h3> <h3><u>Extraction Status:</u></h3>
<p>{% for key, value in entry.items() %} <p>{% for key, value in entry.items() %}
{% if loop.index == 1%} {% if loop.index == 1%}
&nbsp;&nbsp;&nbsp;{{ key|replace('_', ' ')|title() }}: {{ value }}<br> &nbsp;&nbsp;&nbsp;<b>{{ key|replace('_', ' ')|title() }}:</b> {{ value }}<br>
{% else %} {% else %}
{% if "column" in key %} {% if "column" in key %}
&nbsp;&nbsp;&nbsp;&nbsp;{{ key|replace('_', ' ')|title() }}: {{ value }}uL<br> &nbsp;&nbsp;&nbsp;&nbsp;<b>{{ key|replace('_', ' ')|title() }}:</b> {{ value }}uL<br>
{% else %} {% else %}
&nbsp;&nbsp;&nbsp;&nbsp;{{ key|replace('_', ' ')|title() }}: {{ value }}<br> &nbsp;&nbsp;&nbsp;&nbsp;<b>{{ key|replace('_', ' ')|title() }}:</b> {{ value }}<br>
{% endif %} {% endif %}
{% endif %} {% endif %}
{% endfor %}</p> {% endfor %}</p>
@@ -72,12 +72,12 @@
{% endif %} {% endif %}
<p>{% for key, value in entry.items() if key != 'imported_by'%} <p>{% for key, value in entry.items() if key != 'imported_by'%}
{% if loop.index == 1%} {% if loop.index == 1%}
&nbsp;&nbsp;&nbsp;{{ key|replace('_', ' ')|title() }}: {{ value }}<br> &nbsp;&nbsp;&nbsp;<b>{{ key|replace('_', ' ')|title() }}:</b> {{ value }}<br>
{% else %} {% else %}
{% if "column" in key %} {% if "column" in key %}
&nbsp;&nbsp;&nbsp;&nbsp;{{ key|replace('_', ' ')|title() }}: {{ value }}uL<br> &nbsp;&nbsp;&nbsp;&nbsp;<b>{{ key|replace('_', ' ')|title() }}:</b> {{ value }}uL<br>
{% else %} {% else %}
&nbsp;&nbsp;&nbsp;&nbsp;{{ key|replace('_', ' ')|title() }}: {{ value }}<br> &nbsp;&nbsp;&nbsp;&nbsp;<b>{{ key|replace('_', ' ')|title() }}:</b> {{ value }}<br>
{% endif %} {% endif %}
{% endif %} {% endif %}
{% endfor %}</p> {% endfor %}</p>

View File

@@ -121,11 +121,115 @@ def check_if_app(ctx:dict=None) -> bool:
def retrieve_rsl_number(in_str:str) -> Tuple[str, str]: def retrieve_rsl_number(in_str:str) -> Tuple[str, str]:
"""
Uses regex to retrieve the plate number and submission type from an input string
Args:
in_str (str): string to be parsed
Returns:
Tuple[str, str]: tuple of (output rsl number, submission_type)
"""
in_str = in_str.split("\\")[-1] in_str = in_str.split("\\")[-1]
logger.debug(f"Attempting match of {in_str}") logger.debug(f"Attempting match of {in_str}")
regex = re.compile(r""" regex = re.compile(r"""
(?P<wastewater>RSL-WW-20\d{6})|(?P<bacterial_culture>RSL-\d{2}-\d{4}) (?P<wastewater>RSL-?WW(?:-|_)20\d{6}(?:(?:_|-)\d(?!\d))?)|(?P<bacterial_culture>RSL-\d{2}-\d{4})
""", re.VERBOSE) """, re.VERBOSE)
m = regex.search(in_str) m = regex.search(in_str)
return (m.group(), m.lastgroup) parsed = m.group().replace("_", "-")
return (parsed, m.lastgroup)
def format_rsl_number(instr:str) -> str:
"""
Enforces proper formatting on a plate number
Depreciated, replaced by RSLNamer class
Args:
instr (str): input plate number
Returns:
str: _description_
"""
output = instr.upper()
output = output.replace("_", "-")
return output
def check_regex_match(pattern:str, check:str) -> bool:
try:
return bool(re.match(fr"{pattern}", check))
except TypeError:
return False
class RSLNamer(object):
"""
Object that will enforce proper formatting on RSL plate names.
"""
def __init__(self, instr:str):
# self.parsed_name, self.submission_type = self.retrieve_rsl_number(instr)
self.retrieve_rsl_number(in_str=instr)
if self.submission_type != None:
parser = getattr(self, f"enforce_{self.submission_type}")
parser()
self.parsed_name = self.parsed_name.replace("_", "-")
def retrieve_rsl_number(self, in_str:str) -> Tuple[str, str]:
"""
Uses regex to retrieve the plate number and submission type from an input string
Args:
in_str (str): string to be parsed
Returns:
Tuple[str, str]: tuple of (output rsl number, submission_type)
"""
logger.debug(f"Attempting split of {in_str}")
try:
in_str = in_str.split("\\")[-1]
except AttributeError:
self.parsed_name = None
self.submission_type = None
return
logger.debug(f"Attempting match of {in_str}")
regex = re.compile(r"""
(?P<wastewater>RSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d(?!\d))?)|
(?P<bacterial_culture>RSL-?\d{2}-?\d{4})
""", flags = re.IGNORECASE | re.VERBOSE)
m = regex.search(in_str)
try:
self.parsed_name = m.group().upper()
self.submission_type = m.lastgroup
except AttributeError as e:
logger.critical("No RSL plate number found or submission type found!")
logger.debug(f"The cause of the above error was: {e}")
def enforce_wastewater(self):
"""
Uses regex to enforce proper formatting of wastewater samples
"""
# self.parsed_name = re.sub(r"(\d)-(\d)", "\1\2", self.parsed_name)
# year = str(date.today().year)[:2]
self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name)
self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW")
# .replace(f"WW{year}", f"WW-{year}")
self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE)
self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name)
def enforce_bacterial_culture(self):
"""
Uses regex to enforce proper formatting of bacterial culture samples
"""
# year = str(date.today().year)[2:]
# self.parsed_name = self.parsed_name.replace(f"RSL{year}", f"RSL-{year}")
# reg_year = re.compile(fr"{year}(?P<rsl>\d\d\d\d)")
self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE)
self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE)
# year = regex.group('year')
# rsl = regex.group('rsl')
# self.parsed_name = re.sub(fr"{year}(\d\d\d\d)", fr"{year}-\1", self.parsed_name)
# plate_search = reg_year.search(self.parsed_name)
# if plate_search != None:
# self.parsed_name = re.sub(reg_year, f"{year}-{plate_search.group('rsl')}", self.parsed_name)