Large scale refactor to improve db efficiency

This commit is contained in:
Landon Wark
2023-09-27 14:16:28 -05:00
parent 82ab06efad
commit e484eabb22
37 changed files with 1782 additions and 1697 deletions

View File

@@ -1,3 +1,8 @@
## 202309.04
- Extraction kit can now be updated after import.
- Large scale refactoring to improve efficiency of database functions.
## 202309.03
- Autofill now adds name of reagent instead of type.

View File

@@ -1,8 +1,10 @@
- [x] Rerun Kit integrity if extraction kit changed in the form.
- [x] Clean up db.functions.
- [ ] Make kits easier to add.
- [ ] Clean up & document code... again.
- Including paring down the logging.debugs
- Also including reducing number of functions in db.functions
- [ ] Fix Tests... again.
- [x] Fix Tests... again.
- [x] Rebuild database
- [x] Provide more generic names for reagenttypes in kits and move specific names to reagents.
- ex. Instead of "omega_e-z_96_disruptor_plate_c_plus" in reagent types, have "omega_plate" and have "omega_e-z_96_disruptor_plate_c_plus" in reagent name.

7
source/modules.rst Normal file
View File

@@ -0,0 +1,7 @@
submissions
===========
.. toctree::
:maxdepth: 4
submissions

View File

@@ -0,0 +1,37 @@
submissions.backend.db.functions package
========================================
Submodules
----------
submissions.backend.db.functions.constructions module
-----------------------------------------------------
.. automodule:: submissions.backend.db.functions.constructions
:members:
:undoc-members:
:show-inheritance:
submissions.backend.db.functions.lookups module
-----------------------------------------------
.. automodule:: submissions.backend.db.functions.lookups
:members:
:undoc-members:
:show-inheritance:
submissions.backend.db.functions.misc module
--------------------------------------------
.. automodule:: submissions.backend.db.functions.misc
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: submissions.backend.db.functions
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,45 @@
submissions.backend.db.models package
=====================================
Submodules
----------
submissions.backend.db.models.controls module
---------------------------------------------
.. automodule:: submissions.backend.db.models.controls
:members:
:undoc-members:
:show-inheritance:
submissions.backend.db.models.kits module
-----------------------------------------
.. automodule:: submissions.backend.db.models.kits
:members:
:undoc-members:
:show-inheritance:
submissions.backend.db.models.organizations module
--------------------------------------------------
.. automodule:: submissions.backend.db.models.organizations
:members:
:undoc-members:
:show-inheritance:
submissions.backend.db.models.submissions module
------------------------------------------------
.. automodule:: submissions.backend.db.models.submissions
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: submissions.backend.db.models
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,19 @@
submissions.backend.db package
==============================
Subpackages
-----------
.. toctree::
:maxdepth: 4
submissions.backend.db.functions
submissions.backend.db.models
Module contents
---------------
.. automodule:: submissions.backend.db
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,29 @@
submissions.backend.excel package
=================================
Submodules
----------
submissions.backend.excel.parser module
---------------------------------------
.. automodule:: submissions.backend.excel.parser
:members:
:undoc-members:
:show-inheritance:
submissions.backend.excel.reports module
----------------------------------------
.. automodule:: submissions.backend.excel.reports
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: submissions.backend.excel
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,10 @@
submissions.backend.pydant package
==================================
Module contents
---------------
.. automodule:: submissions.backend.pydant
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,20 @@
submissions.backend package
===========================
Subpackages
-----------
.. toctree::
:maxdepth: 4
submissions.backend.db
submissions.backend.excel
submissions.backend.pydant
Module contents
---------------
.. automodule:: submissions.backend
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,37 @@
submissions.frontend.custom\_widgets package
============================================
Submodules
----------
submissions.frontend.custom\_widgets.misc module
------------------------------------------------
.. automodule:: submissions.frontend.custom_widgets.misc
:members:
:undoc-members:
:show-inheritance:
submissions.frontend.custom\_widgets.pop\_ups module
----------------------------------------------------
.. automodule:: submissions.frontend.custom_widgets.pop_ups
:members:
:undoc-members:
:show-inheritance:
submissions.frontend.custom\_widgets.sub\_details module
--------------------------------------------------------
.. automodule:: submissions.frontend.custom_widgets.sub_details
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: submissions.frontend.custom_widgets
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,38 @@
submissions.frontend package
============================
Subpackages
-----------
.. toctree::
:maxdepth: 4
submissions.frontend.custom_widgets
submissions.frontend.visualizations
Submodules
----------
submissions.frontend.all\_window\_functions module
--------------------------------------------------
.. automodule:: submissions.frontend.all_window_functions
:members:
:undoc-members:
:show-inheritance:
submissions.frontend.main\_window\_functions module
---------------------------------------------------
.. automodule:: submissions.frontend.main_window_functions
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: submissions.frontend
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,37 @@
submissions.frontend.visualizations package
===========================================
Submodules
----------
submissions.frontend.visualizations.barcode module
--------------------------------------------------
.. automodule:: submissions.frontend.visualizations.barcode
:members:
:undoc-members:
:show-inheritance:
submissions.frontend.visualizations.control\_charts module
----------------------------------------------------------
.. automodule:: submissions.frontend.visualizations.control_charts
:members:
:undoc-members:
:show-inheritance:
submissions.frontend.visualizations.plate\_map module
-----------------------------------------------------
.. automodule:: submissions.frontend.visualizations.plate_map
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: submissions.frontend.visualizations
:members:
:undoc-members:
:show-inheritance:

20
source/submissions.rst Normal file
View File

@@ -0,0 +1,20 @@
submissions package
===================
Subpackages
-----------
.. toctree::
:maxdepth: 4
submissions.backend
submissions.frontend
submissions.tools
Module contents
---------------
.. automodule:: submissions
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,10 @@
submissions.tools package
=========================
Module contents
---------------
.. automodule:: submissions.tools
:members:
:undoc-members:
:show-inheritance:

View File

@@ -4,7 +4,7 @@ from pathlib import Path
# Version of the realpython-reader package
__project__ = "submissions"
__version__ = "202309.3b"
__version__ = "202309.4b"
__author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"}
__copyright__ = "2022-2023, Government of Canada"

View File

@@ -1,3 +1,3 @@
'''
Contains database and excel operations.
Contains database, pydantic and excel operations.
'''

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,91 @@
'''Contains or imports all database convenience functions'''
from tools import Settings, package_dir
from sqlalchemy.orm import Session
from sqlalchemy import create_engine, event
from sqlalchemy.engine import Engine
from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError
from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError
from pathlib import Path
import logging
logger = logging.getLogger(f"Submissions_{__name__}")
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
"""
*should* allow automatic creation of foreign keys in the database
I have no idea how it actually works.
Args:
dbapi_connection (_type_): _description_
connection_record (_type_): _description_
"""
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
def create_database_session(ctx:Settings) -> Session:
"""
Create database session for app.
Args:
ctx (Settings): settings passed down from gui
Raises:
FileNotFoundError: Raised if sqlite file not found
Returns:
Session: Sqlalchemy session object.
"""
database_path = ctx.database_path
if database_path == None:
# check in user's .submissions directory for submissions.db
if Path.home().joinpath(".submissions", "submissions.db").exists():
database_path = Path.home().joinpath(".submissions", "submissions.db")
# finally, look in the local dir
else:
database_path = package_dir.joinpath("submissions.db")
else:
if database_path == ":memory:":
pass
# check if user defined path is directory
elif database_path.is_dir():
database_path = database_path.joinpath("submissions.db")
# check if user defined path is a file
elif database_path.is_file():
database_path = database_path
else:
raise FileNotFoundError("No database file found. Exiting program.")
logger.debug(f"Using {database_path} for database file.")
engine = create_engine(f"sqlite:///{database_path}", echo=True, future=True)
session = Session(engine)
return session
def store_object(ctx:Settings, object) -> dict|None:
"""
Store an object in the database
Args:
ctx (Settings): Settings object passed down from gui
object (_type_): Object to be stored
Returns:
dict|None: Result of action
"""
dbs = ctx.database_session
dbs.merge(object)
try:
dbs.commit()
except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
logger.debug(f"Hit an integrity error : {e}")
dbs.rollback()
return {"message":f"This object {object} already exists, so we can't add it.", "status":"Critical"}
except (SQLOperationalError, AlcOperationalError):
logger.error(f"Hit an operational error: {e}")
dbs.rollback()
return {"message":"The database is locked for editing."}
return None
from .lookups import *
from .constructions import *
from .misc import *

View File

@@ -0,0 +1,276 @@
'''
Used to construct models from input dictionaries.
'''
from getpass import getuser
from tools import Settings, RSLNamer, check_regex_match
from .. import models
from .lookups import *
import logging
from datetime import date, timedelta
from dateutil.parser import parse
from typing import Tuple
from sqlalchemy.exc import IntegrityError, SAWarning
logger = logging.getLogger(f"submissions.{__name__}")
def construct_reagent(ctx:Settings, info_dict:dict) -> models.Reagent:
"""
Construct reagent object from dictionary
Args:
ctx (Settings): settings object passed down from gui
info_dict (dict): dictionary to be converted
Returns:
models.Reagent: Constructed reagent object
"""
reagent = models.Reagent()
for item in info_dict:
logger.debug(f"Reagent info item for {item}: {info_dict[item]}")
# set fields based on keys in dictionary
match item:
case "lot":
reagent.lot = info_dict[item].upper()
case "expiry":
if isinstance(info_dict[item], date):
reagent.expiry = info_dict[item]
else:
reagent.expiry = parse(info_dict[item]).date()
case "type":
reagent_type = lookup_reagent_types(ctx=ctx, name=info_dict[item])
if reagent_type != None:
reagent.type.append(reagent_type)
case "name":
if item == None:
reagent.name = reagent.type.name
else:
reagent.name = info_dict[item]
# add end-of-life extension from reagent type to expiry date
# NOTE: this will now be done only in the reporting phase to account for potential changes in end-of-life extensions
return reagent
def construct_submission_info(ctx:Settings, info_dict:dict) -> Tuple[models.BasicSubmission, dict]:
"""
Construct submission object from dictionary pulled from gui form
Args:
ctx (Settings): settings object passed down from gui
info_dict (dict): dictionary to be transformed
Returns:
models.BasicSubmission: Constructed submission object
"""
# convert submission type into model name
query = info_dict['submission_type'].replace(" ", "")
# Ensure an rsl plate number exists for the plate
if not check_regex_match("^RSL", info_dict["rsl_plate_num"]):
instance = None
msg = "A proper RSL plate number is required."
return instance, {'code': 2, 'message': "A proper RSL plate number is required."}
else:
# enforce conventions on the rsl plate number from the form
info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"]).parsed_name
# check database for existing object
instance = lookup_submissions(ctx=ctx, rsl_number=info_dict['rsl_plate_num'])
# get model based on submission type converted above
logger.debug(f"Looking at models for submission type: {query}")
model = getattr(models, query)
logger.debug(f"We've got the model: {type(model)}")
# if query return nothing, ie doesn't already exist in db
if instance == None:
instance = model()
logger.debug(f"Submission doesn't exist yet, creating new instance: {instance}")
msg = None
code = 0
else:
code = 1
msg = "This submission already exists.\nWould you like to overwrite?"
for item in info_dict:
value = info_dict[item]
logger.debug(f"Setting {item} to {value}")
# set fields based on keys in dictionary
match item:
case "extraction_kit":
logger.debug(f"Looking up kit {value}")
field_value = lookup_kit_types(ctx=ctx, name=value)
logger.debug(f"Got {field_value} for kit {value}")
case "submitting_lab":
logger.debug(f"Looking up organization: {value}")
field_value = lookup_organizations(ctx=ctx, name=value)
logger.debug(f"Got {field_value} for organization {value}")
case "submitter_plate_num":
logger.debug(f"Submitter plate id: {value}")
field_value = value
case "samples":
instance = construct_samples(ctx=ctx, instance=instance, samples=value)
continue
case "submission_type":
field_value = lookup_submission_type(ctx=ctx, name=value)
case _:
field_value = value
# insert into field
try:
setattr(instance, item, field_value)
except AttributeError:
logger.debug(f"Could not set attribute: {item} to {info_dict[item]}")
continue
except KeyError:
continue
# calculate cost of the run: immutable cost + mutable times number of columns
# This is now attached to submission upon creation to preserve at-run costs incase of cost increase in the future.
try:
logger.debug(f"Calculating costs for procedure...")
instance.calculate_base_cost()
except (TypeError, AttributeError) as e:
logger.debug(f"Looks like that kit doesn't have cost breakdown yet due to: {e}, using full plate cost.")
instance.run_cost = instance.extraction_kit.cost_per_run
logger.debug(f"Calculated base run cost of: {instance.run_cost}")
# Apply any discounts that are applicable for client and kit.
try:
logger.debug("Checking and applying discounts...")
discounts = [item.amount for item in lookup_discounts(ctx=ctx, kit_type=instance.extraction_kit, organization=instance.submitting_lab)]
logger.debug(f"We got discounts: {discounts}")
if len(discounts) > 0:
discounts = sum(discounts)
instance.run_cost = instance.run_cost - discounts
except Exception as e:
logger.error(f"An unknown exception occurred when calculating discounts: {e}")
# We need to make sure there's a proper rsl plate number
logger.debug(f"We've got a total cost of {instance.run_cost}")
try:
logger.debug(f"Constructed instance: {instance.to_string()}")
except AttributeError as e:
logger.debug(f"Something went wrong constructing instance {info_dict['rsl_plate_num']}: {e}")
logger.debug(f"Constructed submissions message: {msg}")
return instance, {'code':code, 'message':msg}
def construct_samples(ctx:Settings, instance:models.BasicSubmission, samples:List[dict]) -> models.BasicSubmission:
"""
constructs sample objects and adds to submission
Args:
ctx (Settings): settings passed down from gui
instance (models.BasicSubmission): Submission samples scraped from.
samples (List[dict]): List of parsed samples
Returns:
models.BasicSubmission: Updated submission object.
"""
for sample in samples:
sample_instance = lookup_samples(ctx=ctx, submitter_id=sample['sample'].submitter_id)
if sample_instance == None:
sample_instance = sample['sample']
else:
logger.warning(f"Sample {sample} already exists, creating association.")
logger.debug(f"Adding {sample_instance.__dict__}")
if sample_instance in instance.samples:
logger.error(f"Looks like there's a duplicate sample on this plate: {sample_instance.submitter_id}!")
continue
try:
with ctx.database_session.no_autoflush:
try:
sample_query = sample_instance.sample_type.replace('Sample', '').strip()
logger.debug(f"Here is the sample instance type: {sample_instance}")
try:
assoc = getattr(models, f"{sample_query}Association")
except AttributeError as e:
logger.error(f"Couldn't get type specific association. Getting generic.")
assoc = models.SubmissionSampleAssociation
assoc = assoc(submission=instance, sample=sample_instance, row=sample['row'], column=sample['column'])
instance.submission_sample_associations.append(assoc)
except IntegrityError:
logger.error(f"Hit integrity error for: {sample}")
continue
except SAWarning:
logger.error(f"Looks like the association already exists for submission: {instance} and sample: {sample_instance}")
continue
except IntegrityError as e:
logger.critical(e)
continue
return instance
def construct_kit_from_yaml(ctx:Settings, exp:dict) -> dict:
"""
Create and store a new kit in the database based on a .yml file
TODO: split into create and store functions
Args:
ctx (Settings): Context object passed down from frontend
exp (dict): Experiment dictionary created from yaml file
Returns:
dict: a dictionary containing results of db addition
"""
from tools import check_is_power_user, massage_common_reagents
# Don't want just anyone adding kits
if not check_is_power_user(ctx=ctx):
logger.debug(f"{getuser()} does not have permission to add kits.")
return {'code':1, 'message':"This user does not have permission to add kits.", "status":"warning"}
# iterate through keys in dict
for type in exp:
# A submission type may use multiple kits.
for kt in exp[type]['kits']:
logger.debug(f"Looking up submission type: {type}")
# submission_type = lookup_submissiontype_by_name(ctx=ctx, type_name=type)
submission_type = lookup_submission_type(ctx=ctx, name=type)
logger.debug(f"Looked up submission type: {submission_type}")
kit = models.KitType(name=kt)
kt_st_assoc = models.SubmissionTypeKitTypeAssociation(kit_type=kit, submission_type=submission_type)
kt_st_assoc.constant_cost = exp[type]["kits"][kt]["constant_cost"]
kt_st_assoc.mutable_cost_column = exp[type]["kits"][kt]["mutable_cost_column"]
kt_st_assoc.mutable_cost_sample = exp[type]["kits"][kt]["mutable_cost_sample"]
kit.kit_submissiontype_associations.append(kt_st_assoc)
# A kit contains multiple reagent types.
for r in exp[type]['kits'][kt]['reagenttypes']:
# check if reagent type already exists.
r = massage_common_reagents(r)
look_up = ctx.database_session.query(models.ReagentType).filter(models.ReagentType.name==r).first()
if look_up == None:
rt = models.ReagentType(name=r.strip(), eol_ext=timedelta(30*exp[type]['kits'][kt]['reagenttypes'][r]['eol_ext']), last_used="")
else:
rt = look_up
assoc = models.KitTypeReagentTypeAssociation(kit_type=kit, reagent_type=rt, uses={})
ctx.database_session.add(rt)
kit.kit_reagenttype_associations.append(assoc)
logger.debug(f"Kit construction reagent type: {rt.__dict__}")
logger.debug(f"Kit construction kit: {kit.__dict__}")
ctx.database_session.add(kit)
ctx.database_session.commit()
return {'code':0, 'message':'Kit has been added', 'status': 'information'}
def construct_org_from_yaml(ctx:Settings, org:dict) -> dict:
"""
Create and store a new organization based on a .yml file
Args:
ctx (Settings): Context object passed down from frontend
org (dict): Dictionary containing organization info.
Returns:
dict: dictionary containing results of db addition
"""
from tools import check_is_power_user
# Don't want just anyone adding in clients
if not check_is_power_user(ctx=ctx):
logger.debug(f"{getuser()} does not have permission to add kits.")
return {'code':1, 'message':"This user does not have permission to add organizations."}
# the yml can contain multiple clients
for client in org:
cli_org = models.Organization(name=client.replace(" ", "_").lower(), cost_centre=org[client]['cost centre'])
# a client can contain multiple contacts
for contact in org[client]['contacts']:
cont_name = list(contact.keys())[0]
# check if contact already exists
look_up = ctx.database_session.query(models.Contact).filter(models.Contact.name==cont_name).first()
if look_up == None:
cli_cont = models.Contact(name=cont_name, phone=contact[cont_name]['phone'], email=contact[cont_name]['email'], organization=[cli_org])
else:
cli_cont = look_up
cli_cont.organization.append(cli_org)
ctx.database_session.add(cli_cont)
logger.debug(f"Client creation contact: {cli_cont.__dict__}")
logger.debug(f"Client creation client: {cli_org.__dict__}")
ctx.database_session.add(cli_org)
ctx.database_session.commit()
return {"code":0, "message":"Organization has been added."}

