diff --git a/src/submissions/CHANGELOG.md b/src/submissions/CHANGELOG.md
new file mode 100644
index 0000000..6827ffd
--- /dev/null
+++ b/src/submissions/CHANGELOG.md
@@ -0,0 +1,6 @@
+**202303.03**
+
+- Increased robustness by utilizing PyQT6 widget names to pull data from forms instead of previously used label/input zip.
+- Above allowed for creation of more helpful prompts.
+- Added sorting feature to Submission summary.
+- Reagent import dropdowns will now prioritize lot number found in a parsed sheet, moving it to the top of the list.
\ No newline at end of file
diff --git a/src/submissions/README.md b/src/submissions/README.md
index e69de29..a317b26 100644
--- a/src/submissions/README.md
+++ b/src/submissions/README.md
@@ -0,0 +1 @@
+**This is the readme file**
\ No newline at end of file
diff --git a/src/submissions/__init__.py b/src/submissions/__init__.py
index 6dceed3..7ca50e6 100644
--- a/src/submissions/__init__.py
+++ b/src/submissions/__init__.py
@@ -1,6 +1,6 @@
# __init__.py
# Version of the realpython-reader package
-__version__ = "202303.2b"
+__version__ = "202303.3b"
__author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"}
__copyright__ = "2022-2023, Government of Canada"
diff --git a/src/submissions/__main__.py b/src/submissions/__main__.py
index 84b5c37..8c612e2 100644
--- a/src/submissions/__main__.py
+++ b/src/submissions/__main__.py
@@ -17,7 +17,7 @@ import __init__ as package
# create database session for use with gui session
ctx["database_session"] = create_database_session(Path(ctx['database']))
-# set package information fro __init__
+# set package information from __init__
ctx['package'] = package
if __name__ == '__main__':
@@ -25,85 +25,3 @@ if __name__ == '__main__':
app = QApplication(['', '--no-sandbox'])
ex = App(ctx=ctx)
sys.exit(app.exec())
-
-
-
-# from pathlib import Path
-
-# from tkinter import *
-# from tkinter import filedialog as fd
-# from tkinter import ttk
-# from tkinterhtml import HtmlFrame
-
-# from xl_parser import SheetParser
-
-# class Window(Frame):
-# def __init__(self, master=None):
-# Frame.__init__(self, master)
-# self.master = master
-# # Frame.pack_propagate(False)
-# menu = Menu(self.master)
-# self.master.config(menu=menu)
-
-# fileMenu = Menu(menu)
-# fileMenu.add_command(label="Import", command=self.import_callback)
-# fileMenu.add_command(label="Exit", command=self.exitProgram)
-# menu.add_cascade(label="File", menu=fileMenu)
-
-# editMenu = Menu(menu)
-# editMenu.add_command(label="Undo")
-# editMenu.add_command(label="Redo")
-# menu.add_cascade(label="Edit", menu=editMenu)
-
-# tab_parent = ttk.Notebook(self.master)
-# self.add_sample_tab = ttk.Frame(tab_parent)
-# self.control_view_tab = HtmlFrame(tab_parent)
-# tab_parent.add(self.add_sample_tab, text="Add Sample")
-# tab_parent.add(self.control_view_tab, text="Controls View")
-# tab_parent.pack()
-# with open("L:\Robotics Laboratory Support\Quality\Robotics Support Laboratory Extraction Controls\MCS-SSTI.html", "r") as f:
-# data = f.read()
-# # frame =
-# # frame.set_content(data)
-# # self.control_view_tab.set_content("""
-# #
-# #
-# # Hello world!
-# # First para
-# #
-# # - first list item
-# # - second list item
-# #
-# #
-# #
-# #
-# # """)
-
-
-
-# def exitProgram(self):
-# exit()
-
-# def import_callback(self):
-# name= fd.askopenfilename()
-# prsr = SheetParser(Path(name), **ctx)
-# for item in prsr.sub:
-# lbl=Label(self.add_sample_tab, text=item, fg='red', font=("Helvetica", 16))
-# lbl.pack()
-# txtfld=Entry(self.add_sample_tab, text="Data not set", bd=2)
-# txtfld.pack()
-# txtfld.delete(0,END)
-# txtfld.insert(0,prsr.sub[item])
-
-
-# root = Tk()
-# app = Window(root)
-# # for item in test_data:
-# # lbl=Label(root, text=item, fg='red', font=("Helvetica", 16))
-# # lbl.pack()
-# # txtfld=Entry(root, text="", bd=2)
-# # txtfld.pack()
-# # txtfld.delete(0,END)
-# # txtfld.insert(0,test_data[item])
-# root.wm_title("Tkinter window")
-# root.mainloop()
\ No newline at end of file
diff --git a/src/submissions/backend/__init__.py b/src/submissions/backend/__init__.py
index e69de29..fb2d012 100644
--- a/src/submissions/backend/__init__.py
+++ b/src/submissions/backend/__init__.py
@@ -0,0 +1,3 @@
+'''
+Contains database and excel operations.
+'''
\ No newline at end of file
diff --git a/src/submissions/backend/db/__init__.py b/src/submissions/backend/db/__init__.py
index bf3b3b5..11f467d 100644
--- a/src/submissions/backend/db/__init__.py
+++ b/src/submissions/backend/db/__init__.py
@@ -1,664 +1,670 @@
-from . import models
-from .models.kits import reagenttypes_kittypes
-from .models.submissions import reagents_submissions
-import pandas as pd
-import sqlalchemy.exc
-import sqlite3
-import logging
-from datetime import date, datetime, timedelta
-from sqlalchemy import and_
-import uuid
-# import base64
-from sqlalchemy import JSON, event
-from sqlalchemy.engine import Engine
-import json
-# from dateutil.relativedelta import relativedelta
-from getpass import getuser
-import numpy as np
-from tools import check_not_nan, check_is_power_user
-import yaml
-from pathlib import Path
-
-logger = logging.getLogger(f"submissions.{__name__}")
-
-# The below _should_ allow automatic creation of foreign keys in the database
-@event.listens_for(Engine, "connect")
-def set_sqlite_pragma(dbapi_connection, connection_record):
- cursor = dbapi_connection.cursor()
- cursor.execute("PRAGMA foreign_keys=ON")
- cursor.close()
-
-def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|dict:
- """
- Upserts submissions into database
-
- Args:
- ctx (dict): settings passed down from gui
- base_submission (models.BasicSubmission): submission to be add to db
-
- Returns:
- None|dict : object that indicates issue raised for reporting in gui
- """
- logger.debug(f"Hello from store_submission")
- # Add all samples to sample table
- for sample in base_submission.samples:
- sample.rsl_plate = base_submission
- logger.debug(f"Attempting to add sample: {sample.to_string()}")
- try:
- ctx['database_session'].add(sample)
- except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
- logger.debug(f"Hit an integrity error : {e}")
- continue
- # Add submission to submission table
- ctx['database_session'].add(base_submission)
- logger.debug(f"Attempting to add submission: {base_submission.rsl_plate_num}")
- try:
- ctx['database_session'].commit()
- except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
- logger.debug(f"Hit an integrity error : {e}")
- ctx['database_session'].rollback()
- return {"message":"This plate number already exists, so we can't add it."}
- except (sqlite3.OperationalError, sqlalchemy.exc.IntegrityError) as e:
- logger.debug(f"Hit an operational error: {e}")
- ctx['database_session'].rollback()
- return {"message":"The database is locked for editing."}
- return None
+'''
+All database related operations.
+'''
+from .functions import *
-def store_reagent(ctx:dict, reagent:models.Reagent) -> None|dict:
- """
- Inserts a reagent into the database.
+# from . import models
+# from .models.kits import reagenttypes_kittypes
+# from .models.submissions import reagents_submissions
+# import pandas as pd
+# import sqlalchemy.exc
+# import sqlite3
+# import logging
+# from datetime import date, datetime, timedelta
+# from sqlalchemy import and_
+# import uuid
+# # import base64
+# from sqlalchemy import JSON, event
+# from sqlalchemy.engine import Engine
+# import json
+# # from dateutil.relativedelta import relativedelta
+# from getpass import getuser
+# import numpy as np
+# from tools import check_not_nan, check_is_power_user
+# import yaml
+# from pathlib import Path
- Args:
- ctx (dict): settings passed down from gui
- reagent (models.Reagent): Reagent object to be added to db
+# logger = logging.getLogger(f"submissions.{__name__}")
- Returns:
- None|dict: object indicating issue to be reported in the gui
- """
- logger.debug(f"Reagent dictionary: {reagent.__dict__}")
- ctx['database_session'].add(reagent)
- try:
- ctx['database_session'].commit()
- except (sqlite3.OperationalError, sqlalchemy.exc.OperationalError):
- return {"message":"The database is locked for editing."}
- return None
+# # The below _should_ allow automatic creation of foreign keys in the database
+# @event.listens_for(Engine, "connect")
+# def set_sqlite_pragma(dbapi_connection, connection_record):
+# cursor = dbapi_connection.cursor()
+# cursor.execute("PRAGMA foreign_keys=ON")
+# cursor.close()
+
+# def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|dict:
+# """
+# Upserts submissions into database
+
+# Args:
+# ctx (dict): settings passed down from gui
+# base_submission (models.BasicSubmission): submission to be add to db
+
+# Returns:
+# None|dict : object that indicates issue raised for reporting in gui
+# """
+# logger.debug(f"Hello from store_submission")
+# # Add all samples to sample table
+# for sample in base_submission.samples:
+# sample.rsl_plate = base_submission
+# logger.debug(f"Attempting to add sample: {sample.to_string()}")
+# try:
+# ctx['database_session'].add(sample)
+# except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
+# logger.debug(f"Hit an integrity error : {e}")
+# continue
+# # Add submission to submission table
+# ctx['database_session'].add(base_submission)
+# logger.debug(f"Attempting to add submission: {base_submission.rsl_plate_num}")
+# try:
+# ctx['database_session'].commit()
+# except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
+# logger.debug(f"Hit an integrity error : {e}")
+# ctx['database_session'].rollback()
+# return {"message":"This plate number already exists, so we can't add it."}
+# except (sqlite3.OperationalError, sqlalchemy.exc.IntegrityError) as e:
+# logger.debug(f"Hit an operational error: {e}")
+# ctx['database_session'].rollback()
+# return {"message":"The database is locked for editing."}
+# return None
-def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmission:
- """
- Construct submission object from dictionary
+# def store_reagent(ctx:dict, reagent:models.Reagent) -> None|dict:
+# """
+# Inserts a reagent into the database.
- Args:
- ctx (dict): settings passed down from gui
- info_dict (dict): dictionary to be transformed
+# Args:
+# ctx (dict): settings passed down from gui
+# reagent (models.Reagent): Reagent object to be added to db
- Returns:
- models.BasicSubmission: Constructed submission object
- """
- # convert submission type into model name
- query = info_dict['submission_type'].replace(" ", "")
- # Ensure an rsl plate number exists for the plate
- if info_dict["rsl_plate_num"] == 'nan' or info_dict["rsl_plate_num"] == None or not check_not_nan(info_dict["rsl_plate_num"]):
- instance = None
- msg = "A proper RSL plate number is required."
- return instance, {'code': 2, 'message': "A proper RSL plate number is required."}
- # check database for existing object
- instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first()
- # get model based on submission type converted above
- logger.debug(f"Looking at models for submission type: {query}")
- model = getattr(models, query)
- logger.debug(f"We've got the model: {type(model)}")
- info_dict['submission_type'] = info_dict['submission_type'].replace(" ", "_").lower()
- # if query return nothing, ie doesn't already exist in db
- if instance == None:
- instance = model()
- logger.debug(f"Submission doesn't exist yet, creating new instance: {instance}")
- msg = None
- code = 0
- else:
- code = 1
- msg = "This submission already exists.\nWould you like to overwrite?"
- for item in info_dict:
- logger.debug(f"Setting {item} to {info_dict[item]}")
- # set fields based on keys in dictionary
- match item:
- case "extraction_kit":
- q_str = info_dict[item]
- logger.debug(f"Looking up kit {q_str}")
- try:
- field_value = lookup_kittype_by_name(ctx=ctx, name=q_str)
- except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
- logger.error(f"Hit an integrity error: {e}")
- logger.debug(f"Got {field_value} for kit {q_str}")
- case "submitting_lab":
- q_str = info_dict[item].replace(" ", "_").lower()
- logger.debug(f"Looking up organization: {q_str}")
- field_value = lookup_org_by_name(ctx=ctx, name=q_str)
- logger.debug(f"Got {field_value} for organization {q_str}")
- case "submitter_plate_num":
- # Because of unique constraint, there will be problems with
- # multiple submissions named 'None', so...
- logger.debug(f"Submitter plate id: {info_dict[item]}")
- if info_dict[item] == None or info_dict[item] == "None":
- logger.debug(f"Got None as a submitter plate number, inserting random string to preserve database unique constraint.")
- info_dict[item] = uuid.uuid4().hex.upper()
- field_value = info_dict[item]
- case _:
- field_value = info_dict[item]
- # insert into field
- try:
- setattr(instance, item, field_value)
- except AttributeError:
- logger.debug(f"Could not set attribute: {item} to {info_dict[item]}")
- continue
- # calculate cost of the run: immutable cost + mutable times number of columns
- # This is now attached to submission upon creation to preserve at-run costs incase of cost increase in the future.
- try:
- instance.run_cost = instance.extraction_kit.immutable_cost + (instance.extraction_kit.mutable_cost * ((instance.sample_count / 8)/12))
- except (TypeError, AttributeError):
- logger.debug(f"Looks like that kit doesn't have cost breakdown yet, using full plate cost.")
- instance.run_cost = instance.extraction_kit.cost_per_run
- # We need to make sure there's a proper rsl plate number
- try:
- logger.debug(f"Constructed instance: {instance.to_string()}")
- except AttributeError as e:
- logger.debug(f"Something went wrong constructing instance {info_dict['rsl_plate_num']}: {e}")
- logger.debug(f"Constructed submissions message: {msg}")
- return instance, {'code':code, 'message':msg}
+# Returns:
+# None|dict: object indicating issue to be reported in the gui
+# """
+# logger.debug(f"Reagent dictionary: {reagent.__dict__}")
+# ctx['database_session'].add(reagent)
+# try:
+# ctx['database_session'].commit()
+# except (sqlite3.OperationalError, sqlalchemy.exc.OperationalError):
+# return {"message":"The database is locked for editing."}
+# return None
+
+
+# def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmission:
+# """
+# Construct submission object from dictionary
+
+# Args:
+# ctx (dict): settings passed down from gui
+# info_dict (dict): dictionary to be transformed
+
+# Returns:
+# models.BasicSubmission: Constructed submission object
+# """
+# # convert submission type into model name
+# query = info_dict['submission_type'].replace(" ", "")
+# # Ensure an rsl plate number exists for the plate
+# if info_dict["rsl_plate_num"] == 'nan' or info_dict["rsl_plate_num"] == None or not check_not_nan(info_dict["rsl_plate_num"]):
+# instance = None
+# msg = "A proper RSL plate number is required."
+# return instance, {'code': 2, 'message': "A proper RSL plate number is required."}
+# # check database for existing object
+# instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first()
+# # get model based on submission type converted above
+# logger.debug(f"Looking at models for submission type: {query}")
+# model = getattr(models, query)
+# logger.debug(f"We've got the model: {type(model)}")
+# info_dict['submission_type'] = info_dict['submission_type'].replace(" ", "_").lower()
+# # if query return nothing, ie doesn't already exist in db
+# if instance == None:
+# instance = model()
+# logger.debug(f"Submission doesn't exist yet, creating new instance: {instance}")
+# msg = None
+# code = 0
+# else:
+# code = 1
+# msg = "This submission already exists.\nWould you like to overwrite?"
+# for item in info_dict:
+# logger.debug(f"Setting {item} to {info_dict[item]}")
+# # set fields based on keys in dictionary
+# match item:
+# case "extraction_kit":
+# q_str = info_dict[item]
+# logger.debug(f"Looking up kit {q_str}")
+# try:
+# field_value = lookup_kittype_by_name(ctx=ctx, name=q_str)
+# except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
+# logger.error(f"Hit an integrity error: {e}")
+# logger.debug(f"Got {field_value} for kit {q_str}")
+# case "submitting_lab":
+# q_str = info_dict[item].replace(" ", "_").lower()
+# logger.debug(f"Looking up organization: {q_str}")
+# field_value = lookup_org_by_name(ctx=ctx, name=q_str)
+# logger.debug(f"Got {field_value} for organization {q_str}")
+# case "submitter_plate_num":
+# # Because of unique constraint, there will be problems with
+# # multiple submissions named 'None', so...
+# logger.debug(f"Submitter plate id: {info_dict[item]}")
+# if info_dict[item] == None or info_dict[item] == "None":
+# logger.debug(f"Got None as a submitter plate number, inserting random string to preserve database unique constraint.")
+# info_dict[item] = uuid.uuid4().hex.upper()
+# field_value = info_dict[item]
+# case _:
+# field_value = info_dict[item]
+# # insert into field
+# try:
+# setattr(instance, item, field_value)
+# except AttributeError:
+# logger.debug(f"Could not set attribute: {item} to {info_dict[item]}")
+# continue
+# # calculate cost of the run: immutable cost + mutable times number of columns
+# # This is now attached to submission upon creation to preserve at-run costs incase of cost increase in the future.
+# try:
+# instance.run_cost = instance.extraction_kit.immutable_cost + (instance.extraction_kit.mutable_cost * ((instance.sample_count / 8)/12))
+# except (TypeError, AttributeError):
+# logger.debug(f"Looks like that kit doesn't have cost breakdown yet, using full plate cost.")
+# instance.run_cost = instance.extraction_kit.cost_per_run
+# # We need to make sure there's a proper rsl plate number
+# try:
+# logger.debug(f"Constructed instance: {instance.to_string()}")
+# except AttributeError as e:
+# logger.debug(f"Something went wrong constructing instance {info_dict['rsl_plate_num']}: {e}")
+# logger.debug(f"Constructed submissions message: {msg}")
+# return instance, {'code':code, 'message':msg}
-def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent:
- """
- Construct reagent object from dictionary
+# def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent:
+# """
+# Construct reagent object from dictionary
- Args:
- ctx (dict): settings passed down from gui
- info_dict (dict): dictionary to be converted
+# Args:
+# ctx (dict): settings passed down from gui
+# info_dict (dict): dictionary to be converted
- Returns:
- models.Reagent: Constructed reagent object
- """
- reagent = models.Reagent()
- for item in info_dict:
- logger.debug(f"Reagent info item: {item}")
- # set fields based on keys in dictionary
- match item:
- case "lot":
- reagent.lot = info_dict[item].upper()
- case "expiry":
- reagent.expiry = info_dict[item]
- case "type":
- reagent.type = lookup_reagenttype_by_name(ctx=ctx, rt_name=info_dict[item].replace(" ", "_").lower())
- # add end-of-life extension from reagent type to expiry date
- # NOTE: this will now be done only in the reporting phase to account for potential changes in end-of-life extensions
- # try:
- # reagent.expiry = reagent.expiry + reagent.type.eol_ext
- # except TypeError as e:
- # logger.debug(f"We got a type error: {e}.")
- # except AttributeError:
- # pass
- return reagent
+# Returns:
+# models.Reagent: Constructed reagent object
+# """
+# reagent = models.Reagent()
+# for item in info_dict:
+# logger.debug(f"Reagent info item: {item}")
+# # set fields based on keys in dictionary
+# match item:
+# case "lot":
+# reagent.lot = info_dict[item].upper()
+# case "expiry":
+# reagent.expiry = info_dict[item]
+# case "type":
+# reagent.type = lookup_reagenttype_by_name(ctx=ctx, rt_name=info_dict[item].replace(" ", "_").lower())
+# # add end-of-life extension from reagent type to expiry date
+# # NOTE: this will now be done only in the reporting phase to account for potential changes in end-of-life extensions
+# # try:
+# # reagent.expiry = reagent.expiry + reagent.type.eol_ext
+# # except TypeError as e:
+# # logger.debug(f"We got a type error: {e}.")
+# # except AttributeError:
+# # pass
+# return reagent
-def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent:
- """
- Query db for reagent based on lot number
+# def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent:
+# """
+# Query db for reagent based on lot number
- Args:
- ctx (dict): settings passed down from gui
- reagent_lot (str): lot number to query
+# Args:
+# ctx (dict): settings passed down from gui
+# reagent_lot (str): lot number to query
- Returns:
- models.Reagent: looked up reagent
- """
- lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
- return lookedup
+# Returns:
+# models.Reagent: looked up reagent
+# """
+# lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
+# return lookedup
-def get_all_reagenttype_names(ctx:dict) -> list[str]:
- """
- Lookup all reagent types and get names
+# def get_all_reagenttype_names(ctx:dict) -> list[str]:
+# """
+# Lookup all reagent types and get names
- Args:
- ctx (dict): settings passed from gui
+# Args:
+# ctx (dict): settings passed from gui
- Returns:
- list[str]: reagent type names
- """
- lookedup = [item.__str__() for item in ctx['database_session'].query(models.ReagentType).all()]
- return lookedup
+# Returns:
+# list[str]: reagent type names
+# """
+# lookedup = [item.__str__() for item in ctx['database_session'].query(models.ReagentType).all()]
+# return lookedup
-def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType:
- """
- Lookup a single reagent type by name
+# def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType:
+# """
+# Lookup a single reagent type by name
- Args:
- ctx (dict): settings passed from gui
- rt_name (str): reagent type name to look up
+# Args:
+# ctx (dict): settings passed from gui
+# rt_name (str): reagent type name to look up
- Returns:
- models.ReagentType: looked up reagent type
- """
- logger.debug(f"Looking up ReagentType by name: {rt_name}")
- lookedup = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==rt_name).first()
- logger.debug(f"Found ReagentType: {lookedup}")
- return lookedup
+# Returns:
+# models.ReagentType: looked up reagent type
+# """
+# logger.debug(f"Looking up ReagentType by name: {rt_name}")
+# lookedup = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==rt_name).first()
+# logger.debug(f"Found ReagentType: {lookedup}")
+# return lookedup
-def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]:
- """
- Lookup kits by a sample type its used for
+# def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]:
+# """
+# Lookup kits by a sample type its used for
- Args:
- ctx (dict): settings passed from gui
- used_by (str): sample type (should be string in D3 of excel sheet)
+# Args:
+# ctx (dict): settings passed from gui
+# used_by (str): sample type (should be string in D3 of excel sheet)
- Returns:
- list[models.KitType]: list of kittypes that have that sample type in their uses
- """
- return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all()
+# Returns:
+# list[models.KitType]: list of kittypes that have that sample type in their uses
+# """
+# return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all()
-def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType:
- """
- Lookup a kit type by name
+# def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType:
+# """
+# Lookup a kit type by name
- Args:
- ctx (dict): settings passed from bui
- name (str): name of kit to query
+# Args:
+# ctx (dict): settings passed from bui
+# name (str): name of kit to query
- Returns:
- models.KitType: retrieved kittype
- """
- logger.debug(f"Querying kittype: {name}")
- return ctx['database_session'].query(models.KitType).filter(models.KitType.name==name).first()
+# Returns:
+# models.KitType: retrieved kittype
+# """
+# logger.debug(f"Querying kittype: {name}")
+# return ctx['database_session'].query(models.KitType).filter(models.KitType.name==name).first()
-def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.Reagent]:
- """
- Lookup reagents by their type name
+# def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.Reagent]:
+# """
+# Lookup reagents by their type name
- Args:
- ctx (dict): settings passed from gui
- type_name (str): reagent type name
+# Args:
+# ctx (dict): settings passed from gui
+# type_name (str): reagent type name
- Returns:
- list[models.Reagent]: list of retrieved reagents
- """
- return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all()
+# Returns:
+# list[models.Reagent]: list of retrieved reagents
+# """
+# return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all()
-def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:str) -> list[models.Reagent]:
- """
- Lookup reagents by their type name and kits they belong to (Broken... maybe cursed, I'm not sure.)
+# def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:str) -> list[models.Reagent]:
+# """
+# Lookup reagents by their type name and kits they belong to (Broken... maybe cursed, I'm not sure.)
- Args:
- ctx (dict): settings pass by gui
- type_name (str): reagent type name
- kit_name (str): kit name
+# Args:
+# ctx (dict): settings pass by gui
+# type_name (str): reagent type name
+# kit_name (str): kit name
- Returns:
- list[models.Reagent]: list of retrieved reagents
- """
- # What I want to do is get the reagent type by name
- # Hang on, this is going to be a long one.
- # by_type = ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name.endswith(type_name)).all()
- rt_types = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name.endswith(type_name))
- # add filter for kit name...
- try:
- check = not np.isnan(kit_name)
- except TypeError:
- check = True
- if check:
- kit_type = lookup_kittype_by_name(ctx=ctx, name=kit_name)
- logger.debug(f"reagenttypes: {[item.name for item in rt_types.all()]}, kit: {kit_type.name}")
- # add in lookup for related kit_id
- rt_types = rt_types.join(reagenttypes_kittypes).filter(reagenttypes_kittypes.c.kits_id==kit_type.id).first()
- else:
- rt_types = rt_types.first()
- output = rt_types.instances
- return output
+# Returns:
+# list[models.Reagent]: list of retrieved reagents
+# """
+# # What I want to do is get the reagent type by name
+# # Hang on, this is going to be a long one.
+# # by_type = ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name.endswith(type_name)).all()
+# rt_types = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name.endswith(type_name))
+# # add filter for kit name...
+# try:
+# check = not np.isnan(kit_name)
+# except TypeError:
+# check = True
+# if check:
+# kit_type = lookup_kittype_by_name(ctx=ctx, name=kit_name)
+# logger.debug(f"reagenttypes: {[item.name for item in rt_types.all()]}, kit: {kit_type.name}")
+# # add in lookup for related kit_id
+# rt_types = rt_types.join(reagenttypes_kittypes).filter(reagenttypes_kittypes.c.kits_id==kit_type.id).first()
+# else:
+# rt_types = rt_types.first()
+# output = rt_types.instances
+# return output
-def lookup_all_submissions_by_type(ctx:dict, sub_type:str|None=None) -> list[models.BasicSubmission]:
- """
- Get all submissions, filtering by type if given
+# def lookup_all_submissions_by_type(ctx:dict, sub_type:str|None=None) -> list[models.BasicSubmission]:
+# """
+# Get all submissions, filtering by type if given
- Args:
- ctx (dict): settings pass from gui
- type (str | None, optional): submission type (should be string in D3 of excel sheet). Defaults to None.
+# Args:
+# ctx (dict): settings pass from gui
+# type (str | None, optional): submission type (should be string in D3 of excel sheet). Defaults to None.
- Returns:
- list[models.BasicSubmission]: list of retrieved submissions
- """
- if sub_type == None:
- subs = ctx['database_session'].query(models.BasicSubmission).all()
- else:
- subs = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submission_type==sub_type.lower().replace(" ", "_")).all()
- return subs
+# Returns:
+# list[models.BasicSubmission]: list of retrieved submissions
+# """
+# if sub_type == None:
+# subs = ctx['database_session'].query(models.BasicSubmission).all()
+# else:
+# subs = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submission_type==sub_type.lower().replace(" ", "_")).all()
+# return subs
-def lookup_all_orgs(ctx:dict) -> list[models.Organization]:
- """
- Lookup all organizations (labs)
+# def lookup_all_orgs(ctx:dict) -> list[models.Organization]:
+# """
+# Lookup all organizations (labs)
- Args:
- ctx (dict): settings passed from gui
+# Args:
+# ctx (dict): settings passed from gui
- Returns:
- list[models.Organization]: list of retrieved organizations
- """
- return ctx['database_session'].query(models.Organization).all()
+# Returns:
+# list[models.Organization]: list of retrieved organizations
+# """
+# return ctx['database_session'].query(models.Organization).all()
-def lookup_org_by_name(ctx:dict, name:str|None) -> models.Organization:
- """
- Lookup organization (lab) by (startswith) name.
+# def lookup_org_by_name(ctx:dict, name:str|None) -> models.Organization:
+# """
+# Lookup organization (lab) by (startswith) name.
- Args:
- ctx (dict): settings passed from gui
- name (str | None): name of organization
+# Args:
+# ctx (dict): settings passed from gui
+# name (str | None): name of organization
- Returns:
- models.Organization: retrieved organization
- """
- logger.debug(f"Querying organization: {name}")
- return ctx['database_session'].query(models.Organization).filter(models.Organization.name.startswith(name)).first()
+# Returns:
+# models.Organization: retrieved organization
+# """
+# logger.debug(f"Querying organization: {name}")
+# return ctx['database_session'].query(models.Organization).filter(models.Organization.name.startswith(name)).first()
-def submissions_to_df(ctx:dict, sub_type:str|None=None) -> pd.DataFrame:
- """
- Convert submissions looked up by type to dataframe
+# def submissions_to_df(ctx:dict, sub_type:str|None=None) -> pd.DataFrame:
+# """
+# Convert submissions looked up by type to dataframe
- Args:
- ctx (dict): settings passed by gui
- type (str | None, optional): submission type (should be string in D3 of excel sheet) Defaults to None.
+# Args:
+# ctx (dict): settings passed by gui
+# type (str | None, optional): submission type (should be string in D3 of excel sheet) Defaults to None.
- Returns:
- pd.DataFrame: dataframe constructed from retrieved submissions
- """
- logger.debug(f"Type: {sub_type}")
- # use lookup function to create list of dicts
- subs = [item.to_dict() for item in lookup_all_submissions_by_type(ctx=ctx, sub_type=sub_type)]
- # make df from dicts (records) in list
- df = pd.DataFrame.from_records(subs)
- # Exclude sub information
- try:
- df = df.drop("controls", axis=1)
- except:
- logger.warning(f"Couldn't drop 'controls' column from submissionsheet df.")
- try:
- df = df.drop("ext_info", axis=1)
- except:
- logger.warning(f"Couldn't drop 'controls' column from submissionsheet df.")
- return df
+# Returns:
+# pd.DataFrame: dataframe constructed from retrieved submissions
+# """
+# logger.debug(f"Type: {sub_type}")
+# # use lookup function to create list of dicts
+# subs = [item.to_dict() for item in lookup_all_submissions_by_type(ctx=ctx, sub_type=sub_type)]
+# # make df from dicts (records) in list
+# df = pd.DataFrame.from_records(subs)
+# # Exclude sub information
+# try:
+# df = df.drop("controls", axis=1)
+# except:
+# logger.warning(f"Couldn't drop 'controls' column from submissionsheet df.")
+# try:
+# df = df.drop("ext_info", axis=1)
+# except:
+# logger.warning(f"Couldn't drop 'controls' column from submissionsheet df.")
+# return df
-def lookup_submission_by_id(ctx:dict, id:int) -> models.BasicSubmission:
- """
- Lookup submission by id number
+# def lookup_submission_by_id(ctx:dict, id:int) -> models.BasicSubmission:
+# """
+# Lookup submission by id number
- Args:
- ctx (dict): settings passed from gui
- id (int): submission id number
+# Args:
+# ctx (dict): settings passed from gui
+# id (int): submission id number
- Returns:
- models.BasicSubmission: retrieved submission
- """
- return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first()
+# Returns:
+# models.BasicSubmission: retrieved submission
+# """
+# return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first()
-def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_date:datetime.date) -> list[models.BasicSubmission]:
- """
- Lookup submissions greater than start_date and less than end_date
+# def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_date:datetime.date) -> list[models.BasicSubmission]:
+# """
+# Lookup submissions greater than start_date and less than end_date
- Args:
- ctx (dict): settings passed from gui
- start_date (datetime.date): date to start looking
- end_date (datetime.date): date to end looking
+# Args:
+# ctx (dict): settings passed from gui
+# start_date (datetime.date): date to start looking
+# end_date (datetime.date): date to end looking
- Returns:
- list[models.BasicSubmission]: list of retrieved submissions
- """
- # return ctx['database_session'].query(models.BasicSubmission).filter(and_(models.BasicSubmission.submitted_date > start_date, models.BasicSubmission.submitted_date < end_date)).all()
- start_date = start_date.strftime("%Y-%m-%d")
- end_date = end_date.strftime("%Y-%m-%d")
- return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submitted_date.between(start_date, end_date)).all()
+# Returns:
+# list[models.BasicSubmission]: list of retrieved submissions
+# """
+# # return ctx['database_session'].query(models.BasicSubmission).filter(and_(models.BasicSubmission.submitted_date > start_date, models.BasicSubmission.submitted_date < end_date)).all()
+# start_date = start_date.strftime("%Y-%m-%d")
+# end_date = end_date.strftime("%Y-%m-%d")
+# return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submitted_date.between(start_date, end_date)).all()
-def get_all_Control_Types_names(ctx:dict) -> list[str]:
- """
- Grabs all control type names from db.
+# def get_all_Control_Types_names(ctx:dict) -> list[str]:
+# """
+# Grabs all control type names from db.
- Args:
- settings (dict): settings passed down from gui.
+# Args:
+# settings (dict): settings passed down from gui.
- Returns:
- list: list of controltype names
- """
- conTypes = ctx['database_session'].query(models.ControlType).all()
- conTypes = [conType.name for conType in conTypes]
- logger.debug(f"Control Types: {conTypes}")
- return conTypes
+# Returns:
+# list: list of controltype names
+# """
+# conTypes = ctx['database_session'].query(models.ControlType).all()
+# conTypes = [conType.name for conType in conTypes]
+# logger.debug(f"Control Types: {conTypes}")
+# return conTypes
-def create_kit_from_yaml(ctx:dict, exp:dict) -> dict:
- """
- Create and store a new kit in the database based on a .yml file
- TODO: split into create and store functions
+# def create_kit_from_yaml(ctx:dict, exp:dict) -> dict:
+# """
+# Create and store a new kit in the database based on a .yml file
+# TODO: split into create and store functions
- Args:
- ctx (dict): Context dictionary passed down from frontend
- exp (dict): Experiment dictionary created from yaml file
+# Args:
+# ctx (dict): Context dictionary passed down from frontend
+# exp (dict): Experiment dictionary created from yaml file
- Returns:
- dict: a dictionary containing results of db addition
- """
- # Don't want just anyone adding kits
- if not check_is_power_user(ctx=ctx):
- logger.debug(f"{getuser()} does not have permission to add kits.")
- return {'code':1, 'message':"This user does not have permission to add kits.", "status":"warning"}
- # iterate through keys in dict
- for type in exp:
- if type == "password":
- continue
- # A submission type may use multiple kits.
- for kt in exp[type]['kits']:
- kit = models.KitType(name=kt, used_for=[type.replace("_", " ").title()], constant_cost=exp[type]["kits"][kt]["constant_cost"], mutable_cost=exp[type]["kits"][kt]["mutable_cost"])
- # A kit contains multiple reagent types.
- for r in exp[type]['kits'][kt]['reagenttypes']:
- # check if reagent type already exists.
- look_up = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==r).first()
- if look_up == None:
- rt = models.ReagentType(name=r.replace(" ", "_").lower(), eol_ext=timedelta(30*exp[type]['kits'][kt]['reagenttypes'][r]['eol_ext']), kits=[kit])
- else:
- rt = look_up
- rt.kits.append(kit)
- # add this because I think it's necessary to get proper back population
- kit.reagent_types_id.append(rt.id)
- ctx['database_session'].add(rt)
- logger.debug(f"Kit construction reagent type: {rt.__dict__}")
- logger.debug(f"Kit construction kit: {kit.__dict__}")
- ctx['database_session'].add(kit)
- ctx['database_session'].commit()
- return {'code':0, 'message':'Kit has been added', 'status': 'information'}
+# Returns:
+# dict: a dictionary containing results of db addition
+# """
+# # Don't want just anyone adding kits
+# if not check_is_power_user(ctx=ctx):
+# logger.debug(f"{getuser()} does not have permission to add kits.")
+# return {'code':1, 'message':"This user does not have permission to add kits.", "status":"warning"}
+# # iterate through keys in dict
+# for type in exp:
+# if type == "password":
+# continue
+# # A submission type may use multiple kits.
+# for kt in exp[type]['kits']:
+# kit = models.KitType(name=kt, used_for=[type.replace("_", " ").title()], constant_cost=exp[type]["kits"][kt]["constant_cost"], mutable_cost=exp[type]["kits"][kt]["mutable_cost"])
+# # A kit contains multiple reagent types.
+# for r in exp[type]['kits'][kt]['reagenttypes']:
+# # check if reagent type already exists.
+# look_up = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==r).first()
+# if look_up == None:
+# rt = models.ReagentType(name=r.replace(" ", "_").lower(), eol_ext=timedelta(30*exp[type]['kits'][kt]['reagenttypes'][r]['eol_ext']), kits=[kit])
+# else:
+# rt = look_up
+# rt.kits.append(kit)
+# # add this because I think it's necessary to get proper back population
+# kit.reagent_types_id.append(rt.id)
+# ctx['database_session'].add(rt)
+# logger.debug(f"Kit construction reagent type: {rt.__dict__}")
+# logger.debug(f"Kit construction kit: {kit.__dict__}")
+# ctx['database_session'].add(kit)
+# ctx['database_session'].commit()
+# return {'code':0, 'message':'Kit has been added', 'status': 'information'}
-def create_org_from_yaml(ctx:dict, org:dict) -> dict:
- """
- Create and store a new organization based on a .yml file
+# def create_org_from_yaml(ctx:dict, org:dict) -> dict:
+# """
+# Create and store a new organization based on a .yml file
- Args:
- ctx (dict): Context dictionary passed down from frontend
- org (dict): Dictionary containing organization info.
+# Args:
+# ctx (dict): Context dictionary passed down from frontend
+# org (dict): Dictionary containing organization info.
- Returns:
- dict: dictionary containing results of db addition
- """
- # Don't want just anyone adding in clients
- if not check_is_power_user(ctx=ctx):
- logger.debug(f"{getuser()} does not have permission to add kits.")
- return {'code':1, 'message':"This user does not have permission to add organizations."}
- # the yml can contain multiple clients
- for client in org:
- cli_org = models.Organization(name=client.replace(" ", "_").lower(), cost_centre=org[client]['cost centre'])
- # a client can contain multiple contacts
- for contact in org[client]['contacts']:
- cont_name = list(contact.keys())[0]
- # check if contact already exists
- look_up = ctx['database_session'].query(models.Contact).filter(models.Contact.name==cont_name).first()
- if look_up == None:
- cli_cont = models.Contact(name=cont_name, phone=contact[cont_name]['phone'], email=contact[cont_name]['email'], organization=[cli_org])
- else:
- cli_cont = look_up
- cli_cont.organization.append(cli_org)
- ctx['database_session'].add(cli_cont)
- logger.debug(f"Client creation contact: {cli_cont.__dict__}")
- logger.debug(f"Client creation client: {cli_org.__dict__}")
- ctx['database_session'].add(cli_org)
- ctx["database_session"].commit()
- return {"code":0, "message":"Organization has been added."}
+# Returns:
+# dict: dictionary containing results of db addition
+# """
+# # Don't want just anyone adding in clients
+# if not check_is_power_user(ctx=ctx):
+# logger.debug(f"{getuser()} does not have permission to add kits.")
+# return {'code':1, 'message':"This user does not have permission to add organizations."}
+# # the yml can contain multiple clients
+# for client in org:
+# cli_org = models.Organization(name=client.replace(" ", "_").lower(), cost_centre=org[client]['cost centre'])
+# # a client can contain multiple contacts
+# for contact in org[client]['contacts']:
+# cont_name = list(contact.keys())[0]
+# # check if contact already exists
+# look_up = ctx['database_session'].query(models.Contact).filter(models.Contact.name==cont_name).first()
+# if look_up == None:
+# cli_cont = models.Contact(name=cont_name, phone=contact[cont_name]['phone'], email=contact[cont_name]['email'], organization=[cli_org])
+# else:
+# cli_cont = look_up
+# cli_cont.organization.append(cli_org)
+# ctx['database_session'].add(cli_cont)
+# logger.debug(f"Client creation contact: {cli_cont.__dict__}")
+# logger.debug(f"Client creation client: {cli_org.__dict__}")
+# ctx['database_session'].add(cli_org)
+# ctx["database_session"].commit()
+# return {"code":0, "message":"Organization has been added."}
-def lookup_all_sample_types(ctx:dict) -> list[str]:
- """
- Lookup all sample types and get names
+# def lookup_all_sample_types(ctx:dict) -> list[str]:
+# """
+# Lookup all sample types and get names
- Args:
- ctx (dict): settings pass from gui
+# Args:
+# ctx (dict): settings pass from gui
- Returns:
- list[str]: list of sample type names
- """
- uses = [item.used_for for item in ctx['database_session'].query(models.KitType).all()]
- # flattened list of lists
- uses = list(set([item for sublist in uses for item in sublist]))
- return uses
+# Returns:
+# list[str]: list of sample type names
+# """
+# uses = [item.used_for for item in ctx['database_session'].query(models.KitType).all()]
+# # flattened list of lists
+# uses = list(set([item for sublist in uses for item in sublist]))
+# return uses
-def get_all_available_modes(ctx:dict) -> list[str]:
- """
- Get types of analysis for controls
+# def get_all_available_modes(ctx:dict) -> list[str]:
+# """
+# Get types of analysis for controls
- Args:
- ctx (dict): settings passed from gui
+# Args:
+# ctx (dict): settings passed from gui
- Returns:
- list[str]: list of analysis types
- """
- # Only one control is necessary since they all share the same control types.
- rel = ctx['database_session'].query(models.Control).first()
- try:
- cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)]
- except AttributeError as e:
- logger.debug(f"Failed to get available modes from db: {e}")
- cols = []
- return cols
+# Returns:
+# list[str]: list of analysis types
+# """
+# # Only one control is necessary since they all share the same control types.
+# rel = ctx['database_session'].query(models.Control).first()
+# try:
+# cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)]
+# except AttributeError as e:
+# logger.debug(f"Failed to get available modes from db: {e}")
+# cols = []
+# return cols
-def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None, end_date:date|None=None) -> list[models.Control]:
- """
- Returns a list of control objects that are instances of the input controltype.
- Between dates if supplied.
+# def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None, end_date:date|None=None) -> list[models.Control]:
+# """
+# Returns a list of control objects that are instances of the input controltype.
+# Between dates if supplied.
- Args:
- ctx (dict): Settings passed down from gui
- con_type (str): Name of control type.
- start_date (date | None, optional): Start date of query. Defaults to None.
- end_date (date | None, optional): End date of query. Defaults to None.
+# Args:
+# ctx (dict): Settings passed down from gui
+# con_type (str): Name of control type.
+# start_date (date | None, optional): Start date of query. Defaults to None.
+# end_date (date | None, optional): End date of query. Defaults to None.
- Returns:
- list[models.Control]: list of control samples.
- """
- logger.debug(f"Using dates: {start_date} to {end_date}")
- if start_date != None and end_date != None:
- start_date = start_date.strftime("%Y-%m-%d")
- end_date = end_date.strftime("%Y-%m-%d")
- output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).filter(models.Control.submitted_date.between(start_date, end_date)).all()
- else:
- output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).all()
- logger.debug(f"Returned controls between dates: {output}")
- return output
+# Returns:
+# list[models.Control]: list of control samples.
+# """
+# logger.debug(f"Using dates: {start_date} to {end_date}")
+# if start_date != None and end_date != None:
+# start_date = start_date.strftime("%Y-%m-%d")
+# end_date = end_date.strftime("%Y-%m-%d")
+# output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).filter(models.Control.submitted_date.between(start_date, end_date)).all()
+# else:
+# output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).all()
+# logger.debug(f"Returned controls between dates: {output}")
+# return output
-def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]:
- """
- Get subtypes for a control analysis mode
+# def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]:
+# """
+# Get subtypes for a control analysis mode
- Args:
- ctx (dict): settings passed from gui
- type (str): control type name
- mode (str): analysis mode name
+# Args:
+# ctx (dict): settings passed from gui
+# type (str): control type name
+# mode (str): analysis mode name
- Returns:
- list[str]: list of subtype names
- """
- # Only the first control of type is necessary since they all share subtypes
- try:
- outs = get_all_controls_by_type(ctx=ctx, con_type=type)[0]
- except TypeError:
- return []
- # Get analysis mode data as dict
- jsoner = json.loads(getattr(outs, mode))
- logger.debug(f"JSON out: {jsoner}")
- try:
- genera = list(jsoner.keys())[0]
- except IndexError:
- return []
- subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item]
- return subtypes
+# Returns:
+# list[str]: list of subtype names
+# """
+# # Only the first control of type is necessary since they all share subtypes
+# try:
+# outs = get_all_controls_by_type(ctx=ctx, con_type=type)[0]
+# except TypeError:
+# return []
+# # Get analysis mode data as dict
+# jsoner = json.loads(getattr(outs, mode))
+# logger.debug(f"JSON out: {jsoner}")
+# try:
+# genera = list(jsoner.keys())[0]
+# except IndexError:
+# return []
+# subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item]
+# return subtypes
-def get_all_controls(ctx:dict) -> list[models.Control]:
- """
- Retrieve a list of all controls from the database
+# def get_all_controls(ctx:dict) -> list[models.Control]:
+# """
+# Retrieve a list of all controls from the database
- Args:
- ctx (dict): settings passed down from the gui.
+# Args:
+# ctx (dict): settings passed down from the gui.
- Returns:
- list[models.Control]: list of all control objects
- """
- return ctx['database_session'].query(models.Control).all()
+# Returns:
+# list[models.Control]: list of all control objects
+# """
+# return ctx['database_session'].query(models.Control).all()
-def lookup_submission_by_rsl_num(ctx:dict, rsl_num:str) -> models.BasicSubmission:
- """
- Retrieve a submission from the database based on rsl plate number
+# def lookup_submission_by_rsl_num(ctx:dict, rsl_num:str) -> models.BasicSubmission:
+# """
+# Retrieve a submission from the database based on rsl plate number
- Args:
- ctx (dict): settings passed down from gui
- rsl_num (str): rsl plate number
+# Args:
+# ctx (dict): settings passed down from gui
+# rsl_num (str): rsl plate number
- Returns:
- models.BasicSubmission: Submissions object retrieved from database
- """
- return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num.startswith(rsl_num)).first()
+# Returns:
+# models.BasicSubmission: Submissions object retrieved from database
+# """
+# return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num.startswith(rsl_num)).first()
-def lookup_submissions_using_reagent(ctx:dict, reagent:models.Reagent) -> list[models.BasicSubmission]:
- return ctx['database_session'].query(models.BasicSubmission).join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id).all()
+# def lookup_submissions_using_reagent(ctx:dict, reagent:models.Reagent) -> list[models.BasicSubmission]:
+# return ctx['database_session'].query(models.BasicSubmission).join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id).all()
-def delete_submission_by_id(ctx:dict, id:int) -> None:
- """
- Deletes a submission and its associated samples from the database.
+# def delete_submission_by_id(ctx:dict, id:int) -> None:
+# """
+# Deletes a submission and its associated samples from the database.
- Args:
- ctx (dict): settings passed down from gui
- id (int): id of submission to be deleted.
- """
- # In order to properly do this Im' going to have to delete all of the secondary table stuff as well.
- # Retrieve submission
- sub = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first()
- # Convert to dict for storing backup as a yml
- backup = sub.to_dict()
- try:
- with open(Path(ctx['backup_path']).joinpath(f"{sub.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f:
- yaml.dump(backup, f)
- except KeyError:
- pass
- sub.reagents = []
- for sample in sub.samples:
- ctx['database_session'].delete(sample)
- ctx["database_session"].delete(sub)
- ctx["database_session"].commit()
\ No newline at end of file
+# Args:
+# ctx (dict): settings passed down from gui
+# id (int): id of submission to be deleted.
+# """
+# # In order to properly do this Im' going to have to delete all of the secondary table stuff as well.
+# # Retrieve submission
+# sub = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first()
+# # Convert to dict for storing backup as a yml
+# backup = sub.to_dict()
+# try:
+# with open(Path(ctx['backup_path']).joinpath(f"{sub.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f:
+# yaml.dump(backup, f)
+# except KeyError:
+# pass
+# sub.reagents = []
+# for sample in sub.samples:
+# ctx['database_session'].delete(sample)
+# ctx["database_session"].delete(sub)
+# ctx["database_session"].commit()
\ No newline at end of file
diff --git a/src/submissions/backend/db/functions.py b/src/submissions/backend/db/functions.py
new file mode 100644
index 0000000..7d6b105
--- /dev/null
+++ b/src/submissions/backend/db/functions.py
@@ -0,0 +1,669 @@
+'''
+Convenience functions for interacting with the database.
+'''
+
+from . import models
+from .models.kits import reagenttypes_kittypes
+from .models.submissions import reagents_submissions
+import pandas as pd
+import sqlalchemy.exc
+import sqlite3
+import logging
+from datetime import date, datetime, timedelta
+from sqlalchemy import and_
+import uuid
+from sqlalchemy import JSON, event
+from sqlalchemy.engine import Engine
+import json
+from getpass import getuser
+import numpy as np
+
+import yaml
+from pathlib import Path
+
+logger = logging.getLogger(f"submissions.{__name__}")
+
+# The below _should_ allow automatic creation of foreign keys in the database
+@event.listens_for(Engine, "connect")
+def set_sqlite_pragma(dbapi_connection, connection_record):
+ cursor = dbapi_connection.cursor()
+ cursor.execute("PRAGMA foreign_keys=ON")
+ cursor.close()
+
+def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|dict:
+ """
+ Upserts submissions into database
+
+ Args:
+ ctx (dict): settings passed down from gui
+ base_submission (models.BasicSubmission): submission to be add to db
+
+ Returns:
+ None|dict : object that indicates issue raised for reporting in gui
+ """
+ logger.debug(f"Hello from store_submission")
+ # Add all samples to sample table
+ for sample in base_submission.samples:
+ sample.rsl_plate = base_submission
+ logger.debug(f"Attempting to add sample: {sample.to_string()}")
+ try:
+ ctx['database_session'].add(sample)
+ except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
+ logger.debug(f"Hit an integrity error : {e}")
+ continue
+ # Add submission to submission table
+ ctx['database_session'].add(base_submission)
+ logger.debug(f"Attempting to add submission: {base_submission.rsl_plate_num}")
+ try:
+ ctx['database_session'].commit()
+ except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
+ logger.debug(f"Hit an integrity error : {e}")
+ ctx['database_session'].rollback()
+ return {"message":"This plate number already exists, so we can't add it."}
+ except (sqlite3.OperationalError, sqlalchemy.exc.IntegrityError) as e:
+ logger.debug(f"Hit an operational error: {e}")
+ ctx['database_session'].rollback()
+ return {"message":"The database is locked for editing."}
+ return None
+
+
+def store_reagent(ctx:dict, reagent:models.Reagent) -> None|dict:
+ """
+ Inserts a reagent into the database.
+
+ Args:
+ ctx (dict): settings passed down from gui
+ reagent (models.Reagent): Reagent object to be added to db
+
+ Returns:
+ None|dict: object indicating issue to be reported in the gui
+ """
+ logger.debug(f"Reagent dictionary: {reagent.__dict__}")
+ ctx['database_session'].add(reagent)
+ try:
+ ctx['database_session'].commit()
+ except (sqlite3.OperationalError, sqlalchemy.exc.OperationalError):
+ return {"message":"The database is locked for editing."}
+ return None
+
+
+def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmission:
+ """
+ Construct submission object from dictionary
+
+ Args:
+ ctx (dict): settings passed down from gui
+ info_dict (dict): dictionary to be transformed
+
+ Returns:
+ models.BasicSubmission: Constructed submission object
+ """
+ from tools import check_not_nan
+ # convert submission type into model name
+ query = info_dict['submission_type'].replace(" ", "")
+ # Ensure an rsl plate number exists for the plate
+ if info_dict["rsl_plate_num"] == 'nan' or info_dict["rsl_plate_num"] == None or not check_not_nan(info_dict["rsl_plate_num"]):
+ instance = None
+ msg = "A proper RSL plate number is required."
+ return instance, {'code': 2, 'message': "A proper RSL plate number is required."}
+ # check database for existing object
+ instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first()
+ # get model based on submission type converted above
+ logger.debug(f"Looking at models for submission type: {query}")
+ model = getattr(models, query)
+ logger.debug(f"We've got the model: {type(model)}")
+ info_dict['submission_type'] = info_dict['submission_type'].replace(" ", "_").lower()
+ # if query return nothing, ie doesn't already exist in db
+ if instance == None:
+ instance = model()
+ logger.debug(f"Submission doesn't exist yet, creating new instance: {instance}")
+ msg = None
+ code = 0
+ else:
+ code = 1
+ msg = "This submission already exists.\nWould you like to overwrite?"
+ for item in info_dict:
+ logger.debug(f"Setting {item} to {info_dict[item]}")
+ # set fields based on keys in dictionary
+ match item:
+ case "extraction_kit":
+ q_str = info_dict[item]
+ logger.debug(f"Looking up kit {q_str}")
+ try:
+ field_value = lookup_kittype_by_name(ctx=ctx, name=q_str)
+ except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
+ logger.error(f"Hit an integrity error: {e}")
+ logger.debug(f"Got {field_value} for kit {q_str}")
+ case "submitting_lab":
+ q_str = info_dict[item].replace(" ", "_").lower()
+ logger.debug(f"Looking up organization: {q_str}")
+ field_value = lookup_org_by_name(ctx=ctx, name=q_str)
+ logger.debug(f"Got {field_value} for organization {q_str}")
+ case "submitter_plate_num":
+ # Because of unique constraint, there will be problems with
+ # multiple submissions named 'None', so...
+ logger.debug(f"Submitter plate id: {info_dict[item]}")
+ if info_dict[item] == None or info_dict[item] == "None":
+ logger.debug(f"Got None as a submitter plate number, inserting random string to preserve database unique constraint.")
+ info_dict[item] = uuid.uuid4().hex.upper()
+ field_value = info_dict[item]
+ case _:
+ field_value = info_dict[item]
+ # insert into field
+ try:
+ setattr(instance, item, field_value)
+ except AttributeError:
+ logger.debug(f"Could not set attribute: {item} to {info_dict[item]}")
+ continue
+ # calculate cost of the run: immutable cost + mutable times number of columns
+ # This is now attached to submission upon creation to preserve at-run costs incase of cost increase in the future.
+ try:
+ instance.run_cost = instance.extraction_kit.immutable_cost + (instance.extraction_kit.mutable_cost * ((instance.sample_count / 8)/12))
+ except (TypeError, AttributeError):
+ logger.debug(f"Looks like that kit doesn't have cost breakdown yet, using full plate cost.")
+ instance.run_cost = instance.extraction_kit.cost_per_run
+ # We need to make sure there's a proper rsl plate number
+ try:
+ logger.debug(f"Constructed instance: {instance.to_string()}")
+ except AttributeError as e:
+ logger.debug(f"Something went wrong constructing instance {info_dict['rsl_plate_num']}: {e}")
+ logger.debug(f"Constructed submissions message: {msg}")
+ return instance, {'code':code, 'message':msg}
+
+
+def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent:
+ """
+ Construct reagent object from dictionary
+
+ Args:
+ ctx (dict): settings passed down from gui
+ info_dict (dict): dictionary to be converted
+
+ Returns:
+ models.Reagent: Constructed reagent object
+ """
+ reagent = models.Reagent()
+ for item in info_dict:
+ logger.debug(f"Reagent info item: {item}")
+ # set fields based on keys in dictionary
+ match item:
+ case "lot":
+ reagent.lot = info_dict[item].upper()
+ case "expiry":
+ reagent.expiry = info_dict[item]
+ case "type":
+ reagent.type = lookup_reagenttype_by_name(ctx=ctx, rt_name=info_dict[item].replace(" ", "_").lower())
+ # add end-of-life extension from reagent type to expiry date
+ # NOTE: this will now be done only in the reporting phase to account for potential changes in end-of-life extensions
+ # try:
+ # reagent.expiry = reagent.expiry + reagent.type.eol_ext
+ # except TypeError as e:
+ # logger.debug(f"We got a type error: {e}.")
+ # except AttributeError:
+ # pass
+ return reagent
+
+
+def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent:
+ """
+ Query db for reagent based on lot number
+
+ Args:
+ ctx (dict): settings passed down from gui
+ reagent_lot (str): lot number to query
+
+ Returns:
+ models.Reagent: looked up reagent
+ """
+ lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
+ return lookedup
+
+
+def get_all_reagenttype_names(ctx:dict) -> list[str]:
+ """
+ Lookup all reagent types and get names
+
+ Args:
+ ctx (dict): settings passed from gui
+
+ Returns:
+ list[str]: reagent type names
+ """
+ lookedup = [item.__str__() for item in ctx['database_session'].query(models.ReagentType).all()]
+ return lookedup
+
+
+def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType:
+ """
+ Lookup a single reagent type by name
+
+ Args:
+ ctx (dict): settings passed from gui
+ rt_name (str): reagent type name to look up
+
+ Returns:
+ models.ReagentType: looked up reagent type
+ """
+ logger.debug(f"Looking up ReagentType by name: {rt_name}")
+ lookedup = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==rt_name).first()
+ logger.debug(f"Found ReagentType: {lookedup}")
+ return lookedup
+
+
+def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]:
+ """
+ Lookup kits by a sample type its used for
+
+ Args:
+ ctx (dict): settings passed from gui
+ used_by (str): sample type (should be string in D3 of excel sheet)
+
+ Returns:
+ list[models.KitType]: list of kittypes that have that sample type in their uses
+ """
+ return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all()
+
+
+def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType:
+ """
+ Lookup a kit type by name
+
+ Args:
+ ctx (dict): settings passed from bui
+ name (str): name of kit to query
+
+ Returns:
+ models.KitType: retrieved kittype
+ """
+ logger.debug(f"Querying kittype: {name}")
+ return ctx['database_session'].query(models.KitType).filter(models.KitType.name==name).first()
+
+
+def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.Reagent]:
+ """
+ Lookup reagents by their type name
+
+ Args:
+ ctx (dict): settings passed from gui
+ type_name (str): reagent type name
+
+ Returns:
+ list[models.Reagent]: list of retrieved reagents
+ """
+ return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all()
+
+
+def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:str) -> list[models.Reagent]:
+ """
+ Lookup reagents by their type name and kits they belong to (Broken... maybe cursed, I'm not sure.)
+
+ Args:
+ ctx (dict): settings pass by gui
+ type_name (str): reagent type name
+ kit_name (str): kit name
+
+ Returns:
+ list[models.Reagent]: list of retrieved reagents
+ """
+ # What I want to do is get the reagent type by name
+ # Hang on, this is going to be a long one.
+ # by_type = ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name.endswith(type_name)).all()
+ rt_types = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name.endswith(type_name))
+ # add filter for kit name...
+ try:
+ check = not np.isnan(kit_name)
+ except TypeError:
+ check = True
+ if check:
+ kit_type = lookup_kittype_by_name(ctx=ctx, name=kit_name)
+ logger.debug(f"reagenttypes: {[item.name for item in rt_types.all()]}, kit: {kit_type.name}")
+ # add in lookup for related kit_id
+ rt_types = rt_types.join(reagenttypes_kittypes).filter(reagenttypes_kittypes.c.kits_id==kit_type.id).first()
+ else:
+ rt_types = rt_types.first()
+ output = rt_types.instances
+ return output
+
+
+def lookup_all_submissions_by_type(ctx:dict, sub_type:str|None=None) -> list[models.BasicSubmission]:
+ """
+ Get all submissions, filtering by type if given
+
+ Args:
+ ctx (dict): settings pass from gui
+ type (str | None, optional): submission type (should be string in D3 of excel sheet). Defaults to None.
+
+ Returns:
+ list[models.BasicSubmission]: list of retrieved submissions
+ """
+ if sub_type == None:
+ subs = ctx['database_session'].query(models.BasicSubmission).all()
+ else:
+ subs = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submission_type==sub_type.lower().replace(" ", "_")).all()
+ return subs
+
+def lookup_all_orgs(ctx:dict) -> list[models.Organization]:
+ """
+ Lookup all organizations (labs)
+
+ Args:
+ ctx (dict): settings passed from gui
+
+ Returns:
+ list[models.Organization]: list of retrieved organizations
+ """
+ return ctx['database_session'].query(models.Organization).all()
+
+def lookup_org_by_name(ctx:dict, name:str|None) -> models.Organization:
+ """
+ Lookup organization (lab) by (startswith) name.
+
+ Args:
+ ctx (dict): settings passed from gui
+ name (str | None): name of organization
+
+ Returns:
+ models.Organization: retrieved organization
+ """
+ logger.debug(f"Querying organization: {name}")
+ return ctx['database_session'].query(models.Organization).filter(models.Organization.name.startswith(name)).first()
+
+def submissions_to_df(ctx:dict, sub_type:str|None=None) -> pd.DataFrame:
+ """
+ Convert submissions looked up by type to dataframe
+
+ Args:
+ ctx (dict): settings passed by gui
+ type (str | None, optional): submission type (should be string in D3 of excel sheet) Defaults to None.
+
+ Returns:
+ pd.DataFrame: dataframe constructed from retrieved submissions
+ """
+ logger.debug(f"Type: {sub_type}")
+ # use lookup function to create list of dicts
+ subs = [item.to_dict() for item in lookup_all_submissions_by_type(ctx=ctx, sub_type=sub_type)]
+ # make df from dicts (records) in list
+ df = pd.DataFrame.from_records(subs)
+ # Exclude sub information
+ try:
+ df = df.drop("controls", axis=1)
+ except:
+ logger.warning(f"Couldn't drop 'controls' column from submissionsheet df.")
+ try:
+ df = df.drop("ext_info", axis=1)
+ except:
+ logger.warning(f"Couldn't drop 'controls' column from submissionsheet df.")
+ return df
+
+
+def lookup_submission_by_id(ctx:dict, id:int) -> models.BasicSubmission:
+ """
+ Lookup submission by id number
+
+ Args:
+ ctx (dict): settings passed from gui
+ id (int): submission id number
+
+ Returns:
+ models.BasicSubmission: retrieved submission
+ """
+ return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first()
+
+
+def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_date:datetime.date) -> list[models.BasicSubmission]:
+ """
+ Lookup submissions greater than start_date and less than end_date
+
+ Args:
+ ctx (dict): settings passed from gui
+ start_date (datetime.date): date to start looking
+ end_date (datetime.date): date to end looking
+
+ Returns:
+ list[models.BasicSubmission]: list of retrieved submissions
+ """
+ # return ctx['database_session'].query(models.BasicSubmission).filter(and_(models.BasicSubmission.submitted_date > start_date, models.BasicSubmission.submitted_date < end_date)).all()
+ start_date = start_date.strftime("%Y-%m-%d")
+ end_date = end_date.strftime("%Y-%m-%d")
+ return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submitted_date.between(start_date, end_date)).all()
+
+
+def get_all_Control_Types_names(ctx:dict) -> list[str]:
+ """
+ Grabs all control type names from db.
+
+ Args:
+ settings (dict): settings passed down from gui.
+
+ Returns:
+ list: list of controltype names
+ """
+ conTypes = ctx['database_session'].query(models.ControlType).all()
+ conTypes = [conType.name for conType in conTypes]
+ logger.debug(f"Control Types: {conTypes}")
+ return conTypes
+
+
+def create_kit_from_yaml(ctx:dict, exp:dict) -> dict:
+ """
+ Create and store a new kit in the database based on a .yml file
+ TODO: split into create and store functions
+
+ Args:
+ ctx (dict): Context dictionary passed down from frontend
+ exp (dict): Experiment dictionary created from yaml file
+
+ Returns:
+ dict: a dictionary containing results of db addition
+ """
+ from tools import check_is_power_user
+ # Don't want just anyone adding kits
+ if not check_is_power_user(ctx=ctx):
+ logger.debug(f"{getuser()} does not have permission to add kits.")
+ return {'code':1, 'message':"This user does not have permission to add kits.", "status":"warning"}
+ # iterate through keys in dict
+ for type in exp:
+ if type == "password":
+ continue
+ # A submission type may use multiple kits.
+ for kt in exp[type]['kits']:
+ kit = models.KitType(name=kt, used_for=[type.replace("_", " ").title()], constant_cost=exp[type]["kits"][kt]["constant_cost"], mutable_cost=exp[type]["kits"][kt]["mutable_cost"])
+ # A kit contains multiple reagent types.
+ for r in exp[type]['kits'][kt]['reagenttypes']:
+ # check if reagent type already exists.
+ look_up = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==r).first()
+ if look_up == None:
+ rt = models.ReagentType(name=r.replace(" ", "_").lower(), eol_ext=timedelta(30*exp[type]['kits'][kt]['reagenttypes'][r]['eol_ext']), kits=[kit])
+ else:
+ rt = look_up
+ rt.kits.append(kit)
+ # add this because I think it's necessary to get proper back population
+ kit.reagent_types_id.append(rt.id)
+ ctx['database_session'].add(rt)
+ logger.debug(f"Kit construction reagent type: {rt.__dict__}")
+ logger.debug(f"Kit construction kit: {kit.__dict__}")
+ ctx['database_session'].add(kit)
+ ctx['database_session'].commit()
+ return {'code':0, 'message':'Kit has been added', 'status': 'information'}
+
+
+def create_org_from_yaml(ctx:dict, org:dict) -> dict:
+ """
+ Create and store a new organization based on a .yml file
+
+ Args:
+ ctx (dict): Context dictionary passed down from frontend
+ org (dict): Dictionary containing organization info.
+
+ Returns:
+ dict: dictionary containing results of db addition
+ """
+ from tools import check_is_power_user
+ # Don't want just anyone adding in clients
+ if not check_is_power_user(ctx=ctx):
+ logger.debug(f"{getuser()} does not have permission to add kits.")
+ return {'code':1, 'message':"This user does not have permission to add organizations."}
+ # the yml can contain multiple clients
+ for client in org:
+ cli_org = models.Organization(name=client.replace(" ", "_").lower(), cost_centre=org[client]['cost centre'])
+ # a client can contain multiple contacts
+ for contact in org[client]['contacts']:
+ cont_name = list(contact.keys())[0]
+ # check if contact already exists
+ look_up = ctx['database_session'].query(models.Contact).filter(models.Contact.name==cont_name).first()
+ if look_up == None:
+ cli_cont = models.Contact(name=cont_name, phone=contact[cont_name]['phone'], email=contact[cont_name]['email'], organization=[cli_org])
+ else:
+ cli_cont = look_up
+ cli_cont.organization.append(cli_org)
+ ctx['database_session'].add(cli_cont)
+ logger.debug(f"Client creation contact: {cli_cont.__dict__}")
+ logger.debug(f"Client creation client: {cli_org.__dict__}")
+ ctx['database_session'].add(cli_org)
+ ctx["database_session"].commit()
+ return {"code":0, "message":"Organization has been added."}
+
+
+def lookup_all_sample_types(ctx:dict) -> list[str]:
+ """
+ Lookup all sample types and get names
+
+ Args:
+ ctx (dict): settings pass from gui
+
+ Returns:
+ list[str]: list of sample type names
+ """
+ uses = [item.used_for for item in ctx['database_session'].query(models.KitType).all()]
+ # flattened list of lists
+ uses = list(set([item for sublist in uses for item in sublist]))
+ return uses
+
+
+def get_all_available_modes(ctx:dict) -> list[str]:
+ """
+ Get types of analysis for controls
+
+ Args:
+ ctx (dict): settings passed from gui
+
+ Returns:
+ list[str]: list of analysis types
+ """
+ # Only one control is necessary since they all share the same control types.
+ rel = ctx['database_session'].query(models.Control).first()
+ try:
+ cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)]
+ except AttributeError as e:
+ logger.debug(f"Failed to get available modes from db: {e}")
+ cols = []
+ return cols
+
+
+def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None, end_date:date|None=None) -> list[models.Control]:
+ """
+ Returns a list of control objects that are instances of the input controltype.
+ Between dates if supplied.
+
+ Args:
+ ctx (dict): Settings passed down from gui
+ con_type (str): Name of control type.
+ start_date (date | None, optional): Start date of query. Defaults to None.
+ end_date (date | None, optional): End date of query. Defaults to None.
+
+ Returns:
+ list[models.Control]: list of control samples.
+ """
+ logger.debug(f"Using dates: {start_date} to {end_date}")
+ if start_date != None and end_date != None:
+ start_date = start_date.strftime("%Y-%m-%d")
+ end_date = end_date.strftime("%Y-%m-%d")
+ output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).filter(models.Control.submitted_date.between(start_date, end_date)).all()
+ else:
+ output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).all()
+ logger.debug(f"Returned controls between dates: {output}")
+ return output
+
+
+def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]:
+ """
+ Get subtypes for a control analysis mode
+
+ Args:
+ ctx (dict): settings passed from gui
+ type (str): control type name
+ mode (str): analysis mode name
+
+ Returns:
+ list[str]: list of subtype names
+ """
+ # Only the first control of type is necessary since they all share subtypes
+ try:
+ outs = get_all_controls_by_type(ctx=ctx, con_type=type)[0]
+ except TypeError:
+ return []
+ # Get analysis mode data as dict
+ jsoner = json.loads(getattr(outs, mode))
+ logger.debug(f"JSON out: {jsoner}")
+ try:
+ genera = list(jsoner.keys())[0]
+ except IndexError:
+ return []
+ subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item]
+ return subtypes
+
+
+def get_all_controls(ctx:dict) -> list[models.Control]:
+ """
+ Retrieve a list of all controls from the database
+
+ Args:
+ ctx (dict): settings passed down from the gui.
+
+ Returns:
+ list[models.Control]: list of all control objects
+ """
+ return ctx['database_session'].query(models.Control).all()
+
+
+def lookup_submission_by_rsl_num(ctx:dict, rsl_num:str) -> models.BasicSubmission:
+ """
+ Retrieve a submission from the database based on rsl plate number
+
+ Args:
+ ctx (dict): settings passed down from gui
+ rsl_num (str): rsl plate number
+
+ Returns:
+ models.BasicSubmission: Submissions object retrieved from database
+ """
+ return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num.startswith(rsl_num)).first()
+
+
+def lookup_submissions_using_reagent(ctx:dict, reagent:models.Reagent) -> list[models.BasicSubmission]:
+ return ctx['database_session'].query(models.BasicSubmission).join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id).all()
+
+
+def delete_submission_by_id(ctx:dict, id:int) -> None:
+ """
+ Deletes a submission and its associated samples from the database.
+
+ Args:
+ ctx (dict): settings passed down from gui
+ id (int): id of submission to be deleted.
+ """
+ # In order to properly do this Im' going to have to delete all of the secondary table stuff as well.
+ # Retrieve submission
+ sub = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first()
+ # Convert to dict for storing backup as a yml
+ backup = sub.to_dict()
+ try:
+ with open(Path(ctx['backup_path']).joinpath(f"{sub.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f:
+ yaml.dump(backup, f)
+ except KeyError:
+ pass
+ sub.reagents = []
+ for sample in sub.samples:
+ ctx['database_session'].delete(sample)
+ ctx["database_session"].delete(sub)
+ ctx["database_session"].commit()
\ No newline at end of file
diff --git a/src/submissions/backend/db/functions/__init__.py b/src/submissions/backend/db/functions/__init__.py
deleted file mode 100644
index ad23b3a..0000000
--- a/src/submissions/backend/db/functions/__init__.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# from ..models import *
-# import logging
-
-# logger = logging.getLogger(f"submissions.{__name__}")
-
-# def check_kit_integrity(sub:BasicSubmission):
-# ext_kit_rtypes = [reagenttype.name for reagenttype in sub.extraction_kit.reagent_types]
-# logger.debug(f"Kit reagents: {ext_kit_rtypes}")
-# reagenttypes = [reagent.type.name for reagent in sub.reagents]
-# logger.debug(f"Submission reagents: {reagenttypes}")
-# check = set(ext_kit_rtypes) == set(reagenttypes)
-# logger.debug(f"Checking if reagents match kit contents: {check}")
-# common = list(set(ext_kit_rtypes).intersection(reagenttypes))
-# logger.debug(f"common reagents types: {common}")
-# if check:
-# result = None
-# else:
-# result = {'message' : f"Couldn't verify reagents match listed kit components.\n\nIt looks like you are missing: {[x.upper for x in ext_kit_rtypes if x not in common]}\n\nAlternatively, you may have set the wrong extraction kit."}
-# return result
-
-
diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py
index f8c9d7f..c3ca906 100644
--- a/src/submissions/backend/db/models/__init__.py
+++ b/src/submissions/backend/db/models/__init__.py
@@ -1,5 +1,7 @@
+'''
+Contains all models for sqlalchemy
+'''
from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import relationship
Base = declarative_base()
metadata = Base.metadata
diff --git a/src/submissions/backend/db/models/controls.py b/src/submissions/backend/db/models/controls.py
index 5da0191..3dfd7aa 100644
--- a/src/submissions/backend/db/models/controls.py
+++ b/src/submissions/backend/db/models/controls.py
@@ -1,3 +1,6 @@
+'''
+All control related models.
+'''
from . import Base
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey
from sqlalchemy.orm import relationship
diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py
index a9a0e20..bae23dc 100644
--- a/src/submissions/backend/db/models/kits.py
+++ b/src/submissions/backend/db/models/kits.py
@@ -1,4 +1,6 @@
-from copy import deepcopy
+'''
+All kit and reagent related models
+'''
from . import Base
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT
from sqlalchemy.orm import relationship
diff --git a/src/submissions/backend/db/models/organizations.py b/src/submissions/backend/db/models/organizations.py
index 75acf27..e857e87 100644
--- a/src/submissions/backend/db/models/organizations.py
+++ b/src/submissions/backend/db/models/organizations.py
@@ -1,3 +1,6 @@
+'''
+All client organization related models.
+'''
from . import Base
from sqlalchemy import Column, String, INTEGER, ForeignKey, Table
from sqlalchemy.orm import relationship
diff --git a/src/submissions/backend/db/models/samples.py b/src/submissions/backend/db/models/samples.py
index b2a3a95..e00cd8e 100644
--- a/src/submissions/backend/db/models/samples.py
+++ b/src/submissions/backend/db/models/samples.py
@@ -1,3 +1,6 @@
+'''
+All models for individual samples.
+'''
from . import Base
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, FLOAT, BOOLEAN
from sqlalchemy.orm import relationship
diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py
index 45c4299..8a0f270 100644
--- a/src/submissions/backend/db/models/submissions.py
+++ b/src/submissions/backend/db/models/submissions.py
@@ -1,3 +1,6 @@
+'''
+Models for the main submission types.
+'''
from . import Base
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, Table, JSON, FLOAT
from sqlalchemy.orm import relationship
diff --git a/src/submissions/backend/excel/__init__.py b/src/submissions/backend/excel/__init__.py
index e385640..e69a1f4 100644
--- a/src/submissions/backend/excel/__init__.py
+++ b/src/submissions/backend/excel/__init__.py
@@ -1,43 +1,50 @@
-from pandas import DataFrame
-import re
+'''
+Contains pandas convenience functions for interacting with excel workbooks
+'''
+
+from .reports import *
+from .parser import *
+
+# from pandas import DataFrame
+# import re
-def get_unique_values_in_df_column(df: DataFrame, column_name: str) -> list:
- """
- get all unique values in a dataframe column by name
+# def get_unique_values_in_df_column(df: DataFrame, column_name: str) -> list:
+# """
+# get all unique values in a dataframe column by name
- Args:
- df (DataFrame): input dataframe
- column_name (str): name of column of interest
+# Args:
+# df (DataFrame): input dataframe
+# column_name (str): name of column of interest
- Returns:
- list: sorted list of unique values
- """
- return sorted(df[column_name].unique())
+# Returns:
+# list: sorted list of unique values
+# """
+# return sorted(df[column_name].unique())
-def drop_reruns_from_df(ctx:dict, df: DataFrame) -> DataFrame:
- """
- Removes semi-duplicates from dataframe after finding sequencing repeats.
+# def drop_reruns_from_df(ctx:dict, df: DataFrame) -> DataFrame:
+# """
+# Removes semi-duplicates from dataframe after finding sequencing repeats.
- Args:
- settings (dict): settings passed from gui
- df (DataFrame): initial dataframe
+# Args:
+# settings (dict): settings passed from gui
+# df (DataFrame): initial dataframe
- Returns:
- DataFrame: dataframe with originals removed in favour of repeats.
- """
- sample_names = get_unique_values_in_df_column(df, column_name="name")
- if 'rerun_regex' in ctx:
- # logger.debug(f"Compiling regex from: {settings['rerun_regex']}")
- rerun_regex = re.compile(fr"{ctx['rerun_regex']}")
- for sample in sample_names:
- # logger.debug(f'Running search on {sample}')
- if rerun_regex.search(sample):
- # logger.debug(f'Match on {sample}')
- first_run = re.sub(rerun_regex, "", sample)
- # logger.debug(f"First run: {first_run}")
- df = df.drop(df[df.name == first_run].index)
- return df
- else:
- return None
+# Returns:
+# DataFrame: dataframe with originals removed in favour of repeats.
+# """
+# sample_names = get_unique_values_in_df_column(df, column_name="name")
+# if 'rerun_regex' in ctx:
+# # logger.debug(f"Compiling regex from: {settings['rerun_regex']}")
+# rerun_regex = re.compile(fr"{ctx['rerun_regex']}")
+# for sample in sample_names:
+# # logger.debug(f'Running search on {sample}')
+# if rerun_regex.search(sample):
+# # logger.debug(f'Match on {sample}')
+# first_run = re.sub(rerun_regex, "", sample)
+# # logger.debug(f"First run: {first_run}")
+# df = df.drop(df[df.name == first_run].index)
+# return df
+# else:
+# return None
diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py
index 70373b1..9d40bff 100644
--- a/src/submissions/backend/excel/parser.py
+++ b/src/submissions/backend/excel/parser.py
@@ -1,3 +1,6 @@
+'''
+contains parser object for pulling values from client generated submission sheets.
+'''
import pandas as pd
from pathlib import Path
from backend.db.models import WWSample, BCSample
diff --git a/src/submissions/backend/excel/reports.py b/src/submissions/backend/excel/reports.py
index 4fda303..11c169a 100644
--- a/src/submissions/backend/excel/reports.py
+++ b/src/submissions/backend/excel/reports.py
@@ -1,11 +1,13 @@
-
+'''
+Contains functions for generating summary reports
+'''
from pandas import DataFrame
-# from backend.db import models
import logging
from jinja2 import Environment, FileSystemLoader
from datetime import date, timedelta
import sys
from pathlib import Path
+import re
logger = logging.getLogger(f"submissions.{__name__}")
@@ -93,30 +95,22 @@ def convert_data_list_to_df(ctx:dict, input:list[dict], subtype:str|None=None) -
Returns:
DataFrame: _description_
"""
- # copy = input
- # for item in copy:
- # item['submitted_date'] = item['submitted_date'].strftime("%Y-%m-%d")
- # with open("controls.json", "w") as f:
- # f.write(json.dumps(copy))
- # for item in input:
- # logger.debug(item.keys())
+
df = DataFrame.from_records(input)
df.to_excel("test.xlsx", engine="openpyxl")
safe = ['name', 'submitted_date', 'genus', 'target']
- # logger.debug(df)
for column in df.columns:
if "percent" in column:
count_col = [item for item in df.columns if "count" in item][0]
# The actual percentage from kraken was off due to exclusion of NaN, recalculating.
- # df[column] = 100 * df[count_col] / df.groupby('submitted_date')[count_col].transform('sum')
df[column] = 100 * df[count_col] / df.groupby('name')[count_col].transform('sum')
if column not in safe:
if subtype != None and column != subtype:
del df[column]
- # logger.debug(df)
- # df.sort_values('submitted_date').to_excel("controls.xlsx", engine="openpyxl")
+ # move date of sample submitted on same date as previous ahead one.
df = displace_date(df)
df.sort_values('submitted_date').to_excel("controls.xlsx", engine="openpyxl")
+ # ad hoc method to make data labels more accurate.
df = df_column_renamer(df=df)
return df
@@ -150,24 +144,59 @@ def displace_date(df:DataFrame) -> DataFrame:
Returns:
DataFrame: output dataframe with dates incremented.
"""
- # dict_list = []
- # for item in df['name'].unique():
- # dict_list.append(dict(name=item, date=df[df.name == item].iloc[0]['submitted_date']))
logger.debug(f"Unique items: {df['name'].unique()}")
- # logger.debug(df.to_string())
- # the assumption is that closest names will have closest dates...
+ # get submitted dates for each control
dict_list = [dict(name=item, date=df[df.name == item].iloc[0]['submitted_date']) for item in sorted(df['name'].unique())]
for ii, item in enumerate(dict_list):
- # if ii > 0:
try:
check = item['date'] == dict_list[ii-1]['date']
except IndexError:
check = False
if check:
- logger.debug(f"We found one! Increment date!\n{item['date'] - timedelta(days=1)}")
+ logger.debug(f"We found one! Increment date!\n\t{item['date'] - timedelta(days=1)}")
+ # get df locations where name == item name
mask = df['name'] == item['name']
- # logger.debug(f"We will increment dates in: {df.loc[mask, 'submitted_date']}")
+ # increment date in dataframe
df.loc[mask, 'submitted_date'] = df.loc[mask, 'submitted_date'].apply(lambda x: x + timedelta(days=1))
- # logger.debug(f"Do these look incremented: {df.loc[mask, 'submitted_date']}")
return df
+
+def get_unique_values_in_df_column(df: DataFrame, column_name: str) -> list:
+ """
+ get all unique values in a dataframe column by name
+
+ Args:
+ df (DataFrame): input dataframe
+ column_name (str): name of column of interest
+
+ Returns:
+ list: sorted list of unique values
+ """
+ return sorted(df[column_name].unique())
+
+
+def drop_reruns_from_df(ctx:dict, df: DataFrame) -> DataFrame:
+ """
+ Removes semi-duplicates from dataframe after finding sequencing repeats.
+
+ Args:
+ settings (dict): settings passed from gui
+ df (DataFrame): initial dataframe
+
+ Returns:
+ DataFrame: dataframe with originals removed in favour of repeats.
+ """
+ sample_names = get_unique_values_in_df_column(df, column_name="name")
+ if 'rerun_regex' in ctx:
+ # logger.debug(f"Compiling regex from: {settings['rerun_regex']}")
+ rerun_regex = re.compile(fr"{ctx['rerun_regex']}")
+ for sample in sample_names:
+ # logger.debug(f'Running search on {sample}')
+ if rerun_regex.search(sample):
+ # logger.debug(f'Match on {sample}')
+ first_run = re.sub(rerun_regex, "", sample)
+ # logger.debug(f"First run: {first_run}")
+ df = df.drop(df[df.name == first_run].index)
+ return df
+ else:
+ return None
diff --git a/src/submissions/configure/__init__.py b/src/submissions/configure/__init__.py
index 9892c06..35d460e 100644
--- a/src/submissions/configure/__init__.py
+++ b/src/submissions/configure/__init__.py
@@ -1,3 +1,6 @@
+'''
+Contains functions for setting up program from config.yml and database.
+'''
import yaml
import sys, os, stat, platform, getpass
import logging
@@ -59,16 +62,18 @@ class StreamToLogger(object):
self.logger.log(self.log_level, line.rstrip())
-def get_config(settings_path: str|None=None) -> dict:
+def get_config(settings_path: Path|str|None=None) -> dict:
"""
Get configuration settings from path or default if blank.
Args:
- settings_path (str, optional): _description_. Defaults to "".
+ settings_path (Path | str | None, optional): Path to config.yml Defaults to None.
Returns:
- setting: dictionary of settings.
- """
+ dict: Dictionary of settings.
+ """
+ if isinstance(settings_path, str):
+ settings_path = Path(settings_path)
# custom pyyaml constructor to join fields
def join(loader, node):
seq = loader.construct_sequence(node)
@@ -105,10 +110,10 @@ def get_config(settings_path: str|None=None) -> dict:
copy_settings_trigger = True
else:
# check if user defined path is directory
- if Path(settings_path).is_dir():
+ if settings_path.is_dir():
settings_path = settings_path.joinpath("config.yml")
# check if user defined path is file
- elif Path(settings_path).is_file():
+ elif settings_path.is_file():
settings_path = settings_path
else:
logger.error("No config.yml file found. Using empty dictionary.")
@@ -128,7 +133,7 @@ def get_config(settings_path: str|None=None) -> dict:
def create_database_session(database_path: Path|str|None=None) -> Session:
"""
- Get database settings from path or default database if database_path is blank.
+ Creates a session to sqlite3 database from path or default database if database_path is blank.
Args:
database_path (Path | str | None, optional): path to sqlite database. Defaults to None.
diff --git a/src/submissions/frontend/__init__.py b/src/submissions/frontend/__init__.py
index d1a88bc..439fee1 100644
--- a/src/submissions/frontend/__init__.py
+++ b/src/submissions/frontend/__init__.py
@@ -1,3 +1,6 @@
+'''
+Operations for all user interactions.
+'''
import json
import re
from PyQt6.QtWidgets import (
@@ -19,8 +22,7 @@ from xhtml2pdf import pisa
# import plotly.express as px
import yaml
import pprint
-from backend.excel.parser import SheetParser
-from backend.excel.reports import convert_data_list_to_df
+from backend.excel import convert_data_list_to_df, make_report_xlsx, make_report_html, SheetParser
from backend.db import (construct_submission_info, lookup_reagent,
construct_reagent, store_submission, lookup_kittype_by_use,
lookup_regent_by_type_name, lookup_all_orgs, lookup_submissions_by_date_range,
@@ -29,18 +31,15 @@ from backend.db import (construct_submission_info, lookup_reagent,
create_org_from_yaml, store_reagent
)
from backend.db import lookup_kittype_by_name
-
-from .functions import check_kit_integrity
-from tools import check_not_nan, extract_form_info
-from backend.excel.reports import make_report_xlsx, make_report_html
-from frontend.custom_widgets.sub_details import SubmissionsSheet
-from frontend.custom_widgets.pop_ups import AlertPop, QuestionAsker
-from frontend.custom_widgets import AddReagentForm, ReportDatePicker, KitAdder, ControlsDatePicker, ImportReagent
+from .functions import extract_form_info
+from tools import check_not_nan, check_kit_integrity
+# from backend.excel.reports import
+from frontend.custom_widgets import SubmissionsSheet, AlertPop, QuestionAsker, AddReagentForm, ReportDatePicker, KitAdder, ControlsDatePicker, ImportReagent
import logging
import difflib
from getpass import getuser
from datetime import date
-from frontend.visualizations.charts import create_charts
+from frontend.visualizations import create_charts
logger = logging.getLogger(f'submissions.{__name__}')
logger.info("Hello, I am a logger")
diff --git a/src/submissions/frontend/custom_widgets/__init__.py b/src/submissions/frontend/custom_widgets/__init__.py
index 4b4f75a..f7f4021 100644
--- a/src/submissions/frontend/custom_widgets/__init__.py
+++ b/src/submissions/frontend/custom_widgets/__init__.py
@@ -1,323 +1,7 @@
-from datetime import date
-from PyQt6.QtWidgets import (
- QLabel, QVBoxLayout,
- QLineEdit, QComboBox, QDialog,
- QDialogButtonBox, QDateEdit, QSizePolicy, QWidget,
- QGridLayout, QPushButton, QSpinBox,
- QScrollBar, QHBoxLayout,
-)
-from PyQt6.QtCore import Qt, QDate, QSize
-# from submissions.backend.db import lookup_regent_by_type_name_and_kit_name
-from tools import check_not_nan, extract_form_info
-from backend.db import get_all_reagenttype_names, lookup_all_sample_types, create_kit_from_yaml, lookup_regent_by_type_name, lookup_regent_by_type_name_and_kit_name
-from backend.excel.parser import SheetParser
-from jinja2 import Environment, FileSystemLoader
-import sys
-from pathlib import Path
-import logging
-import numpy as np
-from .pop_ups import AlertPop
+'''
+Contains all custom generated PyQT6 derivative widgets.
+'''
-logger = logging.getLogger(f"submissions.{__name__}")
-
-if getattr(sys, 'frozen', False):
- loader_path = Path(sys._MEIPASS).joinpath("files", "templates")
-else:
- loader_path = Path(__file__).parents[2].joinpath('templates').absolute().__str__()
-loader = FileSystemLoader(loader_path)
-env = Environment(loader=loader)
-
-
-class AddReagentForm(QDialog):
- """
- dialog to add gather info about new reagent
- """
- def __init__(self, ctx:dict, reagent_lot:str|None, reagent_type:str|None, expiry:date|None=None) -> None:
- super().__init__()
-
- if reagent_lot == None:
- reagent_lot = ""
-
- self.setWindowTitle("Add Reagent")
-
- QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
-
- self.buttonBox = QDialogButtonBox(QBtn)
- self.buttonBox.accepted.connect(self.accept)
- self.buttonBox.rejected.connect(self.reject)
- # widget to get lot info
- lot_input = QLineEdit()
- lot_input.setObjectName("lot")
- lot_input.setText(reagent_lot)
- # widget to get expiry info
- exp_input = QDateEdit(calendarPopup=True)
- exp_input.setObjectName('expiry')
- # if expiry is not passed in from gui, use today
- if expiry == None:
- exp_input.setDate(QDate.currentDate())
- else:
- exp_input.setDate(expiry)
- # widget to get reagent type info
- type_input = QComboBox()
- type_input.setObjectName('type')
- type_input.addItems([item.replace("_", " ").title() for item in get_all_reagenttype_names(ctx=ctx)])
- logger.debug(f"Trying to find index of {reagent_type}")
- # convert input to user friendly string?
- try:
- reagent_type = reagent_type.replace("_", " ").title()
- except AttributeError:
- reagent_type = None
- # set parsed reagent type to top of list
- index = type_input.findText(reagent_type, Qt.MatchFlag.MatchEndsWith)
- if index >= 0:
- type_input.setCurrentIndex(index)
- self.layout = QVBoxLayout()
- self.layout.addWidget(QLabel("Lot"))
- self.layout.addWidget(lot_input)
- self.layout.addWidget(QLabel("Expiry"))
- self.layout.addWidget(exp_input)
- self.layout.addWidget(QLabel("Type"))
- self.layout.addWidget(type_input)
- self.layout.addWidget(self.buttonBox)
- self.setLayout(self.layout)
-
-
-class ReportDatePicker(QDialog):
- """
- custom dialog to ask for report start/stop dates
- """
- def __init__(self) -> None:
- super().__init__()
-
- self.setWindowTitle("Select Report Date Range")
- # make confirm/reject buttons
- QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
- self.buttonBox = QDialogButtonBox(QBtn)
- self.buttonBox.accepted.connect(self.accept)
- self.buttonBox.rejected.connect(self.reject)
- # widgets to ask for dates
- start_date = QDateEdit(calendarPopup=True)
- start_date.setObjectName("start_date")
- start_date.setDate(QDate.currentDate())
- end_date = QDateEdit(calendarPopup=True)
- end_date.setObjectName("end_date")
- end_date.setDate(QDate.currentDate())
- self.layout = QVBoxLayout()
- self.layout.addWidget(QLabel("Start Date"))
- self.layout.addWidget(start_date)
- self.layout.addWidget(QLabel("End Date"))
- self.layout.addWidget(end_date)
- self.layout.addWidget(self.buttonBox)
- self.setLayout(self.layout)
-
-
-class KitAdder(QWidget):
- """
- dialog to get information to add kit
- """
- def __init__(self, parent_ctx:dict) -> None:
- super().__init__()
- self.ctx = parent_ctx
- self.grid = QGridLayout()
- self.setLayout(self.grid)
- # insert submit button at top
- self.submit_btn = QPushButton("Submit")
- self.grid.addWidget(self.submit_btn,0,0,1,1)
- self.grid.addWidget(QLabel("Kit Name:"),2,0)
- # widget to get kit name
- kit_name = QLineEdit()
- kit_name.setObjectName("kit_name")
- self.grid.addWidget(kit_name,2,1)
- self.grid.addWidget(QLabel("Used For Sample Type:"),3,0)
- # widget to get uses of kit
- used_for = QComboBox()
- used_for.setObjectName("used_for")
- # Insert all existing sample types
- used_for.addItems(lookup_all_sample_types(ctx=parent_ctx))
- used_for.setEditable(True)
- self.grid.addWidget(used_for,3,1)
- # set cost per run
- self.grid.addWidget(QLabel("Constant cost per full plate (plates, work hours, etc.):"),4,0)
- # widget to get constant cost
- const_cost = QSpinBox()
- const_cost.setObjectName("const_cost")
- const_cost.setMinimum(0)
- const_cost.setMaximum(9999)
- self.grid.addWidget(const_cost,4,1)
- self.grid.addWidget(QLabel("Mutable cost per full plate (tips, reagents, etc.):"),5,0)
- # widget to get mutable costs
- mut_cost = QSpinBox()
- mut_cost.setObjectName("mut_cost")
- mut_cost.setMinimum(0)
- mut_cost.setMaximum(9999)
- self.grid.addWidget(mut_cost,5,1)
- # button to add additional reagent types
- self.add_RT_btn = QPushButton("Add Reagent Type")
- self.grid.addWidget(self.add_RT_btn)
- self.add_RT_btn.clicked.connect(self.add_RT)
- self.submit_btn.clicked.connect(self.submit)
-
- def add_RT(self) -> None:
- """
- insert new reagent type row
- """
- # get bottommost row
- maxrow = self.grid.rowCount()
- reg_form = ReagentTypeForm(parent_ctx=self.ctx)
- reg_form.setObjectName(f"ReagentForm_{maxrow}")
- self.grid.addWidget(reg_form, maxrow + 1,0,1,2)
-
-
- def submit(self) -> None:
- """
- send kit to database
- """
- # get form info
- info, reagents = extract_form_info(self)
- logger.debug(f"kit info: {info}")
- yml_type = {}
- try:
- yml_type['password'] = info['password']
- except KeyError:
- pass
- used = info['used_for'].replace(" ", "_").lower()
- yml_type[used] = {}
- yml_type[used]['kits'] = {}
- yml_type[used]['kits'][info['kit_name']] = {}
- yml_type[used]['kits'][info['kit_name']]['constant_cost'] = info["const_cost"]
- yml_type[used]['kits'][info['kit_name']]['mutable_cost'] = info["mut_cost"]
- yml_type[used]['kits'][info['kit_name']]['reagenttypes'] = reagents
- logger.debug(yml_type)
- # send to kit constructor
- result = create_kit_from_yaml(ctx=self.ctx, exp=yml_type)
- msg = AlertPop(message=result['message'], status=result['status'])
- msg.exec()
-
- # def extract_form_info(self, object):
- # """
- # retrieves arbitrary number of labels, values from form
-
- # Args:
- # object (_type_): the object to extract info from
-
- # Returns:
- # _type_: _description_
- # """
- # labels = []
- # values = []
- # reagents = {}
- # for item in object.findChildren(QWidget):
- # logger.debug(item.parentWidget())
- # # if not isinstance(item.parentWidget(), ReagentTypeForm):
- # match item:
- # case QLabel():
- # labels.append(item.text().replace(" ", "_").strip(":").lower())
- # case QLineEdit():
- # # ad hoc check to prevent double reporting of qdatedit under lineedit for some reason
- # if not isinstance(prev_item, QDateEdit) and not isinstance(prev_item, QComboBox) and not isinstance(prev_item, QSpinBox) and not isinstance(prev_item, QScrollBar):
- # logger.debug(f"Previous: {prev_item}")
- # logger.debug(f"Item: {item}, {item.text()}")
- # values.append(item.text().strip())
- # case QComboBox():
- # values.append(item.currentText().strip())
- # case QDateEdit():
- # values.append(item.date().toPyDate())
- # case QSpinBox():
- # values.append(item.value())
- # case ReagentTypeForm():
- # re_labels, re_values, _ = self.extract_form_info(item)
- # reagent = {item[0]:item[1] for item in zip(re_labels, re_values)}
- # logger.debug(reagent)
- # # reagent = {reagent['name:']:{'eol':reagent['extension_of_life_(months):']}}
- # reagents[reagent["name_(*exactly*_as_it_appears_in_the_excel_submission_form)"].strip()] = {'eol_ext':int(reagent['extension_of_life_(months)'])}
- # prev_item = item
- # return labels, values, reagents
-
-class ReagentTypeForm(QWidget):
- """
- custom widget to add information about a new reagenttype
- """
- def __init__(self, parent_ctx:dict) -> None:
- super().__init__()
- grid = QGridLayout()
- self.setLayout(grid)
- grid.addWidget(QLabel("Name (*Exactly* as it appears in the excel submission form):"),0,0)
- # Widget to get reagent info
- reagent_getter = QComboBox()
- reagent_getter.setObjectName("name")
- # lookup all reagent type names from db
- reagent_getter.addItems(get_all_reagenttype_names(ctx=parent_ctx))
- reagent_getter.setEditable(True)
- grid.addWidget(reagent_getter,0,1)
- grid.addWidget(QLabel("Extension of Life (months):"),0,2)
- # widget toget extension of life
- eol = QSpinBox()
- eol.setObjectName('eol')
- eol.setMinimum(0)
- grid.addWidget(eol, 0,3)
-
-
-class ControlsDatePicker(QWidget):
- """
- custom widget to pick start and end dates for controls graphs
- """
- def __init__(self) -> None:
- super().__init__()
-
- self.start_date = QDateEdit(calendarPopup=True)
- # start date is three month prior to end date by default
- # edit: 2 month, but the variable name is the same cause I'm lazy
- threemonthsago = QDate.currentDate().addDays(-60)
- self.start_date.setDate(threemonthsago)
- self.end_date = QDateEdit(calendarPopup=True)
- self.end_date.setDate(QDate.currentDate())
- self.layout = QHBoxLayout()
- self.layout.addWidget(QLabel("Start Date"))
- self.layout.addWidget(self.start_date)
- self.layout.addWidget(QLabel("End Date"))
- self.layout.addWidget(self.end_date)
- self.setLayout(self.layout)
- self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
-
- def sizeHint(self) -> QSize:
- return QSize(80,20)
-
-
-class ImportReagent(QComboBox):
-
- def __init__(self, ctx:dict, item:str, prsr:SheetParser|None=None):
- super().__init__()
- self.setEditable(True)
- # Ensure that all reagenttypes have a name that matches the items in the excel parser
- query_var = item.replace("lot_", "")
- logger.debug(f"Query for: {query_var}")
- if prsr != None:
- if isinstance(prsr.sub[item], np.float64):
- logger.debug(f"{prsr.sub[item]['lot']} is a numpy float!")
- try:
- prsr.sub[item] = int(prsr.sub[item]['lot'])
- except ValueError:
- pass
- # query for reagents using type name from sheet and kit from sheet
- logger.debug(f"Attempting lookup of reagents by type: {query_var}")
- # below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work.
- relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)]#, kit_name=prsr.sub['extraction_kit'])]
- # relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name_and_kit_name(ctx=ctx, type_name=query_var, kit_name=prsr.sub['extraction_kit'])]
- output_reg = []
- for reagent in relevant_reagents:
- # extract strings from any sets.
- if isinstance(reagent, set):
- for thing in reagent:
- output_reg.append(thing)
- elif isinstance(reagent, str):
- output_reg.append(reagent)
- relevant_reagents = output_reg
- # if reagent in sheet is not found insert it into the front of relevant reagents so it shows
- if prsr != None:
- logger.debug(f"Relevant reagents for {prsr.sub[item]}: {relevant_reagents}")
- if str(prsr.sub[item]['lot']) not in relevant_reagents and prsr.sub[item]['lot'] != 'nan':
- if check_not_nan(prsr.sub[item]['lot']):
- relevant_reagents.insert(0, str(prsr.sub[item]['lot']))
- logger.debug(f"New relevant reagents: {relevant_reagents}")
- self.addItems(relevant_reagents)
-
\ No newline at end of file
+from .misc import *
+from .pop_ups import *
+from .sub_details import *
\ No newline at end of file
diff --git a/src/submissions/frontend/custom_widgets/misc.py b/src/submissions/frontend/custom_widgets/misc.py
new file mode 100644
index 0000000..c9da335
--- /dev/null
+++ b/src/submissions/frontend/custom_widgets/misc.py
@@ -0,0 +1,334 @@
+'''
+Contains miscellaneous widgets for frontend functions
+'''
+from datetime import date
+from PyQt6.QtWidgets import (
+ QLabel, QVBoxLayout,
+ QLineEdit, QComboBox, QDialog,
+ QDialogButtonBox, QDateEdit, QSizePolicy, QWidget,
+ QGridLayout, QPushButton, QSpinBox,
+ QHBoxLayout,
+)
+from PyQt6.QtCore import Qt, QDate, QSize
+# from submissions.backend.db import lookup_regent_by_type_name_and_kit_name
+from tools import check_not_nan
+from ..functions import extract_form_info
+from backend.db import get_all_reagenttype_names, lookup_all_sample_types, create_kit_from_yaml, lookup_regent_by_type_name#, lookup_regent_by_type_name_and_kit_name
+from backend.excel.parser import SheetParser
+from jinja2 import Environment, FileSystemLoader
+import sys
+from pathlib import Path
+import logging
+import numpy as np
+from .pop_ups import AlertPop
+
+logger = logging.getLogger(f"submissions.{__name__}")
+
+if getattr(sys, 'frozen', False):
+ loader_path = Path(sys._MEIPASS).joinpath("files", "templates")
+else:
+ loader_path = Path(__file__).parents[2].joinpath('templates').absolute().__str__()
+loader = FileSystemLoader(loader_path)
+env = Environment(loader=loader)
+
+
+class AddReagentForm(QDialog):
+ """
+ dialog to add gather info about new reagent
+ """
+ def __init__(self, ctx:dict, reagent_lot:str|None, reagent_type:str|None, expiry:date|None=None) -> None:
+ super().__init__()
+
+ if reagent_lot == None:
+ reagent_lot = ""
+
+ self.setWindowTitle("Add Reagent")
+
+ QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
+
+ self.buttonBox = QDialogButtonBox(QBtn)
+ self.buttonBox.accepted.connect(self.accept)
+ self.buttonBox.rejected.connect(self.reject)
+ # widget to get lot info
+ lot_input = QLineEdit()
+ lot_input.setObjectName("lot")
+ lot_input.setText(reagent_lot)
+ # widget to get expiry info
+ exp_input = QDateEdit(calendarPopup=True)
+ exp_input.setObjectName('expiry')
+ # if expiry is not passed in from gui, use today
+ if expiry == None:
+ exp_input.setDate(QDate.currentDate())
+ else:
+ exp_input.setDate(expiry)
+ # widget to get reagent type info
+ type_input = QComboBox()
+ type_input.setObjectName('type')
+ type_input.addItems([item.replace("_", " ").title() for item in get_all_reagenttype_names(ctx=ctx)])
+ logger.debug(f"Trying to find index of {reagent_type}")
+ # convert input to user friendly string?
+ try:
+ reagent_type = reagent_type.replace("_", " ").title()
+ except AttributeError:
+ reagent_type = None
+ # set parsed reagent type to top of list
+ index = type_input.findText(reagent_type, Qt.MatchFlag.MatchEndsWith)
+ if index >= 0:
+ type_input.setCurrentIndex(index)
+ self.layout = QVBoxLayout()
+ self.layout.addWidget(QLabel("Lot:"))
+ self.layout.addWidget(lot_input)
+ self.layout.addWidget(QLabel("Expiry:\n(use exact date on reagent.\nEOL will be calculated from kit automatically)"))
+ self.layout.addWidget(exp_input)
+ self.layout.addWidget(QLabel("Type:"))
+ self.layout.addWidget(type_input)
+ self.layout.addWidget(self.buttonBox)
+ self.setLayout(self.layout)
+
+
+class ReportDatePicker(QDialog):
+ """
+ custom dialog to ask for report start/stop dates
+ """
+ def __init__(self) -> None:
+ super().__init__()
+
+ self.setWindowTitle("Select Report Date Range")
+ # make confirm/reject buttons
+ QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
+ self.buttonBox = QDialogButtonBox(QBtn)
+ self.buttonBox.accepted.connect(self.accept)
+ self.buttonBox.rejected.connect(self.reject)
+ # widgets to ask for dates
+ start_date = QDateEdit(calendarPopup=True)
+ start_date.setObjectName("start_date")
+ start_date.setDate(QDate.currentDate())
+ end_date = QDateEdit(calendarPopup=True)
+ end_date.setObjectName("end_date")
+ end_date.setDate(QDate.currentDate())
+ self.layout = QVBoxLayout()
+ self.layout.addWidget(QLabel("Start Date"))
+ self.layout.addWidget(start_date)
+ self.layout.addWidget(QLabel("End Date"))
+ self.layout.addWidget(end_date)
+ self.layout.addWidget(self.buttonBox)
+ self.setLayout(self.layout)
+
+
+class KitAdder(QWidget):
+ """
+ dialog to get information to add kit
+ """
+ def __init__(self, parent_ctx:dict) -> None:
+ super().__init__()
+ self.ctx = parent_ctx
+ self.grid = QGridLayout()
+ self.setLayout(self.grid)
+ # insert submit button at top
+ self.submit_btn = QPushButton("Submit")
+ self.grid.addWidget(self.submit_btn,0,0,1,1)
+ self.grid.addWidget(QLabel("Kit Name:"),2,0)
+ # widget to get kit name
+ kit_name = QLineEdit()
+ kit_name.setObjectName("kit_name")
+ self.grid.addWidget(kit_name,2,1)
+ self.grid.addWidget(QLabel("Used For Sample Type:"),3,0)
+ # widget to get uses of kit
+ used_for = QComboBox()
+ used_for.setObjectName("used_for")
+ # Insert all existing sample types
+ used_for.addItems(lookup_all_sample_types(ctx=parent_ctx))
+ used_for.setEditable(True)
+ self.grid.addWidget(used_for,3,1)
+ # set cost per run
+ self.grid.addWidget(QLabel("Constant cost per full plate (plates, work hours, etc.):"),4,0)
+ # widget to get constant cost
+ const_cost = QSpinBox()
+ const_cost.setObjectName("const_cost")
+ const_cost.setMinimum(0)
+ const_cost.setMaximum(9999)
+ self.grid.addWidget(const_cost,4,1)
+ self.grid.addWidget(QLabel("Mutable cost per full plate (tips, reagents, etc.):"),5,0)
+ # widget to get mutable costs
+ mut_cost = QSpinBox()
+ mut_cost.setObjectName("mut_cost")
+ mut_cost.setMinimum(0)
+ mut_cost.setMaximum(9999)
+ self.grid.addWidget(mut_cost,5,1)
+ # button to add additional reagent types
+ self.add_RT_btn = QPushButton("Add Reagent Type")
+ self.grid.addWidget(self.add_RT_btn)
+ self.add_RT_btn.clicked.connect(self.add_RT)
+ self.submit_btn.clicked.connect(self.submit)
+
+ def add_RT(self) -> None:
+ """
+ insert new reagent type row
+ """
+ # get bottommost row
+ maxrow = self.grid.rowCount()
+ reg_form = ReagentTypeForm(parent_ctx=self.ctx)
+ reg_form.setObjectName(f"ReagentForm_{maxrow}")
+ self.grid.addWidget(reg_form, maxrow + 1,0,1,2)
+
+
+ def submit(self) -> None:
+ """
+ send kit to database
+ """
+ # get form info
+ info, reagents = extract_form_info(self)
+ logger.debug(f"kit info: {info}")
+ yml_type = {}
+ try:
+ yml_type['password'] = info['password']
+ except KeyError:
+ pass
+ used = info['used_for'].replace(" ", "_").lower()
+ yml_type[used] = {}
+ yml_type[used]['kits'] = {}
+ yml_type[used]['kits'][info['kit_name']] = {}
+ yml_type[used]['kits'][info['kit_name']]['constant_cost'] = info["const_cost"]
+ yml_type[used]['kits'][info['kit_name']]['mutable_cost'] = info["mut_cost"]
+ yml_type[used]['kits'][info['kit_name']]['reagenttypes'] = reagents
+ logger.debug(yml_type)
+ # send to kit constructor
+ result = create_kit_from_yaml(ctx=self.ctx, exp=yml_type)
+ msg = AlertPop(message=result['message'], status=result['status'])
+ msg.exec()
+
+ # def extract_form_info(self, object):
+ # """
+ # retrieves arbitrary number of labels, values from form
+
+ # Args:
+ # object (_type_): the object to extract info from
+
+ # Returns:
+ # _type_: _description_
+ # """
+ # labels = []
+ # values = []
+ # reagents = {}
+ # for item in object.findChildren(QWidget):
+ # logger.debug(item.parentWidget())
+ # # if not isinstance(item.parentWidget(), ReagentTypeForm):
+ # match item:
+ # case QLabel():
+ # labels.append(item.text().replace(" ", "_").strip(":").lower())
+ # case QLineEdit():
+ # # ad hoc check to prevent double reporting of qdatedit under lineedit for some reason
+ # if not isinstance(prev_item, QDateEdit) and not isinstance(prev_item, QComboBox) and not isinstance(prev_item, QSpinBox) and not isinstance(prev_item, QScrollBar):
+ # logger.debug(f"Previous: {prev_item}")
+ # logger.debug(f"Item: {item}, {item.text()}")
+ # values.append(item.text().strip())
+ # case QComboBox():
+ # values.append(item.currentText().strip())
+ # case QDateEdit():
+ # values.append(item.date().toPyDate())
+ # case QSpinBox():
+ # values.append(item.value())
+ # case ReagentTypeForm():
+ # re_labels, re_values, _ = self.extract_form_info(item)
+ # reagent = {item[0]:item[1] for item in zip(re_labels, re_values)}
+ # logger.debug(reagent)
+ # # reagent = {reagent['name:']:{'eol':reagent['extension_of_life_(months):']}}
+ # reagents[reagent["name_(*exactly*_as_it_appears_in_the_excel_submission_form)"].strip()] = {'eol_ext':int(reagent['extension_of_life_(months)'])}
+ # prev_item = item
+ # return labels, values, reagents
+
+class ReagentTypeForm(QWidget):
+ """
+ custom widget to add information about a new reagenttype
+ """
+ def __init__(self, parent_ctx:dict) -> None:
+ super().__init__()
+ grid = QGridLayout()
+ self.setLayout(grid)
+ grid.addWidget(QLabel("Name (*Exactly* as it appears in the excel submission form):"),0,0)
+ # Widget to get reagent info
+ reagent_getter = QComboBox()
+ reagent_getter.setObjectName("name")
+ # lookup all reagent type names from db
+ reagent_getter.addItems(get_all_reagenttype_names(ctx=parent_ctx))
+ reagent_getter.setEditable(True)
+ grid.addWidget(reagent_getter,0,1)
+ grid.addWidget(QLabel("Extension of Life (months):"),0,2)
+ # widget toget extension of life
+ eol = QSpinBox()
+ eol.setObjectName('eol')
+ eol.setMinimum(0)
+ grid.addWidget(eol, 0,3)
+
+
+class ControlsDatePicker(QWidget):
+ """
+ custom widget to pick start and end dates for controls graphs
+ """
+ def __init__(self) -> None:
+ super().__init__()
+
+ self.start_date = QDateEdit(calendarPopup=True)
+ # start date is three month prior to end date by default
+ # NOTE: 2 month, but the variable name is the same cause I'm lazy
+ threemonthsago = QDate.currentDate().addDays(-60)
+ self.start_date.setDate(threemonthsago)
+ self.end_date = QDateEdit(calendarPopup=True)
+ self.end_date.setDate(QDate.currentDate())
+ self.layout = QHBoxLayout()
+ self.layout.addWidget(QLabel("Start Date"))
+ self.layout.addWidget(self.start_date)
+ self.layout.addWidget(QLabel("End Date"))
+ self.layout.addWidget(self.end_date)
+ self.setLayout(self.layout)
+ self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
+
+ def sizeHint(self) -> QSize:
+ return QSize(80,20)
+
+
+class ImportReagent(QComboBox):
+
+ def __init__(self, ctx:dict, item:str, prsr:SheetParser|None=None):
+ super().__init__()
+ self.setEditable(True)
+ # Ensure that all reagenttypes have a name that matches the items in the excel parser
+ query_var = item.replace("lot_", "")
+ logger.debug(f"Import Reagent is looking at: {prsr.sub[item]} for {item}")
+ logger.debug(f"Query for: {query_var}")
+ if prsr != None:
+ if isinstance(prsr.sub[item], np.float64):
+ logger.debug(f"{prsr.sub[item]['lot']} is a numpy float!")
+ try:
+ prsr.sub[item] = int(prsr.sub[item]['lot'])
+ except ValueError:
+ pass
+ # query for reagents using type name from sheet and kit from sheet
+ logger.debug(f"Attempting lookup of reagents by type: {query_var}")
+ # below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work.
+ relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)]#, kit_name=prsr.sub['extraction_kit'])]
+ # relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name_and_kit_name(ctx=ctx, type_name=query_var, kit_name=prsr.sub['extraction_kit'])]
+ output_reg = []
+ for reagent in relevant_reagents:
+ # extract strings from any sets.
+ if isinstance(reagent, set):
+ for thing in reagent:
+ output_reg.append(thing)
+ elif isinstance(reagent, str):
+ output_reg.append(reagent)
+ relevant_reagents = output_reg
+ # if reagent in sheet is not found insert it into the front of relevant reagents so it shows
+ if prsr != None:
+ logger.debug(f"Relevant reagents for {prsr.sub[item]}: {relevant_reagents}")
+ if str(prsr.sub[item]['lot']) not in relevant_reagents:
+ if check_not_nan(prsr.sub[item]['lot']):
+ relevant_reagents.insert(0, str(prsr.sub[item]['lot']))
+ else:
+ if len(relevant_reagents) > 1:
+ logger.debug(f"Found {prsr.sub[item]['lot']} in relevant reagents: {relevant_reagents}. Moving to front of list.")
+ relevant_reagents.insert(0, relevant_reagents.pop(relevant_reagents.index(prsr.sub[item]['lot'])))
+ else:
+ logger.debug(f"Found {prsr.sub[item]['lot']} in relevant reagents: {relevant_reagents}. But no need to move due to short list.")
+ logger.debug(f"New relevant reagents: {relevant_reagents}")
+ self.addItems(relevant_reagents)
+
\ No newline at end of file
diff --git a/src/submissions/frontend/custom_widgets/pop_ups.py b/src/submissions/frontend/custom_widgets/pop_ups.py
index 9ea856c..7672115 100644
--- a/src/submissions/frontend/custom_widgets/pop_ups.py
+++ b/src/submissions/frontend/custom_widgets/pop_ups.py
@@ -1,3 +1,6 @@
+'''
+Contains dialogs for notification and prompting.
+'''
from PyQt6.QtWidgets import (
QLabel, QVBoxLayout, QDialog,
QDialogButtonBox, QMessageBox
diff --git a/src/submissions/frontend/custom_widgets/sub_details.py b/src/submissions/frontend/custom_widgets/sub_details.py
index 1dba48c..d3433c8 100644
--- a/src/submissions/frontend/custom_widgets/sub_details.py
+++ b/src/submissions/frontend/custom_widgets/sub_details.py
@@ -1,3 +1,6 @@
+'''
+Contains widgets specific to the submission summary and submission details.
+'''
from PyQt6.QtWidgets import (
QVBoxLayout, QDialog, QTableView,
QTextEdit, QPushButton, QScrollArea,
diff --git a/src/submissions/frontend/functions.py b/src/submissions/frontend/functions.py
index ca6c86f..2723eae 100644
--- a/src/submissions/frontend/functions.py
+++ b/src/submissions/frontend/functions.py
@@ -1,80 +1,53 @@
-# from ..models import *
from backend.db.models import *
import logging
+from PyQt6.QtWidgets import (
+ QMainWindow, QLabel, QToolBar,
+ QTabWidget, QWidget, QVBoxLayout,
+ QPushButton, QFileDialog,
+ QLineEdit, QMessageBox, QComboBox, QDateEdit, QHBoxLayout,
+ QSpinBox, QScrollArea
+)
+
logger = logging.getLogger(f"submissions.{__name__}")
-def check_kit_integrity(sub:BasicSubmission|KitType, reagenttypes:list|None=None) -> dict|None:
+def extract_form_info(object) -> dict:
"""
- Ensures all reagents expected in kit are listed in Submission
+ retrieves object names and values from form
Args:
- sub (BasicSubmission | KitType): Object containing complete list of reagent types.
- reagenttypes (list | None, optional): List to check against complete list. Defaults to None.
+ object (_type_): the form widget
Returns:
- dict|None: Result object containing a message and any missing components.
- """
- logger.debug(type(sub))
- # What type is sub?
- match sub:
- case BasicSubmission():
- ext_kit_rtypes = [reagenttype.name for reagenttype in sub.extraction_kit.reagent_types]
- # Overwrite function parameter reagenttypes
- reagenttypes = [reagent.type.name for reagent in sub.reagents]
- case KitType():
- ext_kit_rtypes = [reagenttype.name for reagenttype in sub.reagent_types]
- logger.debug(f"Kit reagents: {ext_kit_rtypes}")
- logger.debug(f"Submission reagents: {reagenttypes}")
- # check if lists are equal
- check = set(ext_kit_rtypes) == set(reagenttypes)
- logger.debug(f"Checking if reagents match kit contents: {check}")
- # what reagent types are in both lists?
- # common = list(set(ext_kit_rtypes).intersection(reagenttypes))
- missing = list(set(ext_kit_rtypes).difference(reagenttypes))
- logger.debug(f"Missing reagents types: {missing}")
- # if lists are equal return no problem
- if len(missing)==0:
- result = None
- else:
- # missing = [x for x in ext_kit_rtypes if x not in common]
- result = {'message' : f"Couldn't verify reagents match listed kit components.\n\nIt looks like you are missing: {[item.upper() for item in missing]}\n\nAlternatively, you may have set the wrong extraction kit.", 'missing': missing}
- return result
-
-
-# Below is depreciated:
-# def insert_reagent_import(ctx:dict, item:str, prsr:SheetParser|None=None) -> QComboBox:
-# add_widget = QComboBox()
-# add_widget.setEditable(True)
-# # Ensure that all reagenttypes have a name that matches the items in the excel parser
-# query_var = item.replace("lot_", "")
-# logger.debug(f"Query for: {query_var}")
-# if prsr != None:
-# if isinstance(prsr.sub[item], np.float64):
-# logger.debug(f"{prsr.sub[item]['lot']} is a numpy float!")
-# try:
-# prsr.sub[item] = int(prsr.sub[item]['lot'])
-# except ValueError:
-# pass
-# # query for reagents using type name from sheet and kit from sheet
-# logger.debug(f"Attempting lookup of reagents by type: {query_var}")
-# # below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work.
-# relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)]#, kit_name=prsr.sub['extraction_kit'])]
-# output_reg = []
-# for reagent in relevant_reagents:
-# if isinstance(reagent, set):
-# for thing in reagent:
-# output_reg.append(thing)
-# elif isinstance(reagent, str):
-# output_reg.append(reagent)
-# relevant_reagents = output_reg
-# # if reagent in sheet is not found insert it into items
-# if prsr != None:
-# logger.debug(f"Relevant reagents for {prsr.sub[item]}: {relevant_reagents}")
-# if str(prsr.sub[item]['lot']) not in relevant_reagents and prsr.sub[item]['lot'] != 'nan':
-# if check_not_nan(prsr.sub[item]['lot']):
-# relevant_reagents.insert(0, str(prsr.sub[item]['lot']))
-# logger.debug(f"New relevant reagents: {relevant_reagents}")
-# add_widget.addItems(relevant_reagents)
-# return add_widget
-
+ dict: dictionary of objectName:text items
+ """
+ from frontend.custom_widgets import ReagentTypeForm
+ dicto = {}
+ reagents = {}
+ logger.debug(f"Object type: {type(object)}")
+ # grab all widgets in form
+ try:
+ all_children = object.layout.parentWidget().findChildren(QWidget)
+ except AttributeError:
+ all_children = object.layout().parentWidget().findChildren(QWidget)
+ for item in all_children:
+ logger.debug(f"Looking at: {item.objectName()}")
+ match item:
+ case QLineEdit():
+ dicto[item.objectName()] = item.text()
+ case QComboBox():
+ dicto[item.objectName()] = item.currentText()
+ case QDateEdit():
+ dicto[item.objectName()] = item.date().toPyDate()
+ case QSpinBox():
+ dicto[item.objectName()] = item.value()
+ case ReagentTypeForm():
+ reagent = extract_form_info(item)
+ # reagent = {item[0]:item[1] for item in zip(re_labels, re_values)}
+ logger.debug(reagent)
+ # reagent = {reagent['name:']:{'eol':reagent['extension_of_life_(months):']}}
+ reagents[reagent["name"].strip()] = {'eol_ext':int(reagent['eol'])}
+ # value for ad hoc check above
+ if reagents != {}:
+ return dicto, reagents
+ return dicto
\ No newline at end of file
diff --git a/src/submissions/frontend/static/css/data_browser.css b/src/submissions/frontend/static/css/data_browser.css
deleted file mode 100644
index 35bdbd1..0000000
--- a/src/submissions/frontend/static/css/data_browser.css
+++ /dev/null
@@ -1,40 +0,0 @@
-Screen {
-
-}
-
-.box {
- height: 3;
- border: solid green;
-}
-
-#tree-view {
- display: none;
- scrollbar-gutter: stable;
- overflow: auto;
- width: auto;
- height: 100%;
- dock: left;
-}
-
-DataBrowser.-show-tree #tree-view {
- display: block;
- max-width: 50%;
-}
-
-
-#code-view {
- overflow: auto scroll;
- min-width: 100%;
-}
-#code {
- width: auto;
-}
-
-FormField {
- width: 70%;
- height: 3;
- padding: 1 2;
- background: $primary;
- border: $secondary tall;
- content-align: center middle;
-}
\ No newline at end of file
diff --git a/src/submissions/frontend/static/css/styles.css b/src/submissions/frontend/static/css/styles.css
deleted file mode 100644
index e69de29..0000000
diff --git a/src/submissions/frontend/visualizations/__init__.py b/src/submissions/frontend/visualizations/__init__.py
index e69de29..ef76651 100644
--- a/src/submissions/frontend/visualizations/__init__.py
+++ b/src/submissions/frontend/visualizations/__init__.py
@@ -0,0 +1,4 @@
+'''
+Contains all operations for creating charts, graphs and visual effects.
+'''
+from .control_charts import *
\ No newline at end of file
diff --git a/src/submissions/frontend/visualizations/charts.py b/src/submissions/frontend/visualizations/control_charts.py
similarity index 94%
rename from src/submissions/frontend/visualizations/charts.py
rename to src/submissions/frontend/visualizations/control_charts.py
index a13fcb1..b52b14f 100644
--- a/src/submissions/frontend/visualizations/charts.py
+++ b/src/submissions/frontend/visualizations/control_charts.py
@@ -1,3 +1,6 @@
+'''
+Functions for constructing controls graphs using plotly.
+'''
import plotly.express as px
import pandas as pd
from pathlib import Path
@@ -149,8 +152,19 @@ def output_figures(settings:dict, figs:list, group_name:str):
def construct_chart(ctx:dict, df:pd.DataFrame, modes:list, ytitle:str|None=None) -> Figure:
- fig = Figure()
+ """
+ Creates a plotly chart for controls from a pandas dataframe
+ Args:
+ ctx (dict): settings passed down from gui
+ df (pd.DataFrame): input dataframe of controls
+ modes (list): analysis modes to construct charts for
+ ytitle (str | None, optional): title on the y-axis. Defaults to None.
+
+ Returns:
+ Figure: output stacked bar chart.
+ """
+ fig = Figure()
for ii, mode in enumerate(modes):
if "count" in mode:
df[mode] = pd.to_numeric(df[mode],errors='coerce')
@@ -161,7 +175,6 @@ def construct_chart(ctx:dict, df:pd.DataFrame, modes:list, ytitle:str|None=None)
color_discrete_sequence=None
else:
color = "target"
- # print(get_unique_values_in_df_column(df, 'target'))
match get_unique_values_in_df_column(df, 'target'):
case ['Target']:
color_discrete_sequence=["blue"]
@@ -180,7 +193,6 @@ def construct_chart(ctx:dict, df:pd.DataFrame, modes:list, ytitle:str|None=None)
)
bar.update_traces(visible = ii == 0)
fig.add_traces(bar.data)
- # sys.exit(f"number of traces={len(fig.data)}")
return generic_figure_markers(fig=fig, modes=modes, ytitle=ytitle)
# Below are the individual construction functions. They must be named "construct_{mode}_chart" and
@@ -264,17 +276,3 @@ def divide_chunks(input_list:list, chunk_count:int):
"""
k, m = divmod(len(input_list), chunk_count)
return (input_list[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(chunk_count))
-
-########This must be at bottom of module###########
-
-function_map = {}
-for item in dict(locals().items()):
- try:
- if dict(locals().items())[item].__module__ == __name__:
- try:
- function_map[item] = dict(locals().items())[item]
- except KeyError:
- pass
- except AttributeError:
- pass
-###################################################
\ No newline at end of file
diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py
index 2a60b57..4d25aac 100644
--- a/src/submissions/tools/__init__.py
+++ b/src/submissions/tools/__init__.py
@@ -1,18 +1,14 @@
import numpy as np
import logging
import getpass
-from PyQt6.QtWidgets import (
- QMainWindow, QLabel, QToolBar,
- QTabWidget, QWidget, QVBoxLayout,
- QPushButton, QFileDialog,
- QLineEdit, QMessageBox, QComboBox, QDateEdit, QHBoxLayout,
- QSpinBox, QScrollArea
-)
-
+from backend.db.models import BasicSubmission, KitType
logger = logging.getLogger(f"submissions.{__name__}")
def check_not_nan(cell_contents) -> bool:
+ # check for nan as a string first
+ if cell_contents == 'nan':
+ cell_contents = np.nan
try:
return not np.isnan(cell_contents)
except TypeError:
@@ -28,7 +24,7 @@ def check_is_power_user(ctx:dict) -> bool:
except KeyError as e:
check = False
except Exception as e:
- logger.debug(f"Check encounteded unknown error: {type(e).__name__} - {e}")
+ logger.debug(f"Check encountered unknown error: {type(e).__name__} - {e}")
check = False
return check
@@ -37,43 +33,39 @@ def create_reagent_list(in_dict:dict) -> list[str]:
return [item.strip("lot_") for item in in_dict.keys()]
-def extract_form_info(object) -> dict:
+def check_kit_integrity(sub:BasicSubmission|KitType, reagenttypes:list|None=None) -> dict|None:
"""
- retrieves object names and values from form
+ Ensures all reagents expected in kit are listed in Submission
Args:
- object (_type_): the form widget
+ sub (BasicSubmission | KitType): Object containing complete list of reagent types.
+ reagenttypes (list | None, optional): List to check against complete list. Defaults to None.
Returns:
- dict: dictionary of objectName:text items
- """
- from frontend.custom_widgets import ReagentTypeForm
- dicto = {}
- reagents = {}
- logger.debug(f"Object type: {type(object)}")
- # grab all widgets in form
- try:
- all_children = object.layout.parentWidget().findChildren(QWidget)
- except AttributeError:
- all_children = object.layout().parentWidget().findChildren(QWidget)
- for item in all_children:
- logger.debug(f"Looking at: {item.objectName()}")
- match item:
- case QLineEdit():
- dicto[item.objectName()] = item.text()
- case QComboBox():
- dicto[item.objectName()] = item.currentText()
- case QDateEdit():
- dicto[item.objectName()] = item.date().toPyDate()
- case QSpinBox():
- dicto[item.objectName()] = item.value()
- case ReagentTypeForm():
- reagent = extract_form_info(item)
- # reagent = {item[0]:item[1] for item in zip(re_labels, re_values)}
- logger.debug(reagent)
- # reagent = {reagent['name:']:{'eol':reagent['extension_of_life_(months):']}}
- reagents[reagent["name"].strip()] = {'eol_ext':int(reagent['eol'])}
- # value for ad hoc check above
- if reagents != {}:
- return dicto, reagents
- return dicto
\ No newline at end of file
+ dict|None: Result object containing a message and any missing components.
+ """
+ logger.debug(type(sub))
+ # What type is sub?
+ match sub:
+ case BasicSubmission():
+ ext_kit_rtypes = [reagenttype.name for reagenttype in sub.extraction_kit.reagent_types]
+ # Overwrite function parameter reagenttypes
+ reagenttypes = [reagent.type.name for reagent in sub.reagents]
+ case KitType():
+ ext_kit_rtypes = [reagenttype.name for reagenttype in sub.reagent_types]
+ logger.debug(f"Kit reagents: {ext_kit_rtypes}")
+ logger.debug(f"Submission reagents: {reagenttypes}")
+ # check if lists are equal
+ check = set(ext_kit_rtypes) == set(reagenttypes)
+ logger.debug(f"Checking if reagents match kit contents: {check}")
+ # what reagent types are in both lists?
+ # common = list(set(ext_kit_rtypes).intersection(reagenttypes))
+ missing = list(set(ext_kit_rtypes).difference(reagenttypes))
+ logger.debug(f"Missing reagents types: {missing}")
+ # if lists are equal return no problem
+ if len(missing)==0:
+ result = None
+ else:
+ # missing = [x for x in ext_kit_rtypes if x not in common]
+ result = {'message' : f"Couldn't verify reagents match listed kit components.\n\nIt looks like you are missing: {[item.upper() for item in missing]}\n\nAlternatively, you may have set the wrong extraction kit.", 'missing': missing}
+ return result
\ No newline at end of file