View File

@@ -0,0 +1,483 @@
from .. import models
from tools import Settings, RSLNamer
from typing import List
import logging
from datetime import date, datetime
from dateutil.parser import parse
from sqlalchemy.orm.query import Query
from sqlalchemy import and_, JSON
from sqlalchemy.orm import Session
logger = logging.getLogger(f"submissions.{__name__}")
def query_return(query:Query, limit:int=0):
with query.session.no_autoflush:
match limit:
case 0:
return query.all()
case 1:
return query.first()
case _:
return query.limit(limit).all()
def setup_lookup(ctx:Settings, locals:dict) -> Session:
for k, v in locals.items():
if k == "kwargs":
continue
if isinstance(v, dict):
raise ValueError("Cannot use dictionary in query. Make sure you parse it first.")
# return create_database_session(ctx=ctx)
return ctx.database_session
################## Basic Lookups ####################################
def lookup_reagents(ctx:Settings,
reagent_type:str|models.ReagentType|None=None,
lot_number:str|None=None,
limit:int=0
) -> models.Reagent|List[models.Reagent]:
"""
Lookup a list of reagents from the database.
Args:
ctx (Settings): Settings object passed down from gui
reagent_type (str | models.ReagentType | None, optional): Reagent type. Defaults to None.
lot_number (str | None, optional): Reagent lot number. Defaults to None.
limit (int, optional): limit of results returned. Defaults to 0.
Returns:
models.Reagent | List[models.Reagent]: reagent or list of reagents matching filter.
"""
query = setup_lookup(ctx=ctx, locals=locals()).query(models.Reagent)
match reagent_type:
case str():
logger.debug(f"Looking up reagents by reagent type: {reagent_type}")
query = query.join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==reagent_type)
case models.ReagentType():
logger.debug(f"Looking up reagents by reagent type: {reagent_type}")
query = query.filter(models.Reagent.type.contains(reagent_type))
case _:
pass
match lot_number:
case str():
logger.debug(f"Looking up reagent by lot number: {lot_number}")
query = query.filter(models.Reagent.lot==lot_number)
# In this case limit number returned.
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
def lookup_kit_types(ctx:Settings,
name:str=None,
used_for:str|None=None,
id:int|None=None,
limit:int=0
) -> models.KitType|List[models.KitType]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.KitType)
match used_for:
case str():
logger.debug(f"Looking up kit type by use: {used_for}")
query = query.filter(models.KitType.used_for.any(name=used_for))
case _:
pass
match name:
case str():
logger.debug(f"Looking up kit type by name: {name}")
query = query.filter(models.KitType.name==name)
limit = 1
case _:
pass
match id:
case int():
logger.debug(f"Looking up kit type by id: {id}")
query = query.filter(models.KitType.id==id)
limit = 1
case str():
logger.debug(f"Looking up kit type by id: {id}")
query = query.filter(models.KitType.id==int(id))
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
def lookup_reagent_types(ctx:Settings,
name: str|None=None,
kit_type: models.KitType|str|None=None,
reagent: models.Reagent|str|None=None,
limit:int=0,
) -> models.ReagentType|List[models.ReagentType]:
"""
_summary_
Args:
ctx (Settings): Settings object passed down from gui.
name (str | None, optional): Reagent type name. Defaults to None.
limit (int, optional): limit of results to return. Defaults to 0.
Returns:
models.ReagentType|List[models.ReagentType]: ReagentType or list of ReagentTypes matching filter.
"""
query = setup_lookup(ctx=ctx, locals=locals()).query(models.ReagentType)
if (kit_type != None and reagent == None) or (reagent != None and kit_type == None):
raise ValueError("Cannot filter without both reagent and kit type.")
elif kit_type == None and reagent == None:
pass
else:
match kit_type:
case str():
kit_type = lookup_kit_types(ctx=ctx, name=kit_type)
case _:
pass
match reagent:
case str():
reagent = lookup_reagents(ctx=ctx, lot_number=reagent)
case _:
pass
return list(set(kit_type.reagent_types).intersection(reagent.type))[0]
match name:
case str():
logger.debug(f"Looking up reagent type by name: {name}")
query = query.filter(models.ReagentType.name==name)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
def lookup_submissions(ctx:Settings,
submission_type:str|models.SubmissionType|None=None,
id:int|str|None=None,
rsl_number:str|None=None,
start_date:date|str|int|None=None,
end_date:date|str|int|None=None,
reagent:models.Reagent|str|None=None,
chronologic:bool=False, limit:int=0,
**kwargs
) -> models.BasicSubmission | List[models.BasicSubmission]:
model = models.find_subclasses(parent=models.BasicSubmission, attrs=kwargs)
query = setup_lookup(ctx=ctx, locals=locals()).query(model)
# by submission type
match submission_type:
case models.SubmissionType():
logger.debug(f"Looking up BasicSubmission with submission type: {submission_type}")
# query = query.filter(models.BasicSubmission.submission_type_name==submission_type.name)
query = query.filter(model.submission_type_name==submission_type.name)
case str():
logger.debug(f"Looking up BasicSubmission with submission type: {submission_type}")
# query = query.filter(models.BasicSubmission.submission_type_name==submission_type)
query = query.filter(model.submission_type_name==submission_type)
case _:
pass
# by date range
if start_date != None and end_date == None:
logger.warning(f"Start date with no end date, using today.")
end_date = date.today()
if end_date != None and start_date == None:
logger.warning(f"End date with no start date, using Jan 1, 2023")
start_date = date(2023, 1, 1)
if start_date != None:
match start_date:
case date():
start_date = start_date.strftime("%Y-%m-%d")
case int():
start_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d")
case _:
start_date = parse(start_date).strftime("%Y-%m-%d")
match end_date:
case date():
end_date = end_date.strftime("%Y-%m-%d")
case int():
end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime("%Y-%m-%d")
case _:
end_date = parse(end_date).strftime("%Y-%m-%d")
logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}")
# query = query.filter(models.BasicSubmission.submitted_date.between(start_date, end_date))
query = query.filter(model.submitted_date.between(start_date, end_date))
# by reagent (for some reason)
match reagent:
case str():
logger.debug(f"Looking up BasicSubmission with reagent: {reagent}")
reagent = lookup_reagents(ctx=ctx, lot_number=reagent)
query = query.join(models.submissions.reagents_submissions).filter(models.submissions.reagents_submissions.c.reagent_id==reagent.id).all()
case models.Reagent:
logger.debug(f"Looking up BasicSubmission with reagent: {reagent}")
query = query.join(models.submissions.reagents_submissions).filter(models.submissions.reagents_submissions.c.reagent_id==reagent.id).all()
case _:
pass
# by rsl number (returns only a single value)
match rsl_number:
case str():
logger.debug(f"Looking up BasicSubmission with rsl number: {rsl_number}")
rsl_number = RSLNamer(ctx=ctx, instr=rsl_number).parsed_name
# query = query.filter(models.BasicSubmission.rsl_plate_num==rsl_number)
query = query.filter(model.rsl_plate_num==rsl_number)
limit = 1
case _:
pass
# by id (returns only a single value)
match id:
case int():
logger.debug(f"Looking up BasicSubmission with id: {id}")
# query = query.filter(models.BasicSubmission.id==id)
query = query.filter(model.id==id)
limit = 1
case str():
logger.debug(f"Looking up BasicSubmission with id: {id}")
# query = query.filter(models.BasicSubmission.id==int(id))
query = query.filter(model.id==int(id))
limit = 1
case _:
pass
for k, v in kwargs.items():
attr = getattr(model, k)
logger.debug(f"Got attr: {attr}")
query = query.filter(attr==v)
if len(kwargs) > 0:
limit = 1
if chronologic:
# query.order_by(models.BasicSubmission.submitted_date)
query.order_by(model.submitted_date)
return query_return(query=query, limit=limit)
def lookup_submission_type(ctx:Settings,
name:str|None=None,
limit:int=0
) -> models.SubmissionType|List[models.SubmissionType]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.SubmissionType)
match name:
case str():
logger.debug(f"Looking up submission type by name: {name}")
query = query.filter(models.SubmissionType.name==name)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
def lookup_organizations(ctx:Settings,
name:str|None=None,
limit:int=0,
) -> models.Organization|List[models.Organization]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.Organization)
match name:
case str():
logger.debug(f"Looking up organization with name: {name}")
query = query.filter(models.Organization.name==name)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
def lookup_discounts(ctx:Settings,
organization:models.Organization|str|int,
kit_type:models.KitType|str|int,
) -> models.Discount|List[models.Discount]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.Discount)
match organization:
case models.Organization():
logger.debug(f"Looking up discount with organization: {organization}")
organization = organization.id
case str():
logger.debug(f"Looking up discount with organization: {organization}")
organization = lookup_organizations(ctx=ctx, name=organization).id
case int():
logger.debug(f"Looking up discount with organization id: {organization}")
pass
case _:
raise ValueError(f"Invalid value for organization: {organization}")
match kit_type:
case models.KitType():
logger.debug(f"Looking up discount with kit type: {kit_type}")
kit_type = kit_type.id
case str():
logger.debug(f"Looking up discount with kit type: {kit_type}")
kit_type = lookup_kit_types(ctx=ctx, name=kit_type).id
case int():
logger.debug(f"Looking up discount with kit type id: {organization}")
pass
case _:
raise ValueError(f"Invalid value for kit type: {kit_type}")
return query.join(models.KitType).join(models.Organization).filter(and_(
models.KitType.id==kit_type,
models.Organization.id==organization
)).all()
def lookup_controls(ctx:Settings,
control_type:models.ControlType|str|None=None,
start_date:date|str|int|None=None,
end_date:date|str|int|None=None,
limit:int=0
) -> models.Control|List[models.Control]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.Control)
# by control type
match control_type:
case models.ControlType():
logger.debug(f"Looking up control by control type: {control_type}")
query = query.join(models.ControlType).filter(models.ControlType==control_type)
case str():
logger.debug(f"Looking up control by control type: {control_type}")
query = query.join(models.ControlType).filter(models.ControlType.name==control_type)
case _:
pass
# by date range
if start_date != None and end_date == None:
logger.warning(f"Start date with no end date, using today.")
end_date = date.today()
if end_date != None and start_date == None:
logger.warning(f"End date with no start date, using Jan 1, 2023")
start_date = date(2023, 1, 1)
if start_date != None:
match start_date:
case date():
start_date = start_date.strftime("%Y-%m-%d")
case int():
start_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d")
case _:
start_date = parse(start_date).strftime("%Y-%m-%d")
match end_date:
case date():
end_date = end_date.strftime("%Y-%m-%d")
case int():
end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime("%Y-%m-%d")
case _:
end_date = parse(end_date).strftime("%Y-%m-%d")
logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}")
query = query.filter(models.Control.submitted_date.between(start_date, end_date))
return query_return(query=query, limit=limit)
def lookup_control_types(ctx:Settings, limit:int=0) -> models.ControlType|List[models.ControlType]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.ControlType)
return query_return(query=query, limit=limit)
def lookup_samples(ctx:Settings,
submitter_id:str|None=None,
sample_type:str|None=None,
limit:int=0,
**kwargs
) -> models.BasicSample|models.WastewaterSample|List[models.BasicSample]:
logger.debug(f"Length of kwargs: {len(kwargs)}")
model = models.find_subclasses(parent=models.BasicSample, attrs=kwargs)
query = setup_lookup(ctx=ctx, locals=locals()).query(model)
match submitter_id:
case str():
logger.debug(f"Looking up {model} with submitter id: {submitter_id}")
query = query.filter(models.BasicSample.submitter_id==submitter_id)
limit = 1
case _:
pass
match sample_type:
case str():
logger.debug(f"Looking up {model} with sample type: {sample_type}")
query = query.filter(models.BasicSample.sample_type==sample_type)
case _:
pass
for k, v in kwargs.items():
attr = getattr(model, k)
logger.debug(f"Got attr: {attr}")
query = query.filter(attr==v)
if len(kwargs) > 0:
limit = 1
return query_return(query=query, limit=limit)
def lookup_reagenttype_kittype_association(ctx:Settings,
kit_type:models.KitType|str|None,
reagent_type:models.ReagentType|str|None,
limit:int=0
) -> models.KitTypeReagentTypeAssociation|List[models.KitTypeReagentTypeAssociation]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.KitTypeReagentTypeAssociation)
match kit_type:
case models.KitType():
query = query.filter(models.KitTypeReagentTypeAssociation.kit_type==kit_type)
case str():
query = query.join(models.KitType).filter(models.KitType.name==kit_type)
case _:
pass
match reagent_type:
case models.ReagentType():
query = query.filter(models.KitTypeReagentTypeAssociation.reagent_type==reagent_type)
case str():
query = query.join(models.ReagentType).filter(models.ReagentType.name==reagent_type)
case _:
pass
if kit_type != None and reagent_type != None:
limit = 1
return query_return(query=query, limit=limit)
def lookup_submission_sample_association(ctx:Settings,
submission:models.BasicSubmission|str|None=None,
sample:models.BasicSample|str|None=None,
limit:int=0
) -> models.SubmissionSampleAssociation|List[models.SubmissionSampleAssociation]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.SubmissionSampleAssociation)
match submission:
case models.BasicSubmission():
query = query.filter(models.SubmissionSampleAssociation.submission==submission)
case str():
query = query.join(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==submission)
case _:
pass
match sample:
case models.BasicSample():
query = query.filter(models.SubmissionSampleAssociation.sample==sample)
case str():
query = query.join(models.BasicSample).filter(models.BasicSample.submitter_id==sample)
case _:
pass
logger.debug(f"Query count: {query.count()}")
if query.count() == 1:
limit = 1
return query_return(query=query, limit=limit)
def lookup_modes(ctx:Settings) -> List[str]:
rel = ctx.database_session.query(models.Control).first()
try:
cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)]
except AttributeError as e:
logger.debug(f"Failed to get available modes from db: {e}")
cols = []
return cols
############### Complex Lookups ###################################
def lookup_sub_samp_association_by_plate_sample(ctx:Settings, rsl_plate_num:str|models.BasicSample, rsl_sample_num:str|models.BasicSubmission) -> models.WastewaterAssociation:
"""
_summary_
Args:
ctx (Settings): _description_
rsl_plate_num (str): _description_
sample_submitter_id (_type_): _description_
Returns:
models.SubmissionSampleAssociation: _description_
"""
# logger.debug(f"{type(rsl_plate_num)}, {type(rsl_sample_num)}")
match rsl_plate_num:
case models.BasicSubmission()|models.Wastewater():
# logger.debug(f"Model for rsl_plate_num: {rsl_plate_num}")
first_query = ctx.database_session.query(models.SubmissionSampleAssociation)\
.filter(models.SubmissionSampleAssociation.submission==rsl_plate_num)
case str():
# logger.debug(f"String for rsl_plate_num: {rsl_plate_num}")
first_query = ctx.database_session.query(models.SubmissionSampleAssociation)\
.join(models.BasicSubmission)\
.filter(models.BasicSubmission.rsl_plate_num==rsl_plate_num)
case _:
logger.error(f"Unknown case for rsl_plate_num {rsl_plate_num}")
match rsl_sample_num:
case models.BasicSample()|models.WastewaterSample():
# logger.debug(f"Model for rsl_sample_num: {rsl_sample_num}")
second_query = first_query.filter(models.SubmissionSampleAssociation.sample==rsl_sample_num)
# case models.WastewaterSample:
# second_query = first_query.filter(models.SubmissionSampleAssociation.sample==rsl_sample_num)
case str():
# logger.debug(f"String for rsl_sample_num: {rsl_sample_num}")
second_query = first_query.join(models.BasicSample)\
.filter(models.BasicSample.submitter_id==rsl_sample_num)
case _:
logger.error(f"Unknown case for rsl_sample_num {rsl_sample_num}")
try:
return second_query.first()
except UnboundLocalError:
logger.error(f"Couldn't construct second query")
return None

View File

@@ -0,0 +1,238 @@
'''
Contains convenience functions for using database
'''
from tools import Settings
from .lookups import *
import pandas as pd
import json
from pathlib import Path
import yaml
from .. import models
from . import store_object
from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError
from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError
from pprint import pformat
def submissions_to_df(ctx:Settings, submission_type:str|None=None, limit:int=0) -> pd.DataFrame:
"""
Convert submissions looked up by type to dataframe
Args:
ctx (Settings): settings object passed by gui
submission_type (str | None, optional): submission type (should be string in D3 of excel sheet) Defaults to None.
limit (int): Maximum number of submissions to return. Defaults to 0.
Returns:
pd.DataFrame: dataframe constructed from retrieved submissions
"""
logger.debug(f"Querying Type: {submission_type}")
logger.debug(f"Using limit: {limit}")
# use lookup function to create list of dicts
subs = [item.to_dict() for item in lookup_submissions(ctx=ctx, submission_type=submission_type, limit=limit)]
logger.debug(f"Got {len(subs)} results.")
# make df from dicts (records) in list
df = pd.DataFrame.from_records(subs)
# Exclude sub information
try:
df = df.drop("controls", axis=1)
except:
logger.warning(f"Couldn't drop 'controls' column from submissionsheet df.")
try:
df = df.drop("ext_info", axis=1)
except:
logger.warning(f"Couldn't drop 'ext_info' column from submissionsheet df.")
try:
df = df.drop("pcr_info", axis=1)
except:
logger.warning(f"Couldn't drop 'pcr_info' column from submissionsheet df.")
# NOTE: Moved to submissions_to_df function
try:
del df['samples']
except KeyError:
pass
try:
del df['reagents']
except KeyError:
pass
try:
del df['comments']
except KeyError:
pass
return df
def get_control_subtypes(ctx:Settings, type:str, mode:str) -> list[str]:
"""
Get subtypes for a control analysis mode
Args:
ctx (Settings): settings object passed from gui
type (str): control type name
mode (str): analysis mode name
Returns:
list[str]: list of subtype names
"""
# Only the first control of type is necessary since they all share subtypes
try:
outs = lookup_controls(ctx=ctx, control_type=type, limit=1)
except (TypeError, IndexError):
return []
# Get analysis mode data as dict
jsoner = json.loads(getattr(outs, mode))
logger.debug(f"JSON out: {jsoner}")
try:
genera = list(jsoner.keys())[0]
except IndexError:
return []
subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item]
return subtypes
def update_last_used(ctx:Settings, reagent:models.Reagent, kit:models.KitType):
"""
Updates the 'last_used' field in kittypes/reagenttypes
Args:
ctx (Settings): settings object passed down from gui
reagent (models.Reagent): reagent to be used for update
kit (models.KitType): kit to be used for lookup
"""
# rt = list(set(reagent.type).intersection(kit.reagent_types))[0]
rt = lookup_reagent_types(ctx=ctx, kit_type=kit, reagent=reagent)
if rt != None:
assoc = lookup_reagenttype_kittype_association(ctx=ctx, kit_type=kit, reagent_type=rt)
if assoc != None:
if assoc.last_used != reagent.lot:
logger.debug(f"Updating {assoc} last used to {reagent.lot}")
assoc.last_used = reagent.lot
# ctx.database_session.merge(assoc)
# ctx.database_session.commit()
result = store_object(ctx=ctx, object=assoc)
return result
return dict(message=f"Updating last used {rt} was not performed.")
def delete_submission(ctx:Settings, id:int) -> dict|None:
"""
Deletes a submission and its associated samples from the database.
Args:
ctx (Settings): settings object passed down from gui
id (int): id of submission to be deleted.
"""
# In order to properly do this Im' going to have to delete all of the secondary table stuff as well.
# Retrieve submission
sub = lookup_submissions(ctx=ctx, id=id)
# Convert to dict for storing backup as a yml
backup = sub.to_dict()
try:
with open(Path(ctx.backup_path).joinpath(f"{sub.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f:
yaml.dump(backup, f)
except KeyError:
pass
ctx.database_session.delete(sub)
try:
ctx.database_session.commit()
except (SQLIntegrityError, SQLOperationalError, AlcIntegrityError, AlcOperationalError) as e:
ctx.database_session.rollback()
raise e
return None
def update_ww_sample(ctx:Settings, sample_obj:dict) -> dict|None:
"""
Retrieves wastewater sample by rsl number (sample_obj['sample']) and updates values from constructed dictionary
Args:
ctx (Settings): settings object passed down from gui
sample_obj (dict): dictionary representing new values for database object
"""
logger.debug(f"dictionary to use for update: {pformat(sample_obj)}")
logger.debug(f"Looking up {sample_obj['sample']} in plate {sample_obj['plate_rsl']}")
assoc = lookup_submission_sample_association(ctx=ctx, submission=sample_obj['plate_rsl'], sample=sample_obj['sample'])
if assoc != None:
for key, value in sample_obj.items():
# set attribute 'key' to 'value'
try:
check = getattr(assoc, key)
except AttributeError as e:
logger.error(f"Item doesn't have field {key} due to {e}")
continue
if check != value:
logger.debug(f"Setting association key: {key} to {value}")
try:
setattr(assoc, key, value)
except AttributeError as e:
logger.error(f"Can't set field {key} to {value} due to {e}")
continue
else:
logger.error(f"Unable to find sample {sample_obj['sample']}")
return
result = store_object(ctx=ctx, object=assoc)
return result
def check_kit_integrity(sub:models.BasicSubmission|models.KitType, reagenttypes:list|None=None) -> dict|None:
"""
Ensures all reagents expected in kit are listed in Submission
Args:
sub (BasicSubmission | KitType): Object containing complete list of reagent types.
reagenttypes (list | None, optional): List to check against complete list. Defaults to None.
Returns:
dict|None: Result object containing a message and any missing components.
"""
logger.debug(type(sub))
# What type is sub?
reagenttypes = []
match sub:
case models.BasicSubmission():
# Get all required reagent types for this kit.
ext_kit_rtypes = [item.name for item in sub.extraction_kit.get_reagents(required=True, submission_type=sub.submission_type_name)]
# Overwrite function parameter reagenttypes
for reagent in sub.reagents:
try:
rt = list(set(reagent.type).intersection(sub.extraction_kit.reagent_types))[0].name
logger.debug(f"Got reagent type: {rt}")
reagenttypes.append(rt)
except AttributeError as e:
logger.error(f"Problem parsing reagents: {[f'{reagent.lot}, {reagent.type}' for reagent in sub.reagents]}")
reagenttypes.append(reagent.type[0].name)
case models.KitType():
ext_kit_rtypes = [item.name for item in sub.get_reagents(required=True)]
case _:
raise ValueError(f"There was no match for the integrity object.\n\nCheck to make sure they are imported from the same place because it matters.")
logger.debug(f"Kit reagents: {ext_kit_rtypes}")
logger.debug(f"Submission reagents: {reagenttypes}")
# check if lists are equal
check = set(ext_kit_rtypes) == set(reagenttypes)
logger.debug(f"Checking if reagents match kit contents: {check}")
# what reagent types are in both lists?
missing = list(set(ext_kit_rtypes).difference(reagenttypes))
logger.debug(f"Missing reagents types: {missing}")
# if lists are equal return no problem
if len(missing)==0:
result = None
else:
result = {'message' : f"The submission you are importing is missing some reagents expected by the kit.\n\nIt looks like you are missing: {[item.upper() for item in missing]}\n\nAlternatively, you may have set the wrong extraction kit.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", 'missing': missing}
return result
def update_subsampassoc_with_pcr(ctx:Settings, submission:models.BasicSubmission, sample:models.BasicSample, input_dict:dict) -> dict|None:
"""
Inserts PCR results into wastewater submission/sample association
Args:
ctx (Settings): settings object passed down from gui
submission (models.BasicSubmission): Submission object
sample (models.BasicSample): Sample object
input_dict (dict): dictionary with info to be updated.
Returns:
dict|None: result object
"""
assoc = lookup_submission_sample_association(ctx, submission=submission, sample=sample)
for k,v in input_dict.items():
try:
setattr(assoc, k, v)
except AttributeError:
logger.error(f"Can't set {k} to {v}")
result = store_object(ctx=ctx, object=assoc)
return result

View File

@@ -1,12 +1,46 @@
'''
Contains all models for sqlalchemy
'''
from sqlalchemy.ext.declarative import declarative_base
from typing import Any
from sqlalchemy.orm import declarative_base
import logging
from pprint import pformat
Base = declarative_base()
metadata = Base.metadata
logger = logging.getLogger(f"submissions.{__name__}")
def find_subclasses(parent:Any, attrs:dict) -> Any:
"""
Finds subclasses of a parent that does contain all
attributes if the parent does not.
Args:
parent (_type_): Parent class.
attrs (dict): Key:Value dictionary of attributes
Raises:
AttributeError: Raised if no subclass is found.
Returns:
_type_: Parent or subclass.
"""
if len(attrs) == 0:
return parent
if any([not hasattr(parent, attr) for attr in attrs]):
# looks for first model that has all included kwargs
try:
model = [subclass for subclass in parent.__subclasses__() if all([hasattr(subclass, attr) for attr in attrs])][0]
except IndexError as e:
raise AttributeError(f"Couldn't find existing class/subclass of {parent} with all attributes:\n{pformat(attrs)}")
else:
model = parent
logger.debug(f"Using model: {model}")
return model
from .controls import Control, ControlType
from .kits import KitType, ReagentType, Reagent, Discount, KitTypeReagentTypeAssociation, SubmissionType, SubmissionTypeKitTypeAssociation
from .organizations import Organization, Contact
from .submissions import BasicSubmission, BacterialCulture, Wastewater, WastewaterArtic, WastewaterSample, BacterialCultureSample, BasicSample, SubmissionSampleAssociation, WastewaterAssociation

View File

@@ -19,9 +19,7 @@ class ControlType(Base):
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(255), unique=True) #: controltype name (e.g. MCS)
targets = Column(JSON) #: organisms checked for
# instances_id = Column(INTEGER, ForeignKey("_control_samples.id", ondelete="SET NULL", name="fk_ctype_instances_id"))
instances = relationship("Control", back_populates="controltype") #: control samples created of this type.
# UniqueConstraint('name', name='uq_controltype_name')
class Control(Base):
@@ -39,13 +37,14 @@ class Control(Base):
contains = Column(JSON) #: unstructured hashes in contains.tsv for each organism
matches = Column(JSON) #: unstructured hashes in matches.tsv for each organism
kraken = Column(JSON) #: unstructured output from kraken_report
# UniqueConstraint('name', name='uq_control_name')
submission_id = Column(INTEGER, ForeignKey("_submissions.id")) #: parent submission id
submission = relationship("BacterialCulture", back_populates="controls", foreign_keys=[submission_id]) #: parent submission
refseq_version = Column(String(16)) #: version of refseq used in fastq parsing
kraken2_version = Column(String(16)) #: version of kraken2 used in fastq parsing
kraken2_db_version = Column(String(32)) #: folder name of kraken2 db
def __repr__(self) -> str:
return f"<Control({self.name})>"
def to_sub_dict(self) -> dict:
"""

View File

@@ -5,13 +5,11 @@ from . import Base
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT
from sqlalchemy.orm import relationship, validates
from sqlalchemy.ext.associationproxy import association_proxy
from datetime import date
import logging
logger = logging.getLogger(f'submissions.{__name__}')
reagenttypes_reagents = Table("_reagenttypes_reagents", Base.metadata, Column("reagent_id", INTEGER, ForeignKey("_reagents.id")), Column("reagenttype_id", INTEGER, ForeignKey("_reagent_types.id")))
@@ -55,21 +53,25 @@ class KitType(Base):
"""
return self.name
def get_reagents(self, required:bool=False) -> list:
def get_reagents(self, required:bool=False, submission_type:str|None=None) -> list:
"""
Return ReagentTypes linked to kit through KitTypeReagentTypeAssociation.
Args:
required (bool, optional): If true only return required types. Defaults to False.
submission_type (str | None, optional): Submission type to narrow results. Defaults to None.
Returns:
list: List of ReagentTypes
list: List of reagent types
"""
if required:
return [item.reagent_type for item in self.kit_reagenttype_associations if item.required == 1]
if submission_type != None:
relevant_associations = [item for item in self.kit_reagenttype_associations if submission_type in item.uses.keys()]
else:
return [item.reagent_type for item in self.kit_reagenttype_associations]
relevant_associations = [item for item in self.kit_reagenttype_associations]
if required:
return [item.reagent_type for item in relevant_associations if item.required == 1]
else:
return [item.reagent_type for item in relevant_associations]
def construct_xl_map_for_use(self, use:str) -> dict:
"""
@@ -97,8 +99,6 @@ class KitType(Base):
map['info'] = {}
return map
class ReagentType(Base):
"""
Base of reagent type abstract
@@ -118,13 +118,7 @@ class ReagentType(Base):
# association proxy of "user_keyword_associations" collection
# to "keyword" attribute
kit_types = association_proxy("kit_reagenttype_associations", "kit_type")
@validates('required')
def validate_age(self, key, value):
if not 0 <= value < 2:
raise ValueError(f'Invalid required value {value}. Must be 0 or 1.')
return value
kit_types = association_proxy("reagenttype_kit_associations", "kit_type")
def __str__(self) -> str:
"""
@@ -206,12 +200,16 @@ class Reagent(Base):
"""
return str(self.lot)
def to_sub_dict(self, extraction_kit:KitType=None) -> dict:
"""
dictionary containing values necessary for gui
Args:
extraction_kit (KitType, optional): KitType to use to get reagent type. Defaults to None.
Returns:
dict: gui friendly dictionary
dict: _description_
"""
if extraction_kit != None:
# Get the intersection of this reagent's ReagentType and all ReagentTypes in KitType
@@ -245,6 +243,9 @@ class Reagent(Base):
"""
Returns basic reagent dictionary.
Args:
extraction_kit (KitType, optional): KitType to use to get reagent type. Defaults to None.
Returns:
dict: Basic reagent dictionary of 'type', 'lot', 'expiry'
"""
@@ -268,7 +269,6 @@ class Reagent(Base):
"expiry": self.expiry.strftime("%Y-%m-%d")
}
class Discount(Base):
"""
Relationship table for client labs for certain kits.
@@ -303,7 +303,7 @@ class SubmissionType(Base):
cascade="all, delete-orphan",
)
kit_types = association_proxy("kit_submissiontype_associations", "kit_type")
kit_types = association_proxy("submissiontype_kit_associations", "kit_type")
def __repr__(self) -> str:
return f"<SubmissionType({self.name})>"
@@ -321,7 +321,7 @@ class SubmissionTypeKitTypeAssociation(Base):
kit_type = relationship(KitType, back_populates="kit_submissiontype_associations")
# reference to the "ReagentType" object
# reference to the "SubmissionType" object
submission_type = relationship(SubmissionType, back_populates="submissiontype_kit_associations")
def __init__(self, kit_type=None, submission_type=None):

View File

@@ -5,7 +5,6 @@ from . import Base
from sqlalchemy import Column, String, INTEGER, ForeignKey, Table
from sqlalchemy.orm import relationship
# table containing organization/contact relationship
orgs_contacts = Table("_orgs_contacts", Base.metadata, Column("org_id", INTEGER, ForeignKey("_organizations.id")), Column("contact_id", INTEGER, ForeignKey("_contacts.id")))

View File

@@ -74,10 +74,13 @@ class BasicSubmission(Base):
def to_dict(self, full_data:bool=False) -> dict:
"""
dictionary used in submissions summary
Constructs dictionary used in submissions summary
Args:
full_data (bool, optional): indicates if sample dicts to be constructed. Defaults to False.
Returns:
dict: dictionary used in submissions summary
dict: dictionary used in submissions summary and details
"""
# get lab from nested organization object
logger.debug(f"Converting {self.rsl_plate_num} to dict...")
@@ -113,10 +116,6 @@ class BasicSubmission(Base):
else:
reagents = None
samples = None
# Updated 2023-09 to get sample association with plate number
# for item in self.submission_sample_associations:
# sample = item.sample.to_sub_dict(submission_rsl=self.rsl_plate_num)
# samples.append(sample)
try:
comments = self.comment
except:
@@ -383,7 +382,6 @@ class BasicSample(Base):
Returns:
dict: dictionary of sample id, row and column in elution plate
"""
# self.assoc = [item for item in self.sample_submission_associations if item.submission.rsl_plate_num==submission_rsl][0]
# Since there is no PCR, negliable result is necessary.
return dict(name=self.submitter_id, positive=False)

View File

@@ -6,15 +6,14 @@ import pprint
from typing import List
import pandas as pd
from pathlib import Path
from backend.db import lookup_sample_by_submitter_id, get_reagents_in_extkit, lookup_kittype_by_name, lookup_submissiontype_by_name, models
from backend.db import models, lookup_kit_types, lookup_submission_type, lookup_samples
from backend.pydant import PydSubmission, PydReagent
import logging
from collections import OrderedDict
import re
import numpy as np
from datetime import date
from dateutil.parser import parse, ParserError
from tools import check_not_nan, RSLNamer, convert_nans_to_nones, Settings, convert_well_to_row_column
from tools import check_not_nan, RSLNamer, convert_nans_to_nones, Settings
from frontend.custom_widgets.pop_ups import SubmissionTypeSelector, KitSelector
logger = logging.getLogger(f"submissions.{__name__}")
@@ -106,11 +105,14 @@ class SheetParser(object):
self.sub[k] = v
logger.debug(f"Parser.sub after info scrape: {pprint.pformat(self.sub)}")
def parse_reagents(self):
def parse_reagents(self, extraction_kit:str|None=None):
"""
Pulls reagent info from the excel sheet
"""
self.sub['reagents'] = ReagentParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type'], extraction_kit=self.sub['extraction_kit']).parse_reagents()
if extraction_kit == None:
extraction_kit = extraction_kit=self.sub['extraction_kit']
logger.debug(f"Parsing reagents for {extraction_kit}")
self.sub['reagents'] = ReagentParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type'], extraction_kit=extraction_kit).parse_reagents()
def parse_samples(self):
"""
@@ -180,7 +182,8 @@ class SheetParser(object):
"""
Enforce that only allowed reagents get into the Pydantic Model
"""
allowed_reagents = [item.name for item in get_reagents_in_extkit(ctx=self.ctx, kit_name=self.sub['extraction_kit']['value'])]
kit = lookup_kit_types(ctx=self.ctx, name=self.sub['extraction_kit']['value'])
allowed_reagents = [item.name for item in kit.get_reagents()]
logger.debug(f"List of reagents for comparison with allowed_reagents: {pprint.pformat(self.sub['reagents'])}")
self.sub['reagents'] = [reagent for reagent in self.sub['reagents'] if reagent['value'].type in allowed_reagents]
@@ -217,7 +220,8 @@ class InfoParser(object):
if isinstance(submission_type, str):
submission_type = dict(value=submission_type, parsed=False)
logger.debug(f"Looking up submission type: {submission_type['value']}")
submission_type = lookup_submissiontype_by_name(ctx=self.ctx, type_name=submission_type['value'])
# submission_type = lookup_submissiontype_by_name(ctx=self.ctx, type_name=submission_type['value'])
submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type['value'])
info_map = submission_type.info_map
return info_map
@@ -269,7 +273,9 @@ class ReagentParser(object):
self.xl = xl
def fetch_kit_info_map(self, extraction_kit:dict, submission_type:str):
kit = lookup_kittype_by_name(ctx=self.ctx, name=extraction_kit['value'])
if isinstance(extraction_kit, dict):
extraction_kit = extraction_kit['value']
kit = lookup_kit_types(ctx=self.ctx, name=extraction_kit)
if isinstance(submission_type, dict):
submission_type = submission_type['value']
reagent_map = kit.construct_xl_map_for_use(submission_type.title())
@@ -300,9 +306,9 @@ class ReagentParser(object):
logger.debug(f"Got lot for {item}-{name}: {lot} as {type(lot)}")
lot = str(lot)
listo.append(dict(value=PydReagent(type=item.strip(), lot=lot, exp=expiry, name=name), parsed=parsed))
logger.debug(f"Returning listo: {listo}")
return listo
class SampleParser(object):
"""
object to pull data for samples in excel sheet and construct individual sample objects
@@ -331,23 +337,48 @@ class SampleParser(object):
if isinstance(self.lookup_table, pd.DataFrame):
self.parse_lookup_table()
def fetch_sample_info_map(self, submission_type:dict) -> dict:
def fetch_sample_info_map(self, submission_type:str) -> dict:
"""
Gets info locations in excel book for submission type.
Args:
submission_type (str): submission type
Returns:
dict: Info locations.
"""
logger.debug(f"Looking up submission type: {submission_type}")
submission_type = lookup_submissiontype_by_name(ctx=self.ctx, type_name=submission_type)
submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type)
logger.debug(f"info_map: {pprint.pformat(submission_type.info_map)}")
sample_info_map = submission_type.info_map['samples']
return sample_info_map
def construct_plate_map(self, plate_map_location:dict) -> pd.DataFrame:
"""
Gets location of samples from plate map grid in excel sheet.
Args:
plate_map_location (dict): sheet name, start/end row/column
Returns:
pd.DataFrame: Plate map grid
"""
df = self.xl.parse(plate_map_location['sheet'], header=None, dtype=object)
df = df.iloc[plate_map_location['start_row']-1:plate_map_location['end_row'], plate_map_location['start_column']-1:plate_map_location['end_column']]
# logger.debug(f"Input dataframe for plate map: {df}")
df = pd.DataFrame(df.values[1:], columns=df.iloc[0])
df = df.set_index(df.columns[0])
# logger.debug(f"Output dataframe for plate map: {df}")
return df
def construct_lookup_table(self, lookup_table_location) -> pd.DataFrame:
def construct_lookup_table(self, lookup_table_location:dict) -> pd.DataFrame:
"""
Gets table of misc information from excel book
Args:
lookup_table_location (dict): sheet name, start/end row
Returns:
pd.DataFrame: _description_
"""
try:
df = self.xl.parse(lookup_table_location['sheet'], header=None, dtype=object)
except KeyError:
@@ -355,16 +386,17 @@ class SampleParser(object):
df = df.iloc[lookup_table_location['start_row']-1:lookup_table_location['end_row']]
df = pd.DataFrame(df.values[1:], columns=df.iloc[0])
df = df.reset_index(drop=True)
# logger.debug(f"Dataframe for lookup table: {df}")
return df
def create_basic_dictionaries_from_plate_map(self):
"""
Parse sample location/name from plate map
"""
invalids = [0, "0", "EMPTY"]
new_df = self.plate_map.dropna(axis=1, how='all')
columns = new_df.columns.tolist()
for _, iii in new_df.iterrows():
for c in columns:
# logger.debug(f"Checking sample {iii[c]}")
if check_not_nan(iii[c]):
if iii[c] in invalids:
logger.debug(f"Invalid sample name: {iii[c]}, skipping.")
@@ -378,8 +410,10 @@ class SampleParser(object):
self.samples.append(dict(submitter_id=id, row=row_keys[iii._name], column=c))
def parse_lookup_table(self):
"""
Parse misc info from lookup table.
"""
def determine_if_date(input_str) -> str|date:
# logger.debug(f"Looks like we have a str: {input_str}")
regex = re.compile(r"^\d{4}-?\d{2}-?\d{2}")
if bool(regex.search(input_str)):
logger.warning(f"{input_str} is a date!")
@@ -407,11 +441,19 @@ class SampleParser(object):
sample[k] = v
logger.debug(f"Output sample dict: {sample}")
def parse_samples(self, generate:bool=True) -> List[dict]:
def parse_samples(self, generate:bool=True) -> List[dict]|List[models.BasicSample]:
"""
Parse merged platemap\lookup info into dicts/samples
Args:
generate (bool, optional): Indicates if sample objects to be generated from dicts. Defaults to True.
Returns:
List[dict]|List[models.BasicSample]: List of samples
"""
result = None
new_samples = []
for ii, sample in enumerate(self.samples):
# logger.debug(f"\n\n{new_samples}\n\n")
try:
if sample['submitter_id'] in [check_sample['sample'].submitter_id for check_sample in new_samples]:
sample['submitter_id'] = f"{sample['submitter_id']}-{ii}"
@@ -432,7 +474,6 @@ class SampleParser(object):
translated_dict[k] = convert_nans_to_nones(v)
translated_dict['sample_type'] = f"{self.submission_type} Sample"
parser_query = f"parse_{translated_dict['sample_type'].replace(' ', '_').lower()}"
# logger.debug(f"New sample dictionary going into object creation:\n{translated_dict}")
try:
custom_parser = getattr(self, parser_query)
translated_dict = custom_parser(translated_dict)
@@ -445,6 +486,15 @@ class SampleParser(object):
return result, new_samples
def generate_sample_object(self, input_dict) -> models.BasicSample:
"""
Constructs sample object from dict
Args:
input_dict (dict): sample information
Returns:
models.BasicSample: Sample object
"""
query = input_dict['sample_type'].replace(" ", "")
try:
database_obj = getattr(models, query)
@@ -452,13 +502,12 @@ class SampleParser(object):
logger.error(f"Could not find the model {query}. Using generic.")
database_obj = models.BasicSample
logger.debug(f"Searching database for {input_dict['submitter_id']}...")
instance = lookup_sample_by_submitter_id(ctx=self.ctx, submitter_id=input_dict['submitter_id'])
instance = lookup_samples(ctx=self.ctx, submitter_id=input_dict['submitter_id'])
if instance == None:
logger.debug(f"Couldn't find sample {input_dict['submitter_id']}. Creating new sample.")
instance = database_obj()
for k,v in input_dict.items():
try:
# setattr(instance, k, v)
instance.set_attribute(k, v)
except Exception as e:
logger.error(f"Failed to set {k} due to {type(e).__name__}: {e}")
@@ -511,12 +560,27 @@ class SampleParser(object):
return input_dict
def parse_first_strand_sample(self, input_dict:dict) -> dict:
"""
Update sample dictionary with first strand specific information
Args:
input_dict (dict): Input sample dictionary
Returns:
dict: Updated sample dictionary
"""
logger.debug("Called first strand sample parser")
input_dict['well'] = re.search(r"\s\((.*)\)$", input_dict['submitter_id']).groups()[0]
input_dict['submitter_id'] = re.sub(r"\s\(.*\)$", "", str(input_dict['submitter_id'])).strip()
return input_dict
def grab_plates(self):
def grab_plates(self) -> List[str]:
"""
Parse plate names from
Returns:
List[str]: list of plate names.
"""
plates = []
for plate in self.plates:
df = self.xl.parse(plate['sheet'], header=None)
@@ -527,7 +591,6 @@ class SampleParser(object):
plates.append(output)
return plates
class PCRParser(object):
"""
Object to pull data from Design and Analysis PCR export file.
@@ -574,7 +637,6 @@ class PCRParser(object):
sheet_name (str): Name of sheet in excel workbook that holds info.
"""
df = self.xl.parse(sheet_name=sheet_name, dtype=object).fillna("")
# self.pcr['file'] = df.iloc[1][1]
self.pcr['comment'] = df.iloc[0][1]
self.pcr['operator'] = df.iloc[1][1]
self.pcr['barcode'] = df.iloc[2][1]
@@ -615,7 +677,6 @@ class PCRParser(object):
except ValueError:
logger.error("Well call number doesn't match sample number")
logger.debug(f"Well call df: {well_call_df}")
# iloc is [row][column]
for ii, row in self.samples_df.iterrows():
try:
sample_obj = [sample for sample in self.samples if sample['sample'] == row[3]][0]
@@ -623,14 +684,8 @@ class PCRParser(object):
sample_obj = dict(
sample = row['Sample'],
plate_rsl = self.plate_num,
# elution_well = row['Well Position']
)
logger.debug(f"Got sample obj: {sample_obj}")
# logger.debug(f"row: {row}")
# rsl_num = row[3]
# # logger.debug(f"Looking up: {rsl_num}")
# ww_samp = lookup_ww_sample_by_rsl_sample_number(ctx=self.ctx, rsl_number=rsl_num)
# logger.debug(f"Got: {ww_samp}")
if isinstance(row['Cq'], float):
sample_obj[f"ct_{row['Target'].lower()}"] = row['Cq']
else:
@@ -639,20 +694,6 @@ class PCRParser(object):
sample_obj[f"{row['Target'].lower()}_status"] = row['Assessment']
except KeyError:
logger.error(f"No assessment for {sample_obj['sample']}")
# match row["Target"]:
# case "N1":
# if isinstance(row['Cq'], float):
# sample_obj['ct_n1'] = row["Cq"]
# else:
# sample_obj['ct_n1'] = 0.0
# sample_obj['n1_status'] = row['Assessment']
# case "N2":
# if isinstance(row['Cq'], float):
# sample_obj['ct_n2'] = row['Assessment']
# else:
# sample_obj['ct_n2'] = 0.0
# case _:
# logger.warning(f"Unexpected input for row[4]: {row["Target"]}")
self.samples.append(sample_obj)

View File

@@ -14,7 +14,7 @@ env = jinja_template_loading()
logger = logging.getLogger(f"submissions.{__name__}")
def make_report_xlsx(records:list[dict]) -> DataFrame:
def make_report_xlsx(records:list[dict]) -> Tuple[DataFrame, DataFrame]:
"""
create the dataframe for a report
@@ -92,7 +92,6 @@ def convert_data_list_to_df(ctx:dict, input:list[dict], subtype:str|None=None) -
"""
df = DataFrame.from_records(input)
# df.to_excel("test.xlsx", engine="openpyxl")
safe = ['name', 'submitted_date', 'genus', 'target']
for column in df.columns:
if "percent" in column:
@@ -102,7 +101,6 @@ def convert_data_list_to_df(ctx:dict, input:list[dict], subtype:str|None=None) -
if column not in safe:
if subtype != None and column != subtype:
del df[column]
# logger.debug(df)
# move date of sample submitted on same date as previous ahead one.
df = displace_date(df)
# ad hoc method to make data labels more accurate.
@@ -215,14 +213,10 @@ def drop_reruns_from_df(ctx:dict, df: DataFrame) -> DataFrame:
"""
if 'rerun_regex' in ctx:
sample_names = get_unique_values_in_df_column(df, column_name="name")
# logger.debug(f"Compiling regex from: {settings['rerun_regex']}")
rerun_regex = re.compile(fr"{ctx['rerun_regex']}")
for sample in sample_names:
# logger.debug(f'Running search on {sample}')
if rerun_regex.search(sample):
# logger.debug(f'Match on {sample}')
first_run = re.sub(rerun_regex, "", sample)
# logger.debug(f"First run: {first_run}")
df = df.drop(df[df.name == first_run].index)
return df

View File

@@ -1,3 +1,6 @@
'''
Contains pydantic models and accompanying validators
'''
import uuid
from pydantic import BaseModel, field_validator, Extra, Field
from datetime import date, datetime
@@ -9,7 +12,7 @@ from pathlib import Path
import re
import logging
from tools import check_not_nan, convert_nans_to_nones, Settings
from backend.db.functions import lookup_submission_by_rsl_num
from backend.db.functions import lookup_submissions
@@ -47,13 +50,15 @@ class PydReagent(BaseModel):
@field_validator("exp", mode="before")
@classmethod
def enforce_date(cls, value):
# if isinstance(value, float) or value == np.nan:
# raise ValueError(f"Date cannot be a float: {value}")
# else:
# return value
if value != None:
if isinstance(value, int):
match value:
case int():
return datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value - 2).date()
case str():
return parse(value)
case date():
return value
case _:
return convert_nans_to_nones(str(value))
if value == None:
value = date.today()
@@ -83,14 +88,7 @@ class PydSubmission(BaseModel, extra=Extra.allow):
technician: dict|None
reagents: List[dict] = []
samples: List[Any]
# missing_fields: List[str] = []
# @field_validator("submitter_plate_num", mode="before")
# @classmethod
# def rescue_submitter_id(cls, value):
# if value == None:
# return dict(value=None, parsed=False)
# return value
@field_validator("submitter_plate_num")
@classmethod
@@ -146,12 +144,9 @@ class PydSubmission(BaseModel, extra=Extra.allow):
@classmethod
def rsl_from_file(cls, value, values):
logger.debug(f"RSL-plate initial value: {value['value']}")
# if isinstance(values.data['submission_type'], dict):
# sub_type = values.data['submission_type']['value']
# elif isinstance(values.data['submission_type'], str):
sub_type = values.data['submission_type']['value']
if check_not_nan(value['value']):
if lookup_submission_by_rsl_num(ctx=values.data['ctx'], rsl_num=value['value']) == None:
if lookup_submissions(ctx=values.data['ctx'], rsl_number=value['value']) == None:
return dict(value=value['value'], parsed=True)
else:
logger.warning(f"Submission number {value} already exists in DB, attempting salvage with filepath")
@@ -178,18 +173,6 @@ class PydSubmission(BaseModel, extra=Extra.allow):
return dict(value=convert_nans_to_nones(value['value']), parsed=False)
return value
# @field_validator("reagents")
# @classmethod
# def remove_atcc(cls, value):
# return_val = []
# for reagent in value:
# logger.debug(f"Pydantic reagent: {reagent}")
# if reagent['value'].type == None:
# continue
# else:
# return_val.append(reagent)
# return return_val
@field_validator("sample_count", mode='before')
@classmethod
def rescue_sample_count(cls, value):
@@ -212,17 +195,11 @@ class PydSubmission(BaseModel, extra=Extra.allow):
return dict(value=None, parsed=False)
return value
# @field_validator("extraction_kit")
# @classmethod
# def enforce_kit(cls, value, values):
# from frontend.custom_widgets.pop_ups import KitSelector
# if value['value'] == None:
# return dict(value=KitSelector(values.data['ctx'], title="Select Extraction Kit", message="No extraction kit was found, please select from below."))
# return value
@field_validator("submission_type", mode='before')
@classmethod
def make_submission_type(cls, value, values):
if not isinstance(value, dict):
value = {"value": value}
if check_not_nan(value['value']):
value = value['value'].title()
return dict(value=value, parsed=True)

View File

@@ -12,9 +12,8 @@ from PyQt6.QtGui import QAction
from PyQt6.QtWebEngineWidgets import QWebEngineView
from pathlib import Path
from backend.db import (
construct_reagent, get_all_Control_Types_names, get_all_available_modes, store_reagent
construct_reagent, store_object, lookup_control_types, lookup_modes
)
# from .main_window_functions import *
from .all_window_functions import extract_form_info
from tools import check_if_app, Settings
from frontend.custom_widgets import SubmissionsSheet, AlertPop, AddReagentForm, KitAdder, ControlsDatePicker
@@ -63,7 +62,7 @@ class App(QMainWindow):
menuBar = self.menuBar()
fileMenu = menuBar.addMenu("&File")
# Creating menus using a title
editMenu = menuBar.addMenu("&Edit")
# editMenu = menuBar.addMenu("&Edit")
methodsMenu = menuBar.addMenu("&Methods")
reportMenu = menuBar.addMenu("&Reports")
maintenanceMenu = menuBar.addMenu("&Monthly")
@@ -213,13 +212,12 @@ class App(QMainWindow):
if dlg.exec():
# extract form info
info = extract_form_info(dlg)
logger.debug(f"dictionary from form: {info}")
# return None
logger.debug(f"Reagent info: {info}")
# create reagent object
reagent = construct_reagent(ctx=self.ctx, info_dict=info)
# send reagent to db
store_reagent(ctx=self.ctx, reagent=reagent)
# store_reagent(ctx=self.ctx, reagent=reagent)
result = store_object(ctx=self.ctx, object=reagent)
return reagent
def generateReport(self):
@@ -302,6 +300,14 @@ class App(QMainWindow):
self, result = construct_first_strand_function(self)
self.result_reporter(result)
def scrape_reagents(self, *args, **kwargs):
from .main_window_functions import scrape_reagents
logger.debug(f"Args: {args}")
logger.debug(F"kwargs: {kwargs}")
self, result = scrape_reagents(self, args[0])
self.kit_integrity_completion()
self.result_reporter(result)
class AddSubForm(QWidget):
def __init__(self, parent):
@@ -348,11 +354,13 @@ class AddSubForm(QWidget):
self.tab2.layout = QVBoxLayout(self)
self.control_typer = QComboBox()
# fetch types of controls
con_types = get_all_Control_Types_names(ctx=parent.ctx)
# con_types = get_all_Control_Types_names(ctx=parent.ctx)
con_types = [item.name for item in lookup_control_types(ctx=parent.ctx)]
self.control_typer.addItems(con_types)
# create custom widget to get types of analysis
self.mode_typer = QComboBox()
mode_types = get_all_available_modes(ctx=parent.ctx)
# mode_types = get_all_available_modes(ctx=parent.ctx)
mode_types = lookup_modes(ctx=parent.ctx)
self.mode_typer.addItems(mode_types)
# create custom widget to get subtypes of analysis
self.sub_typer = QComboBox()

View File

@@ -22,14 +22,12 @@ def select_open_file(obj:QMainWindow, file_extension:str) -> Path:
Returns:
Path: Path of file to be opened
"""
# home_dir = str(Path(obj.ctx["directory_path"]))
try:
home_dir = Path(obj.ctx.directory_path).resolve().__str__()
except FileNotFoundError:
home_dir = Path.home().resolve().__str__()
fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = f"{file_extension}(*.{file_extension})")[0])
# fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', filter = f"{file_extension}(*.{file_extension})")[0])
return fname
def select_save_file(obj:QMainWindow, default_name:str, extension:str) -> Path:
@@ -45,7 +43,6 @@ def select_save_file(obj:QMainWindow, default_name:str, extension:str) -> Path:
Path: Path of file to be opened
"""
try:
# home_dir = Path(obj.ctx["directory_path"]).joinpath(default_name).resolve().__str__()
home_dir = Path(obj.ctx.directory_path).joinpath(default_name).resolve().__str__()
except FileNotFoundError:
home_dir = Path.home().joinpath(default_name).resolve().__str__()

View File

@@ -12,8 +12,8 @@ from PyQt6.QtWidgets import (
from PyQt6.QtCore import Qt, QDate, QSize
from tools import check_not_nan, jinja_template_loading, Settings
from ..all_window_functions import extract_form_info
from backend.db import get_all_reagenttype_names, lookup_all_sample_types, create_kit_from_yaml, \
lookup_regent_by_type_name, lookup_last_used_reagenttype_lot, lookup_all_reagent_names_by_role
from backend.db import construct_kit_from_yaml, \
lookup_reagent_types, lookup_reagents, lookup_submission_type, lookup_reagenttype_kittype_association
import logging
import numpy as np
from .pop_ups import AlertPop
@@ -27,7 +27,7 @@ class AddReagentForm(QDialog):
"""
dialog to add gather info about new reagent
"""
def __init__(self, ctx:dict, reagent_lot:str|None, reagent_type:str|None, expiry:date|None=None, reagent_name:str|None=None) -> None:
def __init__(self, ctx:dict, reagent_lot:str|None=None, reagent_type:str|None=None, expiry:date|None=None, reagent_name:str|None=None) -> None:
super().__init__()
self.ctx = ctx
if reagent_lot == None:
@@ -60,7 +60,7 @@ class AddReagentForm(QDialog):
# widget to get reagent type info
self.type_input = QComboBox()
self.type_input.setObjectName('type')
self.type_input.addItems([item for item in get_all_reagenttype_names(ctx=ctx)])
self.type_input.addItems([item.name for item in lookup_reagent_types(ctx=ctx)])
logger.debug(f"Trying to find index of {reagent_type}")
# convert input to user friendly string?
try:
@@ -85,9 +85,13 @@ class AddReagentForm(QDialog):
self.type_input.currentTextChanged.connect(self.update_names)
def update_names(self):
"""
Updates reagent names form field with examples from reagent type
"""
logger.debug(self.type_input.currentText())
self.name_input.clear()
self.name_input.addItems(item for item in lookup_all_reagent_names_by_role(ctx=self.ctx, role_name=self.type_input.currentText().replace(" ", "_").lower()))
lookup = lookup_reagents(ctx=self.ctx, reagent_type=self.type_input.currentText())
self.name_input.addItems(list(set([item.name for item in lookup])))
class ReportDatePicker(QDialog):
"""
@@ -103,17 +107,17 @@ class ReportDatePicker(QDialog):
self.buttonBox.accepted.connect(self.accept)
self.buttonBox.rejected.connect(self.reject)
# widgets to ask for dates
start_date = QDateEdit(calendarPopup=True)
start_date.setObjectName("start_date")
start_date.setDate(QDate.currentDate())
end_date = QDateEdit(calendarPopup=True)
end_date.setObjectName("end_date")
end_date.setDate(QDate.currentDate())
self.start_date = QDateEdit(calendarPopup=True)
self.start_date.setObjectName("start_date")
self.start_date.setDate(QDate.currentDate())
self.end_date = QDateEdit(calendarPopup=True)
self.end_date.setObjectName("end_date")
self.end_date.setDate(QDate.currentDate())
self.layout = QVBoxLayout()
self.layout.addWidget(QLabel("Start Date"))
self.layout.addWidget(start_date)
self.layout.addWidget(self.start_date)
self.layout.addWidget(QLabel("End Date"))
self.layout.addWidget(end_date)
self.layout.addWidget(self.end_date)
self.layout.addWidget(self.buttonBox)
self.setLayout(self.layout)
@@ -139,7 +143,8 @@ class KitAdder(QWidget):
used_for = QComboBox()
used_for.setObjectName("used_for")
# Insert all existing sample types
used_for.addItems(lookup_all_sample_types(ctx=parent_ctx))
# used_for.addItems(lookup_all_sample_types(ctx=parent_ctx))
used_for.addItems([item.name for item in lookup_submission_type(ctx=parent_ctx)])
used_for.setEditable(True)
self.grid.addWidget(used_for,3,1)
# set cost per run
@@ -203,7 +208,7 @@ class KitAdder(QWidget):
yml_type[used]['kits'][info['kit_name']]['reagenttypes'] = reagents
logger.debug(yml_type)
# send to kit constructor
result = create_kit_from_yaml(ctx=self.ctx, exp=yml_type)
result = construct_kit_from_yaml(ctx=self.ctx, exp=yml_type)
msg = AlertPop(message=result['message'], status=result['status'])
msg.exec()
self.__init__(self.ctx)
@@ -212,18 +217,20 @@ class ReagentTypeForm(QWidget):
"""
custom widget to add information about a new reagenttype
"""
def __init__(self, parent_ctx:dict) -> None:
def __init__(self, ctx:dict) -> None:
super().__init__()
grid = QGridLayout()
self.setLayout(grid)
grid.addWidget(QLabel("Name (*Exactly* as it appears in the excel submission form):"),0,0)
# Widget to get reagent info
reagent_getter = QComboBox()
reagent_getter.setObjectName("name")
self.reagent_getter = QComboBox()
self.reagent_getter.setObjectName("name")
# lookup all reagent type names from db
reagent_getter.addItems(get_all_reagenttype_names(ctx=parent_ctx))
reagent_getter.setEditable(True)
grid.addWidget(reagent_getter,0,1)
lookup = lookup_reagent_types(ctx=ctx)
logger.debug(f"Looked up ReagentType names: {lookup}")
self.reagent_getter.addItems([item.__str__() for item in lookup])
self.reagent_getter.setEditable(True)
grid.addWidget(self.reagent_getter,0,1)
grid.addWidget(QLabel("Extension of Life (months):"),0,2)
# widget to get extension of life
eol = QSpinBox()
@@ -257,12 +264,14 @@ class ControlsDatePicker(QWidget):
class ImportReagent(QComboBox):
def __init__(self, ctx:dict, reagent:PydReagent, extraction_kit:str):
def __init__(self, ctx:Settings, reagent:dict|PydReagent, extraction_kit:str):
super().__init__()
self.setEditable(True)
if isinstance(reagent, dict):
reagent = PydReagent(**reagent)
# Ensure that all reagenttypes have a name that matches the items in the excel parser
query_var = reagent.type
logger.debug(f"Import Reagent is looking at: {reagent.lot} for {reagent.type}")
logger.debug(f"Import Reagent is looking at: {reagent.lot} for {query_var}")
if isinstance(reagent.lot, np.float64):
logger.debug(f"{reagent.lot} is a numpy float!")
try:
@@ -272,7 +281,8 @@ class ImportReagent(QComboBox):
# query for reagents using type name from sheet and kit from sheet
logger.debug(f"Attempting lookup of reagents by type: {query_var}")
# below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work.
relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)]
lookup = lookup_reagents(ctx=ctx, reagent_type=query_var)
relevant_reagents = [item.__str__() for item in lookup]
output_reg = []
for rel_reagent in relevant_reagents:
# extract strings from any sets.
@@ -289,7 +299,8 @@ class ImportReagent(QComboBox):
relevant_reagents.insert(0, str(reagent.lot))
else:
# TODO: look up the last used reagent of this type in the database
looked_up_reg = lookup_last_used_reagenttype_lot(ctx=ctx, type_name=reagent.type, extraction_kit=extraction_kit)
looked_up_rt = lookup_reagenttype_kittype_association(ctx=ctx, reagent_type=reagent.type, kit_type=extraction_kit)
looked_up_reg = lookup_reagents(ctx=ctx, lot_number=looked_up_rt.last_used)
logger.debug(f"Because there was no reagent listed for {reagent}, we will insert the last lot used: {looked_up_reg}")
if looked_up_reg != None:
relevant_reagents.remove(str(looked_up_reg.lot))
@@ -309,12 +320,14 @@ class ImportReagent(QComboBox):
class ParsedQLabel(QLabel):
def __init__(self, input_object, field_name, title:bool=True):
def __init__(self, input_object, field_name, title:bool=True, label_name:str|None=None):
super().__init__()
try:
check = input_object['parsed']
except:
return
if label_name != None:
self.setObjectName(label_name)
if title:
output = field_name.replace('_', ' ').title()
else:

View File

@@ -7,13 +7,12 @@ from PyQt6.QtWidgets import (
)
from tools import jinja_template_loading
import logging
from backend.db.functions import lookup_kittype_by_use, lookup_all_sample_types
from backend.db.functions import lookup_kit_types, lookup_submission_type
logger = logging.getLogger(f"submissions.{__name__}")
env = jinja_template_loading()
class QuestionAsker(QDialog):
"""
dialog to ask yes/no questions
@@ -28,8 +27,8 @@ class QuestionAsker(QDialog):
self.buttonBox.rejected.connect(self.reject)
self.layout = QVBoxLayout()
# Text for the yes/no question
message = QLabel(message)
self.layout.addWidget(message)
self.message = QLabel(message)
self.layout.addWidget(self.message)
self.layout.addWidget(self.buttonBox)
self.setLayout(self.layout)
@@ -53,7 +52,7 @@ class KitSelector(QDialog):
super().__init__()
self.setWindowTitle(title)
self.widget = QComboBox()
kits = [item.__str__() for item in lookup_kittype_by_use(ctx=ctx)]
kits = [item.__str__() for item in lookup_kit_types(ctx=ctx)]
self.widget.addItems(kits)
self.widget.setEditable(False)
# set yes/no buttons
@@ -72,14 +71,6 @@ class KitSelector(QDialog):
def getValues(self):
return self.widget.currentText()
# @staticmethod
# def launch(parent):
# dlg = KitSelector(parent)
# r = dlg.exec_()
# if r:
# return dlg.getValues()
# return None
class SubmissionTypeSelector(QDialog):
"""
dialog to ask yes/no questions
@@ -88,7 +79,7 @@ class SubmissionTypeSelector(QDialog):
super().__init__()
self.setWindowTitle(title)
self.widget = QComboBox()
sub_type = lookup_all_sample_types(ctx=ctx)
sub_type = [item.name for item in lookup_submission_type(ctx=ctx)]
self.widget.addItems(sub_type)
self.widget.setEditable(False)
# set yes/no buttons

View File

@@ -15,7 +15,7 @@ from PyQt6.QtWidgets import (
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtCore import Qt, QAbstractTableModel, QSortFilterProxyModel
from PyQt6.QtGui import QAction, QCursor, QPixmap, QPainter
from backend.db import submissions_to_df, lookup_submission_by_id, delete_submission_by_id, lookup_submission_by_rsl_num, hitpick_plate
from backend.db.functions import submissions_to_df, delete_submission, lookup_submissions
from backend.excel import make_hitpicks
from tools import check_if_app, Settings
from tools import jinja_template_loading
@@ -33,6 +33,7 @@ env = jinja_template_loading()
class pandasModel(QAbstractTableModel):
"""
pandas model for inserting summary sheet into gui
NOTE: Copied from Stack Overflow. I have no idea how it actually works.
"""
def __init__(self, data) -> None:
QAbstractTableModel.__init__(self)
@@ -73,7 +74,6 @@ class pandasModel(QAbstractTableModel):
return self._data.columns[col]
return None
class SubmissionsSheet(QTableView):
"""
presents submission summary to user in tab1
@@ -98,24 +98,13 @@ class SubmissionsSheet(QTableView):
"""
sets data in model
"""
self.data = submissions_to_df(ctx=self.ctx, limit=100)
self.data = submissions_to_df(ctx=self.ctx)
try:
self.data['id'] = self.data['id'].apply(str)
self.data['id'] = self.data['id'].str.zfill(3)
except KeyError:
pass
try:
del self.data['samples']
except KeyError:
pass
try:
del self.data['reagents']
except KeyError:
pass
try:
del self.data['comments']
except KeyError:
pass
proxyModel = QSortFilterProxyModel()
proxyModel.setSourceModel(pandasModel(self.data))
self.setModel(proxyModel)
@@ -132,6 +121,9 @@ class SubmissionsSheet(QTableView):
pass
def create_barcode(self) -> None:
"""
Generates a window for displaying barcode
"""
index = (self.selectionModel().currentIndex())
value = index.sibling(index.row(),1).data()
logger.debug(f"Selected value: {value}")
@@ -140,6 +132,9 @@ class SubmissionsSheet(QTableView):
dlg.print_barcode()
def add_comment(self) -> None:
"""
Generates a text editor window.
"""
index = (self.selectionModel().currentIndex())
value = index.sibling(index.row(),1).data()
logger.debug(f"Selected value: {value}")
@@ -147,7 +142,6 @@ class SubmissionsSheet(QTableView):
if dlg.exec():
dlg.add_comment()
def contextMenuEvent(self, event):
"""
Creates actions for right click menu events.
@@ -174,25 +168,23 @@ class SubmissionsSheet(QTableView):
# add other required actions
self.menu.popup(QCursor.pos())
def delete_item(self, event):
"""
Confirms user deletion and sends id to backend for deletion.
Args:
event (_type_): _description_
event (_type_): the item of interest
"""
index = (self.selectionModel().currentIndex())
value = index.sibling(index.row(),0).data()
logger.debug(index)
msg = QuestionAsker(title="Delete?", message=f"Are you sure you want to delete {index.sibling(index.row(),1).data()}?\n")
if msg.exec():
delete_submission_by_id(ctx=self.ctx, id=value)
delete_submission(ctx=self.ctx, id=value)
else:
return
self.setData()
def hit_pick(self):
"""
Extract positive samples from submissions with PCR results and export to csv.
@@ -207,7 +199,7 @@ class SubmissionsSheet(QTableView):
logger.error(f"Error: Had to truncate number of plates to 4.")
indices = indices[:4]
# lookup ids in the database
subs = [lookup_submission_by_id(self.ctx, id) for id in indices]
subs = [lookup_submissions(ctx=self.ctx, id=id) for id in indices]
# full list of samples
dicto = []
# list to contain plate images
@@ -217,7 +209,6 @@ class SubmissionsSheet(QTableView):
if iii > 3:
logger.error(f"Error: Had to truncate number of plates to 4.")
continue
# plate_dicto = hitpick_plate(submission=sub, plate_number=iii+1)
plate_dicto = sub.hitpick_plate(plate_number=iii+1)
if plate_dicto == None:
continue
@@ -252,7 +243,6 @@ class SubmissionsSheet(QTableView):
except Exception as e:
logger.error(f"Could not show image: {e}.")
class SubmissionDetails(QDialog):
"""
a window showing text details of submission
@@ -262,41 +252,18 @@ class SubmissionDetails(QDialog):
super().__init__()
self.ctx = ctx
self.setWindowTitle("Submission Details")
# create scrollable interior
interior = QScrollArea()
interior.setParent(self)
# get submision from db
data = lookup_submission_by_id(ctx=ctx, id=id)
logger.debug(f"Submission details data:\n{pprint.pformat(data.to_dict())}")
self.base_dict = data.to_dict(full_data=True)
sub = lookup_submissions(ctx=ctx, id=id)
logger.debug(f"Submission details data:\n{pprint.pformat(sub.to_dict())}")
self.base_dict = sub.to_dict(full_data=True)
# don't want id
del self.base_dict['id']
# retrieve jinja template
# template = env.get_template("submission_details.txt")
# render using object dict
# text = template.render(sub=self.base_dict)
# create text field
# txt_editor = QTextEdit(self)
# txt_editor.setReadOnly(True)
# txt_editor.document().setPlainText(text)
# resize
# font = txt_editor.document().defaultFont()
# fontMetrics = QFontMetrics(font)
# textSize = fontMetrics.size(0, txt_editor.toPlainText())
# w = textSize.width() + 10
# h = textSize.height() + 10
# txt_editor.setMinimumSize(w, h)
# txt_editor.setMaximumSize(w, h)
# txt_editor.resize(w, h)
# interior.resize(w,900)
# txt_editor.setText(text)
# interior.setWidget(txt_editor)
logger.debug(f"Creating barcode.")
if not check_if_app():
self.base_dict['barcode'] = base64.b64encode(make_plate_barcode(self.base_dict['Plate Number'], width=120, height=30)).decode('utf-8')
sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=self.base_dict['Plate Number'])
# plate_dicto = hitpick_plate(sub)
logger.debug(f"Hitpicking plate...")
plate_dicto = sub.hitpick_plate()
logger.debug(f"Making platemap...")
@@ -307,7 +274,6 @@ class SubmissionDetails(QDialog):
platemap.save(image_io, 'JPEG')
except AttributeError:
logger.error(f"No plate map found for {sub.rsl_plate_num}")
# platemap.save("test.jpg", 'JPEG')
self.base_dict['platemap'] = base64.b64encode(image_io.getvalue()).decode('utf-8')
template = env.get_template("submission_details.html")
self.html = template.render(sub=self.base_dict)
@@ -325,31 +291,11 @@ class SubmissionDetails(QDialog):
btn.setFixedWidth(900)
btn.clicked.connect(self.export)
def export(self):
"""
Renders submission to html, then creates and saves .pdf file to user selected file.
"""
# template = env.get_template("submission_details.html")
# # make barcode because, reasons
# self.base_dict['barcode'] = base64.b64encode(make_plate_barcode(self.base_dict['Plate Number'], width=120, height=30)).decode('utf-8')
# sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=self.base_dict['Plate Number'])
# plate_dicto = hitpick_plate(sub)
# platemap = make_plate_map(plate_dicto)
# logger.debug(f"platemap: {platemap}")
# image_io = BytesIO()
# try:
# platemap.save(image_io, 'JPEG')
# except AttributeError:
# logger.error(f"No plate map found for {sub.rsl_plate_num}")
# # platemap.save("test.jpg", 'JPEG')
# self.base_dict['platemap'] = base64.b64encode(image_io.getvalue()).decode('utf-8')
# logger.debug(self.base_dict)
# html = template.render(sub=self.base_dict)
# with open("test.html", "w") as f:
# f.write(html)
try:
# home_dir = Path(self.ctx["directory_path"]).joinpath(f"Submission_Details_{self.base_dict['Plate Number']}.pdf").resolve().__str__()
home_dir = Path(self.ctx.directory_path).joinpath(f"Submission_Details_{self.base_dict['Plate Number']}.pdf").resolve().__str__()
except FileNotFoundError:
home_dir = Path.home().resolve().__str__()
@@ -421,6 +367,9 @@ class BarcodeWindow(QDialog):
def print_barcode(self):
"""
Sends barcode image to printer.
"""
printer = QtPrintSupport.QPrinter()
dialog = QtPrintSupport.QPrintDialog(printer)
if dialog.exec():
@@ -439,7 +388,7 @@ class SubmissionComment(QDialog):
"""
a window for adding comment text to a submission
"""
def __init__(self, ctx:dict, rsl:str) -> None:
def __init__(self, ctx:Settings, rsl:str) -> None:
super().__init__()
self.ctx = ctx
@@ -460,18 +409,22 @@ class SubmissionComment(QDialog):
self.setLayout(self.layout)
def add_comment(self):
"""
Adds comment to submission object.
"""
commenter = getuser()
comment = self.txt_editor.toPlainText()
dt = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
full_comment = {"name":commenter, "time": dt, "text": comment}
logger.debug(f"Full comment: {full_comment}")
sub = lookup_submission_by_rsl_num(ctx = self.ctx, rsl_num=self.rsl)
# sub = lookup_submission_by_rsl_num(ctx = self.ctx, rsl_num=self.rsl)
sub = lookup_submissions(ctx = self.ctx, rsl_number=self.rsl)
try:
sub.comment.append(full_comment)
except AttributeError:
sub.comment = [full_comment]
logger.debug(sub.__dict__)
self.ctx['database_session'].add(sub)
self.ctx['database_session'].commit()
self.ctx.database_session.add(sub)
self.ctx.database_session.commit()

View File

@@ -22,16 +22,12 @@ from PyQt6.QtWidgets import (
from .all_window_functions import extract_form_info, select_open_file, select_save_file
from PyQt6.QtCore import QSignalBlocker
from backend.db.functions import (
lookup_all_orgs, lookup_kittype_by_use, lookup_kittype_by_name,
construct_submission_info, lookup_reagent, store_submission, lookup_submissions_by_date_range,
create_kit_from_yaml, create_org_from_yaml, get_control_subtypes, get_all_controls_by_type,
lookup_all_submissions_by_type, get_all_controls, lookup_submission_by_rsl_num, update_subsampassoc_with_pcr,
check_kit_integrity, lookup_sub_samp_association_by_plate_sample, lookup_ww_sample_by_processing_number,
lookup_sample_by_submitter_id, update_last_used
construct_submission_info, lookup_reagents, construct_kit_from_yaml, construct_org_from_yaml, get_control_subtypes,
update_subsampassoc_with_pcr, check_kit_integrity, update_last_used, lookup_organizations, lookup_kit_types,
lookup_submissions, lookup_controls, lookup_samples, lookup_submission_sample_association, store_object
)
from backend.excel.parser import SheetParser, PCRParser, SampleParser
from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df
from backend.pydant import PydReagent
from tools import check_not_nan, convert_well_to_row_column
from .custom_widgets.pop_ups import AlertPop, QuestionAsker
from .custom_widgets import ReportDatePicker
@@ -55,8 +51,7 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
logger.debug(obj.ctx)
# initialize samples
obj.samples = []
obj.reagents = []
obj.missing_reagents = []
obj.missing_info = []
# set file dialog
@@ -67,31 +62,20 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
return obj, result
# create sheetparser using excel sheet and context from gui
try:
prsr = SheetParser(ctx=obj.ctx, filepath=fname)
obj.prsr = SheetParser(ctx=obj.ctx, filepath=fname)
except PermissionError:
logger.error(f"Couldn't get permission to access file: {fname}")
return obj, result
# prsr.sub = import_validation_check(ctx=obj.ctx, parser_sub=prsr.sub)
# obj.column_count = prsr.column_count
try:
logger.debug(f"Submission dictionary:\n{pprint.pformat(prsr.sub)}")
pyd = prsr.to_pydantic()
logger.debug(f"Submission dictionary:\n{pprint.pformat(obj.prsr.sub)}")
pyd = obj.prsr.to_pydantic()
logger.debug(f"Pydantic result: \n\n{pprint.pformat(pyd)}\n\n")
except Exception as e:
return obj, dict(message= f"Problem creating pydantic model:\n\n{e}", status="critical")
try:
obj.xl = prsr.filepath
except Exception as e:
logger.error(f"Unable to make obj xl.")
# for sample in pyd.samples:
# if hasattr(sample, "elution_well"):
# logger.debug(f"Sample from import: {sample.elution_well}")
# I don't remember why this is here.
obj.current_submission_type = pyd.submission_type['value']
# destroy any widgets from previous imports
for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget):
item.setParent(None)
obj.current_submission_type = pyd.submission_type['value']
# Get list of fields from pydantic model.
fields = list(pyd.model_fields.keys()) + list(pyd.model_extra.keys())
fields.remove('filepath')
@@ -107,13 +91,11 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
label = ParsedQLabel(value, field)
match field:
case 'submitting_lab':
# create label
# label = QLabel(field.replace("_", " ").title())
# label = ParsedQLabel(value, field)
logger.debug(f"{field}: {value['value']}")
# create combobox to hold looked up submitting labs
add_widget = QComboBox()
labs = [item.__str__() for item in lookup_all_orgs(ctx=obj.ctx)]
# labs = [item.__str__() for item in lookup_all_orgs(ctx=obj.ctx)]
labs = [item.__str__() for item in lookup_organizations(ctx=obj.ctx)]
# try to set closest match to top of list
try:
labs = difflib.get_close_matches(value['value'], labs, len(labs), 0)
@@ -122,9 +104,6 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
# set combobox values to lookedup values
add_widget.addItems(labs)
case 'extraction_kit':
# create label
# label = QLabel(field.replace("_", " ").title())
# if extraction kit not available, all other values fail
if not check_not_nan(value['value']):
msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning")
@@ -132,21 +111,21 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
# create combobox to hold looked up kits
add_widget = QComboBox()
# lookup existing kits by 'submission_type' decided on by sheetparser
# uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=pyd.submission_type['value'].lower())]
logger.debug(f"Looking up kits used for {pyd.submission_type['value']}")
uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_for=pyd.submission_type['value'])]
# uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_for=pyd.submission_type['value'])]
uses = [item.__str__() for item in lookup_kit_types(ctx=obj.ctx, used_for=pyd.submission_type['value'])]
logger.debug(f"Kits received for {pyd.submission_type['value']}: {uses}")
if check_not_nan(value['value']):
logger.debug(f"The extraction kit in parser was: {value['value']}")
uses.insert(0, uses.pop(uses.index(value['value'])))
obj.ext_kit = value['value']
else:
logger.error(f"Couldn't find {prsr.sub['extraction_kit']}")
logger.error(f"Couldn't find {obj.prsr.sub['extraction_kit']}")
obj.ext_kit = uses[0]
add_widget.addItems(uses)
# Run reagent scraper whenever extraction kit is changed.
add_widget.currentTextChanged.connect(obj.scrape_reagents)
# add_widget.addItems(uses)
case 'submitted_date':
# create label
# label = QLabel(field.replace("_", " ").title())
# uses base calendar
add_widget = QDateEdit(calendarPopup=True)
# sets submitted date based on date found in excel sheet
@@ -163,40 +142,10 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
case "ctx":
continue
case 'reagents':
for reagent in value:
# create label
# reg_label = QLabel(reagent['type'].replace("_", " ").title())
reg_label = ParsedQLabel(reagent, reagent['value'].type, title=False)
if reagent['parsed']:
# try:
# reg_label = QLabel(f"Parsed Lot: {reagent['value'].type}")
obj.reagents.append(reagent['value'])
# except AttributeError:
# continue
else:
# try:
# reg_label = QLabel(f"MISSING Lot: {reagent['value'].type}")
obj.missing_reagents.append(reagent['value'])
# NOTE: This is now set to run when the extraction kit is updated.
continue
# except AttributeError:
# continue
# reg_label.setObjectName(f"lot_{reagent['type']}_label")
reg_label.setObjectName(f"lot_{reagent['value'].type}_label")
# create reagent choice widget
add_widget = ImportReagent(ctx=obj.ctx, reagent=reagent['value'], extraction_kit=pyd.extraction_kit['value'])
add_widget.setObjectName(f"lot_{reagent['value'].type}")
logger.debug(f"Widget name set to: {add_widget.objectName()}")
obj.table_widget.formlayout.addWidget(reg_label)
obj.table_widget.formlayout.addWidget(add_widget)
continue
# case "rsl_plate_num":
# label = QLabel(field.replace("_", " ").title())
# add_widget = QLineEdit()
# logger.debug(f"Setting widget text to {str(value['value']).replace('_', ' ')}")
# add_widget.setText(str(value['value']).replace("_", " "))
case _:
# anything else gets added in as a line edit
# label = QLabel(field.replace("_", " ").title())
add_widget = QLineEdit()
logger.debug(f"Setting widget text to {str(value['value']).replace('_', ' ')}")
add_widget.setText(str(value['value']).replace("_", " "))
@@ -207,13 +156,11 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
obj.table_widget.formlayout.addWidget(add_widget)
except AttributeError as e:
logger.error(e)
kit_widget = obj.table_widget.formlayout.parentWidget().findChild(QComboBox, 'extraction_kit')
kit_widget.addItems(uses)
# compare obj.reagents with expected reagents in kit
if hasattr(obj, 'ext_kit'):
obj.kit_integrity_completion()
# obj.missing_reagents = obj.missing_reagents + missing_info
logger.debug(f"Imported reagents: {obj.reagents}")
if prsr.sample_result != None:
msg = AlertPop(message=prsr.sample_result, status="WARNING")
if obj.prsr.sample_result != None:
msg = AlertPop(message=obj.prsr.sample_result, status="WARNING")
msg.exec()
logger.debug(f"Pydantic extra fields: {pyd.model_extra}")
if "csv" in pyd.model_extra:
@@ -255,31 +202,30 @@ def kit_integrity_completion_function(obj:QMainWindow) -> Tuple[QMainWindow, dic
"""
result = None
logger.debug(inspect.currentframe().f_back.f_code.co_name)
# find the widget that contains lit info
# find the widget that contains kit info
kit_widget = obj.table_widget.formlayout.parentWidget().findChild(QComboBox, 'extraction_kit')
logger.debug(f"Kit selector: {kit_widget}")
# get current kit info
# get current kit being used
obj.ext_kit = kit_widget.currentText()
for item in obj.reagents:
obj.table_widget.formlayout.addWidget(ParsedQLabel({'parsed':True}, item.type, title=False, label_name=f"lot_{item.type}_label"))
reagent = dict(type=item.type, lot=item.lot, exp=item.exp, name=item.name)
add_widget = ImportReagent(ctx=obj.ctx, reagent=reagent, extraction_kit=obj.ext_kit)
obj.table_widget.formlayout.addWidget(add_widget)
logger.debug(f"Checking integrity of {obj.ext_kit}")
# get the kit from database using current kit info
# kit = lookup_kittype_by_name(ctx=obj.ctx, name=obj.ext_kit)
# get all reagents stored in the QWindow object
# reagents_to_lookup = [item.name for item in obj.missing_reagents]
# logger.debug(f"Reagents for lookup for {kit.name}: {reagents_to_lookup}")
# make sure kit contains all necessary info
# kit_integrity = check_kit_integrity(kit, reagents_to_lookup)
# if kit integrity comes back with an error, make widgets with missing reagents using default info
# if kit_integrity != None:
# result = dict(message=kit_integrity['message'], status="Warning")
# obj.missing_reagents = kit_integrity['missing']
# for item in kit_integrity['missing']:
# see if there are any missing reagents
if len(obj.missing_reagents) > 0:
result = dict(message=f"The submission you are importing is missing some reagents expected by the kit.\n\nIt looks like you are missing: {[item.type.upper() for item in obj.missing_reagents]}\n\nAlternatively, you may have set the wrong extraction kit.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", status="Warning")
for item in obj.missing_reagents:
obj.table_widget.formlayout.addWidget(ParsedQLabel({'parsed':False}, item.type, title=False))
# Add label that has parsed as False to show "MISSING" label.
obj.table_widget.formlayout.addWidget(ParsedQLabel({'parsed':False}, item.type, title=False, label_name=f"missing_{item.type}_label"))
# Set default parameters for the empty reagent.
reagent = dict(type=item.type, lot=None, exp=date.today(), name=None)
add_widget = ImportReagent(ctx=obj.ctx, reagent=PydReagent(**reagent), extraction_kit=obj.ext_kit)#item=item)
# create and add widget
# add_widget = ImportReagent(ctx=obj.ctx, reagent=PydReagent(**reagent), extraction_kit=obj.ext_kit)
add_widget = ImportReagent(ctx=obj.ctx, reagent=reagent, extraction_kit=obj.ext_kit)
obj.table_widget.formlayout.addWidget(add_widget)
# Add submit button to the form.
submit_btn = QPushButton("Submit")
submit_btn.setObjectName("lot_submit_btn")
obj.table_widget.formlayout.addWidget(submit_btn)
@@ -309,7 +255,7 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
# compare reagents in form to reagent database
for reagent in reagents:
# Lookup any existing reagent of this type with this lot number
wanted_reagent = lookup_reagent(ctx=obj.ctx, reagent_lot=reagents[reagent], type_name=reagent)
wanted_reagent = lookup_reagents(ctx=obj.ctx, lot_number=reagents[reagent], reagent_type=reagent)
logger.debug(f"Looked up reagent: {wanted_reagent}")
# if reagent not found offer to add to database
if wanted_reagent == None:
@@ -362,8 +308,7 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
if kit_integrity != None:
return obj, dict(message=kit_integrity['message'], status="critical")
logger.debug(f"Sending submission: {base_submission.rsl_plate_num} to database.")
result = store_submission(ctx=obj.ctx, base_submission=base_submission)
# check result of storing for issues
result = store_object(ctx=obj.ctx, object=base_submission)
# update summary sheet
obj.table_widget.sub_wid.setData()
# reset form
@@ -372,7 +317,7 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
logger.debug(f"All attributes of obj: {pprint.pformat(obj.__dict__)}")
if len(obj.missing_reagents + obj.missing_info) > 0:
logger.debug(f"We have blank reagents in the excel sheet.\n\tLet's try to fill them in.")
extraction_kit = lookup_kittype_by_name(obj.ctx, name=obj.ext_kit)
extraction_kit = lookup_kit_types(ctx=obj.ctx, name=obj.ext_kit)
logger.debug(f"We have the extraction kit: {extraction_kit.name}")
excel_map = extraction_kit.construct_xl_map_for_use(obj.current_submission_type)
logger.debug(f"Extraction kit map:\n\n{pprint.pformat(excel_map)}")
@@ -410,7 +355,8 @@ def generate_report_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
info = extract_form_info(dlg)
logger.debug(f"Report info: {info}")
# find submissions based on date range
subs = lookup_submissions_by_date_range(ctx=obj.ctx, start_date=info['start_date'], end_date=info['end_date'])
# subs = lookup_submissions_by_date_range(ctx=obj.ctx, start_date=info['start_date'], end_date=info['end_date'])
subs = lookup_submissions(ctx=obj.ctx, start_date=info['start_date'], end_date=info['end_date'])
# convert each object to dict
records = [item.report_dict() for item in subs]
# make dataframe from record dictionaries
@@ -452,7 +398,7 @@ def add_kit_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
Tuple[QMainWindow, dict]: Collection of new main app window and result dict
"""
result = None
# setup file dialog to find yaml flie
# setup file dialog to find yaml file
fname = select_open_file(obj, file_extension="yml")
assert fname.exists()
# read yaml file
@@ -466,7 +412,7 @@ def add_kit_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
except PermissionError:
return
# send to kit creator function
result = create_kit_from_yaml(ctx=obj.ctx, exp=exp)
result = construct_kit_from_yaml(ctx=obj.ctx, exp=exp)
return obj, result
def add_org_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
@@ -494,7 +440,7 @@ def add_org_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
except PermissionError:
return obj, result
# send to kit creator function
result = create_org_from_yaml(ctx=obj.ctx, org=org)
result = construct_org_from_yaml(ctx=obj.ctx, org=org)
return obj, result
def controls_getter_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
@@ -531,6 +477,7 @@ def controls_getter_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
obj.table_widget.sub_typer.clear()
# lookup subtypes
sub_types = get_control_subtypes(ctx=obj.ctx, type=obj.con_type, mode=obj.mode)
# sub_types = lookup_controls(ctx=obj.ctx, control_type=obj.con_type)
if sub_types != []:
# block signal that will rerun controls getter and update sub_typer
with QSignalBlocker(obj.table_widget.sub_typer) as blocker:
@@ -562,7 +509,8 @@ def chart_maker_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
obj.subtype = obj.table_widget.sub_typer.currentText()
logger.debug(f"Subtype: {obj.subtype}")
# query all controls using the type/start and end dates from the gui
controls = get_all_controls_by_type(ctx=obj.ctx, con_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date)
# controls = get_all_controls_by_type(ctx=obj.ctx, con_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date)
controls = lookup_controls(ctx=obj.ctx, control_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date)
# if no data found from query set fig to none for reporting in webview
if controls == None:
fig = None
@@ -602,9 +550,11 @@ def link_controls_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
Tuple[QMainWindow, dict]: Collection of new main app window and result dict
"""
result = None
all_bcs = lookup_all_submissions_by_type(obj.ctx, "Bacterial Culture")
# all_bcs = lookup_all_submissions_by_type(obj.ctx, "Bacterial Culture")
all_bcs = lookup_submissions(ctx=obj.ctx, submission_type="Bacterial Culture")
logger.debug(all_bcs)
all_controls = get_all_controls(obj.ctx)
# all_controls = get_all_controls(obj.ctx)
all_controls = lookup_controls(ctx=obj.ctx)
ac_list = [control.name for control in all_controls]
count = 0
for bcs in all_bcs:
@@ -617,7 +567,6 @@ def link_controls_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
if " " in sample:
logger.warning(f"There is not supposed to be a space in the sample name!!!")
sample = sample.replace(" ", "")
# if sample not in ac_list:
if not any([ac.startswith(sample) for ac in ac_list]):
continue
else:
@@ -632,24 +581,15 @@ def link_controls_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
else:
logger.debug(f"Adding {control.name} to {bcs.rsl_plate_num} as control")
bcs.controls.append(control)
# bcs.control_id.append(control.id)
control.submission = bcs
control.submission_id = bcs.id
# obj.ctx["database_session"].add(control)
obj.ctx.database_session.add(control)
count += 1
# obj.ctx["database_session"].add(bcs)
obj.ctx.database_session.add(bcs)
logger.debug(f"Here is the new control: {[control.name for control in bcs.controls]}")
result = dict(message=f"We added {count} controls to bacterial cultures.", status="information")
logger.debug(result)
# obj.ctx['database_session'].commit()
obj.ctx.database_session.commit()
# msg = QMessageBox()
# msg.setText("Controls added")
# msg.setInformativeText(result)
# msg.setWindowTitle("Controls added")
# msg.exec()
return obj, result
def link_extractions_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
@@ -681,7 +621,8 @@ def link_extractions_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
for ii in range(6, len(run)):
new_run[f"column{str(ii-5)}_vol"] = run[ii]
# Lookup imported submissions
sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num'])
# sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num'])
sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num'])
# If no such submission exists, move onto the next run
try:
logger.debug(f"Found submission: {sub.rsl_plate_num}")
@@ -712,8 +653,6 @@ def link_extractions_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.extraction_info}")
else:
sub.extraction_info = json.dumps([new_run])
# obj.ctx['database_session'].add(sub)
# obj.ctx["database_session"].commit()
obj.ctx.database_session.add(sub)
obj.ctx.database_session.commit()
result = dict(message=f"We added {count} logs to the database.", status='information')
@@ -745,7 +684,8 @@ def link_pcr_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
end_time=run[5].strip()
)
# lookup imported submission
sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num'])
# sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num'])
sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num'])
# if imported submission doesn't exist move on to next run
try:
logger.debug(f"Found submission: {sub.rsl_plate_num}")
@@ -777,8 +717,6 @@ def link_pcr_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.pcr_info}")
else:
sub.pcr_info = json.dumps([new_run])
# obj.ctx['database_session'].add(sub)
# obj.ctx["database_session"].commit()
obj.ctx.database_session.add(sub)
obj.ctx.database_session.commit()
result = dict(message=f"We added {count} logs to the database.", status='information')
@@ -798,14 +736,16 @@ def import_pcr_results_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
fname = select_open_file(obj, file_extension="xlsx")
parser = PCRParser(ctx=obj.ctx, filepath=fname)
logger.debug(f"Attempting lookup for {parser.plate_num}")
sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num)
# sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num)
sub = lookup_submissions(ctx=obj.ctx, rsl_number=parser.plate_num)
try:
logger.debug(f"Found submission: {sub.rsl_plate_num}")
except AttributeError:
# If no plate is found, may be because this is a repeat. Lop off the '-1' or '-2' and repeat
logger.error(f"Submission of number {parser.plate_num} not found. Attempting rescue of plate repeat.")
parser.plate_num = "-".join(parser.plate_num.split("-")[:-1])
sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num)
# sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num)
sub = lookup_submissions(ctx=obj.ctx, rsl_number=parser.plate_num)
try:
logger.debug(f"Found submission: {sub.rsl_plate_num}")
except AttributeError:
@@ -830,11 +770,9 @@ def import_pcr_results_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
logger.debug(f"Final pcr info for {sub.rsl_plate_num}: {sub.pcr_info}")
else:
sub.pcr_info = json.dumps([parser.pcr])
# obj.ctx['database_session'].add(sub)
obj.ctx.database_session.add(sub)
logger.debug(f"Existing {type(sub.pcr_info)}: {sub.pcr_info}")
logger.debug(f"Inserting {type(json.dumps(parser.pcr))}: {json.dumps(parser.pcr)}")
# obj.ctx["database_session"].commit()
obj.ctx.database_session.commit()
logger.debug(f"Got {len(parser.samples)} samples to update!")
logger.debug(f"Parser samples: {parser.samples}")
@@ -844,8 +782,6 @@ def import_pcr_results_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
sample_dict = [item for item in parser.samples if item['sample']==sample.rsl_number][0]
except IndexError:
continue
# sample['plate_rsl'] = sub.rsl_plate_num
# update_ww_sample(ctx=obj.ctx, sample_obj=sample)
update_subsampassoc_with_pcr(ctx=obj.ctx, submission=sub, sample=sample, input_dict=sample_dict)
result = dict(message=f"We added PCR info to {sub.rsl_plate_num}.", status='information')
@@ -909,8 +845,8 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re
new_info.append(new_item)
logger.debug(f"New reagents: {new_reagents}")
logger.debug(f"New info: {new_info}")
# open the workbook using openpyxl
workbook = load_workbook(obj.xl)
# open a new workbook using openpyxl
workbook = load_workbook(obj.prsr.xl.io)
# get list of sheet names
sheets = workbook.sheetnames
# logger.debug(workbook.sheetnames)
@@ -941,22 +877,25 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re
workbook.save(filename=fname.__str__())
def construct_first_strand_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
"""
Generates a csv file from client submitted xlsx file.
Args:
obj (QMainWindow): Main application
Returns:
Tuple[QMainWindow, dict]: Updated main application and result
"""
def get_plates(input_sample_number:str, plates:list) -> Tuple[int, str]:
logger.debug(f"Looking up {input_sample_number} in {plates}")
samp = lookup_ww_sample_by_processing_number(ctx=obj.ctx, processing_number=input_sample_number)
# samp = lookup_ww_sample_by_processing_number(ctx=obj.ctx, processing_number=input_sample_number)
samp = lookup_samples(ctx=obj.ctx, ww_processing_num=input_sample_number)
if samp == None:
samp = lookup_sample_by_submitter_id(ctx=obj.ctx, submitter_id=input_sample_number)
# samp = lookup_sample_by_submitter_id(ctx=obj.ctx, submitter_id=input_sample_number)
samp = lookup_samples(ctx=obj.ctx, submitter_id=input_sample_number)
logger.debug(f"Got sample: {samp}")
# if samp != None:
new_plates = [(iii+1, lookup_sub_samp_association_by_plate_sample(ctx=obj.ctx, rsl_sample_num=samp, rsl_plate_num=lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=plate))) for iii, plate in enumerate(plates)]
# for iii, plate in enumerate(plates):
# lplate = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=plate)
# if lplate == None:
# continue
# else:
# logger.debug(f"Got a plate: {lplate}")
# new_plates.append((iii, lookup_sub_samp_association_by_plate_sample(ctx=obj.ctx, rsl_sample_num=samp, rsl_plate_num=lplate)))
# new_plates = [(iii+1, lookup_sub_samp_association_by_plate_sample(ctx=obj.ctx, rsl_sample_num=samp, rsl_plate_num=lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=plate))) for iii, plate in enumerate(plates)]
new_plates = [(iii+1, lookup_submission_sample_association(ctx=obj.ctx, sample=samp, submission=plate)) for iii, plate in enumerate(plates)]
logger.debug(f"Associations: {pprint.pformat(new_plates)}")
try:
plate_num, plate = next(assoc for assoc in new_plates if assoc[1] is not None)
@@ -964,8 +903,6 @@ def construct_first_strand_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]
plate_num, plate = None, None
logger.debug(f"Plate number {plate_num} is {plate}")
return plate_num, plate
fname = select_open_file(obj=obj, file_extension="xlsx")
xl = pd.ExcelFile(fname)
sprsr = SampleParser(ctx=obj.ctx, xl=xl, submission_type="First Strand")
@@ -988,7 +925,6 @@ def construct_first_strand_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]
else:
new_dict['destination_row'] = item['row']
new_dict['destination_column'] = item['column']
# assocs = [(iii, lookup_ww_sample_by_processing_number_and_plate(ctx=obj.ctx, processing_number=new_dict['sample'], plate_number=plate)) for iii, plate in enumerate(plates)]
plate_num, plate = get_plates(input_sample_number=new_dict['sample'], plates=plates)
if plate_num == None:
plate_num = str(old_plate_number) + "*"
@@ -1015,3 +951,34 @@ def construct_first_strand_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]
df.to_csv(ofname, index=False)
return obj, None
def scrape_reagents(obj:QMainWindow, extraction_kit:str) -> Tuple[QMainWindow, dict]:
"""
Extracted scrape reagents function that will run when
form 'extraction_kit' widget is updated.
Args:
obj (QMainWindow): updated main application
extraction_kit (str): name of extraction kit (in 'extraction_kit' widget)
Returns:
Tuple[QMainWindow, dict]: Updated application and result
"""
logger.debug("\n\nHello from reagent scraper!!\n\n")
logger.debug(f"Extraction kit: {extraction_kit}")
obj.reagents = []
obj.missing_reagents = []
[item.setParent(None) for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget) if item.objectName().startswith("lot_") or item.objectName().startswith("missing_")]
reagents = obj.prsr.parse_reagents(extraction_kit=extraction_kit)
logger.debug(f"Got reagents: {reagents}")
for reagent in obj.prsr.sub['reagents']:
# create label
if reagent['parsed']:
obj.reagents.append(reagent['value'])
else:
obj.missing_reagents.append(reagent['value'])
logger.debug(f"Imported reagents: {obj.reagents}")
logger.debug(f"Missing reagents: {obj.missing_reagents}")
return obj, None

View File

@@ -7,7 +7,7 @@
<body>
<h2><u>Submission Details for {{ sub['Plate Number'] }}</u></h2>&nbsp;&nbsp;&nbsp;{% if sub['barcode'] %}<img align='right' height="30px" width="120px" src="data:image/jpeg;base64,{{ sub['barcode'] | safe }}">{% endif %}
<p>{% for key, value in sub.items() if key not in excluded %}
&nbsp;&nbsp;&nbsp;&nbsp;<b>{{ key }}: </b>{% if key=='Cost' %} {{ "${:,.2f}".format(value) }}{% else %}{{ value }}{% endif %}<br>
&nbsp;&nbsp;&nbsp;&nbsp;<b>{{ key }}: </b>{% if key=='Cost' %}{% if sub['Cost'] %} {{ "${:,.2f}".format(value) }}{% endif %}{% else %}{{ value }}{% endif %}<br>
{% endfor %}</p>
<h3><u>Reagents:</u></h3>
<p>{% for item in sub['reagents'] %}

View File

@@ -89,13 +89,13 @@ def convert_nans_to_nones(input_str) -> str|None:
def create_reagent_list(in_dict:dict) -> list[str]:
"""
Makes list of reagent types without "lot\_" prefix for each key in a dictionary
Makes list of reagent types without "lot_" prefix for each key in a dictionary
Args:
in_dict (dict): input dictionary of reagents
Returns:
list[str]: list of reagent types with "lot\_" prefix removed.
list[str]: list of reagent types with "lot_" prefix removed.
"""
return [item.strip("lot_") for item in in_dict.keys()]
@@ -320,7 +320,7 @@ class Settings(BaseSettings):
"""
directory_path: Path
database_path: Path|None = None
database_path: Path|str|None = None
backup_path: Path
super_users: list|None = None
power_users: list|None = None
@@ -344,6 +344,8 @@ class Settings(BaseSettings):
@field_validator('database_path', mode="before")
@classmethod
def ensure_database_exists(cls, value):
if value == ":memory:":
return value
if isinstance(value, str):
value = Path(value)
if value.exists():
@@ -366,17 +368,18 @@ class Settings(BaseSettings):
else:
database_path = package_dir.joinpath("submissions.db")
else:
if database_path == ":memory:":
pass
# check if user defined path is directory
if database_path.is_dir():
elif database_path.is_dir():
database_path = database_path.joinpath("submissions.db")
# check if user defined path is a file
elif database_path.is_file():
database_path = database_path
else:
raise FileNotFoundError("No database file found. Exiting program.")
# sys.exit()
logger.debug(f"Using {database_path} for database file.")
engine = create_engine(f"sqlite:///{database_path}")
engine = create_engine(f"sqlite:///{database_path}")#, echo=True, future=True)
session = Session(engine)
return session
@@ -387,7 +390,7 @@ class Settings(BaseSettings):
if value == None:
return package
def get_config(settings_path: Path|str|None=None) -> dict:
def get_config(settings_path: Path|str|None=None) -> Settings:
"""
Get configuration settings from path or default if blank.
@@ -417,7 +420,6 @@ def get_config(settings_path: Path|str|None=None) -> dict:
LOGDIR.mkdir(parents=True)
except FileExistsError:
pass
# if user hasn't defined config path in cli args
if settings_path == None:
# Check user .config/submissions directory
@@ -448,20 +450,12 @@ def get_config(settings_path: Path|str|None=None) -> dict:
settings_path = settings_path
else:
logger.error("No config.yml file found. Writing to directory.")
# raise FileNotFoundError("No config.yml file found. Cannot continue.")
with open(settings_path, "r") as dset:
default_settings = yaml.load(dset, Loader=yaml.Loader)
return Settings(**copy_settings(settings_path=settings_path, settings=default_settings))
logger.debug(f"Using {settings_path} for config file.")
with open(settings_path, "r") as stream:
# try:
settings = yaml.load(stream, Loader=yaml.Loader)
# except yaml.YAMLError as exc:
# logger.error(f'Error reading yaml file {settings_path}: {exc}'
# return {}
# copy settings to config directory
# if copy_settings_trigger:
# settings = copy_settings(settings_path=CONFIGDIR.joinpath("config.yml"), settings=settings)
return Settings(**settings)
def create_database_session(database_path: Path|str|None=None) -> Session: