Pre-major refactor

This commit is contained in:
Landon Wark
2023-11-03 10:49:37 -05:00
parent 22a23b7838
commit 5570d87b7c
19 changed files with 1078 additions and 1045 deletions

View File

@@ -1,4 +1,9 @@
- [ ] Update artic submission type database entry to - [ ] Clear out any unnecessary ctx passes now that queries are improved.
- [ ] Make a 'query or create' method in all db objects to go with new query.
- [ ] Ensure Bacterial plates end up with RSL_YY_###_{submitterName}_{submitterPlateID}.xlsx format.
- [x] Move lookup functions into class methods of db objects?
- Not sure if will work for associations.
- [x] Update artic submission type database entry to add more technicians.
- [ ] Document code - [ ] Document code
- [x] Rewrite tests... again. - [x] Rewrite tests... again.
- [x] Have InfoItem change status self.missing to True if value changed. - [x] Have InfoItem change status self.missing to True if value changed.

View File

@@ -1,4 +1,5 @@
''' '''
All database related operations. All database related operations.
''' '''
# from .functions import * from .models import *
from .functions import *

View File

@@ -1,25 +1,36 @@
''' '''Contains or imports all database convenience functions'''
Contains convenience functions for using database
'''
import sys
from tools import Settings from tools import Settings
from .lookups import * from sqlalchemy import event
from sqlalchemy.engine import Engine
from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError
from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError
import logging
import pandas as pd import pandas as pd
import json import json
from pathlib import Path from pathlib import Path
import yaml from .models import *
from .. import models # from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError
from . import store_object # from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError
from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError
from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError
from pprint import pformat
import logging import logging
from backend.validators import pydant from backend.validators.pydant import *
logger = logging.getLogger(f"Submissions_{__name__}")
logger = logging.getLogger(f"submissions.{__name__}") @event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
"""
*should* allow automatic creation of foreign keys in the database
I have no idea how it actually works.
def submissions_to_df(ctx:Settings, submission_type:str|None=None, limit:int=0) -> pd.DataFrame: Args:
dbapi_connection (_type_): _description_
connection_record (_type_): _description_
"""
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
def submissions_to_df(submission_type:str|None=None, limit:int=0) -> pd.DataFrame:
""" """
Convert submissions looked up by type to dataframe Convert submissions looked up by type to dataframe
@@ -34,7 +45,8 @@ def submissions_to_df(ctx:Settings, submission_type:str|None=None, limit:int=0)
logger.debug(f"Querying Type: {submission_type}") logger.debug(f"Querying Type: {submission_type}")
logger.debug(f"Using limit: {limit}") logger.debug(f"Using limit: {limit}")
# use lookup function to create list of dicts # use lookup function to create list of dicts
subs = [item.to_dict() for item in lookup_submissions(ctx=ctx, submission_type=submission_type, limit=limit)] # subs = [item.to_dict() for item in lookup_submissions(ctx=ctx, submission_type=submission_type, limit=limit)]
subs = [item.to_dict() for item in BasicSubmission.query(submission_type=submission_type, limit=limit)]
logger.debug(f"Got {len(subs)} results.") logger.debug(f"Got {len(subs)} results.")
# make df from dicts (records) in list # make df from dicts (records) in list
df = pd.DataFrame.from_records(subs) df = pd.DataFrame.from_records(subs)
@@ -66,7 +78,7 @@ def submissions_to_df(ctx:Settings, submission_type:str|None=None, limit:int=0)
pass pass
return df return df
def get_control_subtypes(ctx:Settings, type:str, mode:str) -> list[str]: def get_control_subtypes(type:str, mode:str) -> list[str]:
""" """
Get subtypes for a control analysis mode Get subtypes for a control analysis mode
@@ -80,7 +92,8 @@ def get_control_subtypes(ctx:Settings, type:str, mode:str) -> list[str]:
""" """
# Only the first control of type is necessary since they all share subtypes # Only the first control of type is necessary since they all share subtypes
try: try:
outs = lookup_controls(ctx=ctx, control_type=type, limit=1) # outs = lookup_controls(ctx=ctx, control_type=type, limit=1)
outs = Control.query(control_type=type, limit=1)
except (TypeError, IndexError): except (TypeError, IndexError):
return [] return []
# Get analysis mode data as dict # Get analysis mode data as dict
@@ -93,7 +106,7 @@ def get_control_subtypes(ctx:Settings, type:str, mode:str) -> list[str]:
subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item] subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item]
return subtypes return subtypes
def update_last_used(ctx:Settings, reagent:models.Reagent, kit:models.KitType): def update_last_used(reagent:Reagent, kit:KitType):
""" """
Updates the 'last_used' field in kittypes/reagenttypes Updates the 'last_used' field in kittypes/reagenttypes
@@ -104,78 +117,73 @@ def update_last_used(ctx:Settings, reagent:models.Reagent, kit:models.KitType):
""" """
# rt = list(set(reagent.type).intersection(kit.reagent_types))[0] # rt = list(set(reagent.type).intersection(kit.reagent_types))[0]
logger.debug(f"Attempting update of reagent type at intersection of ({reagent}), ({kit})") logger.debug(f"Attempting update of reagent type at intersection of ({reagent}), ({kit})")
rt = lookup_reagent_types(ctx=ctx, kit_type=kit, reagent=reagent) # rt = lookup_reagent_types(ctx=ctx, kit_type=kit, reagent=reagent)
rt = ReagentType.query(kit_type=kit, reagent=reagent)
if rt != None: if rt != None:
assoc = lookup_reagenttype_kittype_association(ctx=ctx, kit_type=kit, reagent_type=rt) # assoc = lookup_reagenttype_kittype_association(ctx=ctx, kit_type=kit, reagent_type=rt)
assoc = KitTypeReagentTypeAssociation.query(kit_type=kit, reagent_type=rt)
if assoc != None: if assoc != None:
if assoc.last_used != reagent.lot: if assoc.last_used != reagent.lot:
logger.debug(f"Updating {assoc} last used to {reagent.lot}") logger.debug(f"Updating {assoc} last used to {reagent.lot}")
assoc.last_used = reagent.lot assoc.last_used = reagent.lot
# ctx.database_session.merge(assoc) # ctx.database_session.merge(assoc)
# ctx.database_session.commit() # ctx.database_session.commit()
result = store_object(ctx=ctx, object=assoc) # result = store_object(ctx=ctx, object=assoc)
result = assoc.save()
return result return result
return dict(message=f"Updating last used {rt} was not performed.") return dict(message=f"Updating last used {rt} was not performed.")
def delete_submission(ctx:Settings, id:int) -> dict|None: # def delete_submission(id:int) -> dict|None:
""" # """
Deletes a submission and its associated samples from the database. # Deletes a submission and its associated samples from the database.
Args: # Args:
ctx (Settings): settings object passed down from gui # ctx (Settings): settings object passed down from gui
id (int): id of submission to be deleted. # id (int): id of submission to be deleted.
""" # """
# In order to properly do this Im' going to have to delete all of the secondary table stuff as well. # # In order to properly do this Im' going to have to delete all of the secondary table stuff as well.
# Retrieve submission # # Retrieve submission
sub = lookup_submissions(ctx=ctx, id=id) # # sub = lookup_submissions(ctx=ctx, id=id)
# Convert to dict for storing backup as a yml # sub = models.BasicSubmission.query(id=id)
backup = sub.to_dict() # # Convert to dict for storing backup as a yml
try: # sub.delete()
with open(Path(ctx.backup_path).joinpath(f"{sub.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f: # return None
yaml.dump(backup, f)
except KeyError:
pass
ctx.database_session.delete(sub)
try:
ctx.database_session.commit()
except (SQLIntegrityError, SQLOperationalError, AlcIntegrityError, AlcOperationalError) as e:
ctx.database_session.rollback()
raise e
return None
def update_ww_sample(ctx:Settings, sample_obj:dict) -> dict|None: # def update_ww_sample(sample_obj:dict) -> dict|None:
""" # """
Retrieves wastewater sample by rsl number (sample_obj['sample']) and updates values from constructed dictionary # Retrieves wastewater sample by rsl number (sample_obj['sample']) and updates values from constructed dictionary
Args: # Args:
ctx (Settings): settings object passed down from gui # ctx (Settings): settings object passed down from gui
sample_obj (dict): dictionary representing new values for database object # sample_obj (dict): dictionary representing new values for database object
""" # """
logger.debug(f"dictionary to use for update: {pformat(sample_obj)}") # logger.debug(f"dictionary to use for update: {pformat(sample_obj)}")
logger.debug(f"Looking up {sample_obj['sample']} in plate {sample_obj['plate_rsl']}") # logger.debug(f"Looking up {sample_obj['sample']} in plate {sample_obj['plate_rsl']}")
assoc = lookup_submission_sample_association(ctx=ctx, submission=sample_obj['plate_rsl'], sample=sample_obj['sample']) # # assoc = lookup_submission_sample_association(ctx=ctx, submission=sample_obj['plate_rsl'], sample=sample_obj['sample'])
if assoc != None: # assoc = models.SubmissionSampleAssociation.query(submission=sample_obj['plate_rsl'], sample=sample_obj['sample'])
for key, value in sample_obj.items(): # if assoc != None:
# set attribute 'key' to 'value' # for key, value in sample_obj.items():
try: # # set attribute 'key' to 'value'
check = getattr(assoc, key) # try:
except AttributeError as e: # check = getattr(assoc, key)
logger.error(f"Item doesn't have field {key} due to {e}") # except AttributeError as e:
continue # logger.error(f"Item doesn't have field {key} due to {e}")
if check != value: # continue
logger.debug(f"Setting association key: {key} to {value}") # if check != value:
try: # logger.debug(f"Setting association key: {key} to {value}")
setattr(assoc, key, value) # try:
except AttributeError as e: # setattr(assoc, key, value)
logger.error(f"Can't set field {key} to {value} due to {e}") # except AttributeError as e:
continue # logger.error(f"Can't set field {key} to {value} due to {e}")
else: # continue
logger.error(f"Unable to find sample {sample_obj['sample']}") # else:
return # logger.error(f"Unable to find sample {sample_obj['sample']}")
result = store_object(ctx=ctx, object=assoc) # return
return result # # result = store_object(ctx=ctx, object=assoc)
# result = assoc.save()
# return result
def check_kit_integrity(ctx:Settings, sub:models.BasicSubmission|models.KitType|pydant.PydSubmission, reagenttypes:list=[]) -> dict|None: def check_kit_integrity(sub:BasicSubmission|KitType|PydSubmission, reagenttypes:list=[]) -> dict|None:
""" """
Ensures all reagents expected in kit are listed in Submission Ensures all reagents expected in kit are listed in Submission
@@ -190,11 +198,12 @@ def check_kit_integrity(ctx:Settings, sub:models.BasicSubmission|models.KitType|
# What type is sub? # What type is sub?
# reagenttypes = [] # reagenttypes = []
match sub: match sub:
case pydant.PydSubmission(): case PydSubmission():
ext_kit = lookup_kit_types(ctx=ctx, name=sub.extraction_kit['value']) # ext_kit = lookup_kit_types(ctx=ctx, name=sub.extraction_kit['value'])
ext_kit = KitType.query(name=sub.extraction_kit['value'])
ext_kit_rtypes = [item.name for item in ext_kit.get_reagents(required=True, submission_type=sub.submission_type['value'])] ext_kit_rtypes = [item.name for item in ext_kit.get_reagents(required=True, submission_type=sub.submission_type['value'])]
reagenttypes = [item.type for item in sub.reagents] reagenttypes = [item.type for item in sub.reagents]
case models.BasicSubmission(): case BasicSubmission():
# Get all required reagent types for this kit. # Get all required reagent types for this kit.
ext_kit_rtypes = [item.name for item in sub.extraction_kit.get_reagents(required=True, submission_type=sub.submission_type_name)] ext_kit_rtypes = [item.name for item in sub.extraction_kit.get_reagents(required=True, submission_type=sub.submission_type_name)]
# Overwrite function parameter reagenttypes # Overwrite function parameter reagenttypes
@@ -202,9 +211,10 @@ def check_kit_integrity(ctx:Settings, sub:models.BasicSubmission|models.KitType|
logger.debug(f"For kit integrity, looking up reagent: {reagent}") logger.debug(f"For kit integrity, looking up reagent: {reagent}")
try: try:
# rt = list(set(reagent.type).intersection(sub.extraction_kit.reagent_types))[0].name # rt = list(set(reagent.type).intersection(sub.extraction_kit.reagent_types))[0].name
rt = lookup_reagent_types(ctx=ctx, kit_type=sub.extraction_kit, reagent=reagent) # rt = lookup_reagent_types(ctx=ctx, kit_type=sub.extraction_kit, reagent=reagent)
rt = ReagentType.query(kit_type=sub.extraction_kit, reagent=reagent)
logger.debug(f"Got reagent type: {rt}") logger.debug(f"Got reagent type: {rt}")
if isinstance(rt, models.ReagentType): if isinstance(rt, ReagentType):
reagenttypes.append(rt.name) reagenttypes.append(rt.name)
except AttributeError as e: except AttributeError as e:
logger.error(f"Problem parsing reagents: {[f'{reagent.lot}, {reagent.type}' for reagent in sub.reagents]}") logger.error(f"Problem parsing reagents: {[f'{reagent.lot}, {reagent.type}' for reagent in sub.reagents]}")
@@ -212,7 +222,7 @@ def check_kit_integrity(ctx:Settings, sub:models.BasicSubmission|models.KitType|
except IndexError: except IndexError:
logger.error(f"No intersection of {reagent} type {reagent.type} and {sub.extraction_kit.reagent_types}") logger.error(f"No intersection of {reagent} type {reagent.type} and {sub.extraction_kit.reagent_types}")
raise ValueError(f"No intersection of {reagent} type {reagent.type} and {sub.extraction_kit.reagent_types}") raise ValueError(f"No intersection of {reagent} type {reagent.type} and {sub.extraction_kit.reagent_types}")
case models.KitType(): case KitType():
ext_kit_rtypes = [item.name for item in sub.get_reagents(required=True)] ext_kit_rtypes = [item.name for item in sub.get_reagents(required=True)]
case _: case _:
raise ValueError(f"There was no match for the integrity object.\n\nCheck to make sure they are imported from the same place because it matters.") raise ValueError(f"There was no match for the integrity object.\n\nCheck to make sure they are imported from the same place because it matters.")
@@ -231,7 +241,7 @@ def check_kit_integrity(ctx:Settings, sub:models.BasicSubmission|models.KitType|
result = {'message' : f"The submission you are importing is missing some reagents expected by the kit.\n\nIt looks like you are missing: {[item.upper() for item in missing]}\n\nAlternatively, you may have set the wrong extraction kit.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", 'missing': missing} result = {'message' : f"The submission you are importing is missing some reagents expected by the kit.\n\nIt looks like you are missing: {[item.upper() for item in missing]}\n\nAlternatively, you may have set the wrong extraction kit.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", 'missing': missing}
return result return result
def update_subsampassoc_with_pcr(ctx:Settings, submission:models.BasicSubmission, sample:models.BasicSample, input_dict:dict) -> dict|None: def update_subsampassoc_with_pcr(submission:BasicSubmission, sample:BasicSample, input_dict:dict) -> dict|None:
""" """
Inserts PCR results into wastewater submission/sample association Inserts PCR results into wastewater submission/sample association
@@ -244,35 +254,39 @@ def update_subsampassoc_with_pcr(ctx:Settings, submission:models.BasicSubmission
Returns: Returns:
dict|None: result object dict|None: result object
""" """
assoc = lookup_submission_sample_association(ctx, submission=submission, sample=sample) # assoc = lookup_submission_sample_association(ctx, submission=submission, sample=sample)
assoc = SubmissionSampleAssociation.query(submission=submission, sample=sample)
for k,v in input_dict.items(): for k,v in input_dict.items():
try: try:
setattr(assoc, k, v) setattr(assoc, k, v)
except AttributeError: except AttributeError:
logger.error(f"Can't set {k} to {v}") logger.error(f"Can't set {k} to {v}")
result = store_object(ctx=ctx, object=assoc) # result = store_object(ctx=ctx, object=assoc)
result = assoc.save()
return result return result
# def get_polymorphic_subclass(base:object|models.BasicSubmission=models.BasicSubmission, polymorphic_identity:str|None=None):
# def store_object(ctx:Settings, object) -> dict|None:
# """ # """
# Retrieves any subclasses of given base class whose polymorphic identity matches the string input. # Store an object in the database
# NOTE: Depreciated in favour of class based finders in 'submissions.py'
# Args: # Args:
# base (object): Base (parent) class # ctx (Settings): Settings object passed down from gui
# polymorphic_identity (str | None): Name of subclass of interest. (Defaults to None) # object (_type_): Object to be stored
# Returns: # Returns:
# _type_: Subclass, or parent class on # dict|None: Result of action
# """ # """
# if isinstance(polymorphic_identity, dict): # dbs = ctx.database_session
# polymorphic_identity = polymorphic_identity['value'] # dbs.merge(object)
# if polymorphic_identity == None: # try:
# return base # dbs.commit()
# else: # except (SQLIntegrityError, AlcIntegrityError) as e:
# try: # logger.debug(f"Hit an integrity error : {e}")
# return [item for item in base.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0] # dbs.rollback()
# except Exception as e: # return {"message":f"This object {object} already exists, so we can't add it.\n{e}", "status":"Critical"}
# logger.error(f"Could not get polymorph {polymorphic_identity} of {base} due to {e}") # except (SQLOperationalError, AlcOperationalError):
# return base # logger.error(f"Hit an operational error: {e}")
# dbs.rollback()
# return {"message":"The database is locked for editing."}
# return None

View File

@@ -1,91 +0,0 @@
'''Contains or imports all database convenience functions'''
from tools import Settings, package_dir
from sqlalchemy.orm import Session
from sqlalchemy import create_engine, event
from sqlalchemy.engine import Engine
from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError
from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError
from pathlib import Path
import logging
logger = logging.getLogger(f"Submissions_{__name__}")
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
"""
*should* allow automatic creation of foreign keys in the database
I have no idea how it actually works.
Args:
dbapi_connection (_type_): _description_
connection_record (_type_): _description_
"""
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
def create_database_session(ctx:Settings) -> Session:
"""
Create database session for app.
Args:
ctx (Settings): settings passed down from gui
Raises:
FileNotFoundError: Raised if sqlite file not found
Returns:
Session: Sqlalchemy session object.
"""
database_path = ctx.database_path
if database_path == None:
# check in user's .submissions directory for submissions.db
if Path.home().joinpath(".submissions", "submissions.db").exists():
database_path = Path.home().joinpath(".submissions", "submissions.db")
# finally, look in the local dir
else:
database_path = package_dir.joinpath("submissions.db")
else:
if database_path == ":memory:":
pass
# check if user defined path is directory
elif database_path.is_dir():
database_path = database_path.joinpath("submissions.db")
# check if user defined path is a file
elif database_path.is_file():
database_path = database_path
else:
raise FileNotFoundError("No database file found. Exiting program.")
logger.debug(f"Using {database_path} for database file.")
engine = create_engine(f"sqlite:///{database_path}", echo=True, future=True)
session = Session(engine)
return session
def store_object(ctx:Settings, object) -> dict|None:
"""
Store an object in the database
Args:
ctx (Settings): Settings object passed down from gui
object (_type_): Object to be stored
Returns:
dict|None: Result of action
"""
dbs = ctx.database_session
dbs.merge(object)
try:
dbs.commit()
except (SQLIntegrityError, AlcIntegrityError) as e:
logger.debug(f"Hit an integrity error : {e}")
dbs.rollback()
return {"message":f"This object {object} already exists, so we can't add it.\n{e}", "status":"Critical"}
except (SQLOperationalError, AlcOperationalError):
logger.error(f"Hit an operational error: {e}")
dbs.rollback()
return {"message":"The database is locked for editing."}
return None
from .lookups import *
# from .constructions import *
from .misc import *

View File

@@ -1,515 +0,0 @@
from .. import models
from tools import Settings
# from backend.namer import RSLNamer
from typing import List
import logging
from datetime import date, datetime
from dateutil.parser import parse
from sqlalchemy.orm.query import Query
from sqlalchemy import and_, JSON
from sqlalchemy.orm import Session
logger = logging.getLogger(f"submissions.{__name__}")
def query_return(query:Query, limit:int=0):
with query.session.no_autoflush:
match limit:
case 0:
return query.all()
case 1:
return query.first()
case _:
return query.limit(limit).all()
def setup_lookup(ctx:Settings, locals:dict) -> Session:
for k, v in locals.items():
if k == "kwargs":
continue
if isinstance(v, dict):
raise ValueError("Cannot use dictionary in query. Make sure you parse it first.")
# return create_database_session(ctx=ctx)
return ctx.database_session
################## Basic Lookups ####################################
def lookup_reagents(ctx:Settings,
reagent_type:str|models.ReagentType|None=None,
lot_number:str|None=None,
limit:int=0
) -> models.Reagent|List[models.Reagent]:
"""
Lookup a list of reagents from the database.
Args:
ctx (Settings): Settings object passed down from gui
reagent_type (str | models.ReagentType | None, optional): Reagent type. Defaults to None.
lot_number (str | None, optional): Reagent lot number. Defaults to None.
limit (int, optional): limit of results returned. Defaults to 0.
Returns:
models.Reagent | List[models.Reagent]: reagent or list of reagents matching filter.
"""
query = setup_lookup(ctx=ctx, locals=locals()).query(models.Reagent)
match reagent_type:
case str():
logger.debug(f"Looking up reagents by reagent type: {reagent_type}")
query = query.join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==reagent_type)
case models.ReagentType():
logger.debug(f"Looking up reagents by reagent type: {reagent_type}")
query = query.filter(models.Reagent.type.contains(reagent_type))
case _:
pass
match lot_number:
case str():
logger.debug(f"Looking up reagent by lot number: {lot_number}")
query = query.filter(models.Reagent.lot==lot_number)
# In this case limit number returned.
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
def lookup_kit_types(ctx:Settings,
name:str=None,
used_for:str|None=None,
id:int|None=None,
limit:int=0
) -> models.KitType|List[models.KitType]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.KitType)
match used_for:
case str():
logger.debug(f"Looking up kit type by use: {used_for}")
query = query.filter(models.KitType.used_for.any(name=used_for))
case _:
pass
match name:
case str():
logger.debug(f"Looking up kit type by name: {name}")
query = query.filter(models.KitType.name==name)
limit = 1
case _:
pass
match id:
case int():
logger.debug(f"Looking up kit type by id: {id}")
query = query.filter(models.KitType.id==id)
limit = 1
case str():
logger.debug(f"Looking up kit type by id: {id}")
query = query.filter(models.KitType.id==int(id))
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
def lookup_reagent_types(ctx:Settings,
name: str|None=None,
kit_type: models.KitType|str|None=None,
reagent: models.Reagent|str|None=None,
limit:int=0,
) -> models.ReagentType|List[models.ReagentType]:
"""
_summary_
Args:
ctx (Settings): Settings object passed down from gui.
name (str | None, optional): Reagent type name. Defaults to None.
limit (int, optional): limit of results to return. Defaults to 0.
Returns:
models.ReagentType|List[models.ReagentType]: ReagentType or list of ReagentTypes matching filter.
"""
query = setup_lookup(ctx=ctx, locals=locals()).query(models.ReagentType)
if (kit_type != None and reagent == None) or (reagent != None and kit_type == None):
raise ValueError("Cannot filter without both reagent and kit type.")
elif kit_type == None and reagent == None:
pass
else:
match kit_type:
case str():
kit_type = lookup_kit_types(ctx=ctx, name=kit_type)
case _:
pass
match reagent:
case str():
reagent = lookup_reagents(ctx=ctx, lot_number=reagent)
case _:
pass
assert reagent.type != []
logger.debug(f"Looking up reagent type for {type(kit_type)} {kit_type} and {type(reagent)} {reagent}")
logger.debug(f"Kit reagent types: {kit_type.reagent_types}")
# logger.debug(f"Reagent reagent types: {reagent._sa_instance_state}")
result = list(set(kit_type.reagent_types).intersection(reagent.type))
logger.debug(f"Result: {result}")
try:
return result[0]
except IndexError:
return result
match name:
case str():
logger.debug(f"Looking up reagent type by name: {name}")
query = query.filter(models.ReagentType.name==name)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
def lookup_submissions(ctx:Settings,
submission_type:str|models.SubmissionType|None=None,
id:int|str|None=None,
rsl_number:str|None=None,
start_date:date|str|int|None=None,
end_date:date|str|int|None=None,
reagent:models.Reagent|str|None=None,
chronologic:bool=False, limit:int=0,
**kwargs
) -> models.BasicSubmission | List[models.BasicSubmission]:
if submission_type == None:
model = models.BasicSubmission.find_subclasses(ctx=ctx, attrs=kwargs)
else:
if isinstance(submission_type, models.SubmissionType):
model = models.BasicSubmission.find_subclasses(ctx=ctx, submission_type=submission_type.name)
else:
model = models.BasicSubmission.find_subclasses(ctx=ctx, submission_type=submission_type)
query = setup_lookup(ctx=ctx, locals=locals()).query(model)
# by submission type
match submission_type:
case models.SubmissionType():
logger.debug(f"Looking up BasicSubmission with submission type: {submission_type}")
# query = query.filter(models.BasicSubmission.submission_type_name==submission_type.name)
query = query.filter(model.submission_type_name==submission_type.name)
case str():
logger.debug(f"Looking up BasicSubmission with submission type: {submission_type}")
# query = query.filter(models.BasicSubmission.submission_type_name==submission_type)
query = query.filter(model.submission_type_name==submission_type)
case _:
pass
# by date range
if start_date != None and end_date == None:
logger.warning(f"Start date with no end date, using today.")
end_date = date.today()
if end_date != None and start_date == None:
logger.warning(f"End date with no start date, using Jan 1, 2023")
start_date = date(2023, 1, 1)
if start_date != None:
match start_date:
case date():
start_date = start_date.strftime("%Y-%m-%d")
case int():
start_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d")
case _:
start_date = parse(start_date).strftime("%Y-%m-%d")
match end_date:
case date():
end_date = end_date.strftime("%Y-%m-%d")
case int():
end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime("%Y-%m-%d")
case _:
end_date = parse(end_date).strftime("%Y-%m-%d")
logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}")
# query = query.filter(models.BasicSubmission.submitted_date.between(start_date, end_date))
query = query.filter(model.submitted_date.between(start_date, end_date))
# by reagent (for some reason)
match reagent:
case str():
logger.debug(f"Looking up BasicSubmission with reagent: {reagent}")
reagent = lookup_reagents(ctx=ctx, lot_number=reagent)
query = query.join(models.submissions.reagents_submissions).filter(models.submissions.reagents_submissions.c.reagent_id==reagent.id).all()
case models.Reagent:
logger.debug(f"Looking up BasicSubmission with reagent: {reagent}")
query = query.join(models.submissions.reagents_submissions).filter(models.submissions.reagents_submissions.c.reagent_id==reagent.id).all()
case _:
pass
# by rsl number (returns only a single value)
match rsl_number:
case str():
# query = query.filter(models.BasicSubmission.rsl_plate_num==rsl_number)
query = query.filter(model.rsl_plate_num==rsl_number)
logger.debug(f"At this point the query gets: {query.all()}")
limit = 1
case _:
pass
# by id (returns only a single value)
match id:
case int():
logger.debug(f"Looking up BasicSubmission with id: {id}")
# query = query.filter(models.BasicSubmission.id==id)
query = query.filter(model.id==id)
limit = 1
case str():
logger.debug(f"Looking up BasicSubmission with id: {id}")
# query = query.filter(models.BasicSubmission.id==int(id))
query = query.filter(model.id==int(id))
limit = 1
case _:
pass
for k, v in kwargs.items():
attr = getattr(model, k)
logger.debug(f"Got attr: {attr}")
query = query.filter(attr==v)
if len(kwargs) > 0:
limit = 1
if chronologic:
# query.order_by(models.BasicSubmission.submitted_date)
query.order_by(model.submitted_date)
# logger.debug(f"At the end of the search, the query gets: {query.all()}")
return query_return(query=query, limit=limit)
def lookup_submission_type(ctx:Settings,
name:str|None=None,
limit:int=0
) -> models.SubmissionType|List[models.SubmissionType]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.SubmissionType)
match name:
case str():
logger.debug(f"Looking up submission type by name: {name}")
query = query.filter(models.SubmissionType.name==name)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
def lookup_organizations(ctx:Settings,
name:str|None=None,
limit:int=0,
) -> models.Organization|List[models.Organization]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.Organization)
match name:
case str():
logger.debug(f"Looking up organization with name: {name}")
query = query.filter(models.Organization.name==name)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
def lookup_discounts(ctx:Settings,
organization:models.Organization|str|int,
kit_type:models.KitType|str|int,
) -> models.Discount|List[models.Discount]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.Discount)
match organization:
case models.Organization():
logger.debug(f"Looking up discount with organization: {organization}")
organization = organization.id
case str():
logger.debug(f"Looking up discount with organization: {organization}")
organization = lookup_organizations(ctx=ctx, name=organization).id
case int():
logger.debug(f"Looking up discount with organization id: {organization}")
pass
case _:
raise ValueError(f"Invalid value for organization: {organization}")
match kit_type:
case models.KitType():
logger.debug(f"Looking up discount with kit type: {kit_type}")
kit_type = kit_type.id
case str():
logger.debug(f"Looking up discount with kit type: {kit_type}")
kit_type = lookup_kit_types(ctx=ctx, name=kit_type).id
case int():
logger.debug(f"Looking up discount with kit type id: {organization}")
pass
case _:
raise ValueError(f"Invalid value for kit type: {kit_type}")
return query.join(models.KitType).join(models.Organization).filter(and_(
models.KitType.id==kit_type,
models.Organization.id==organization
)).all()
def lookup_controls(ctx:Settings,
control_type:models.ControlType|str|None=None,
start_date:date|str|int|None=None,
end_date:date|str|int|None=None,
control_name:str|None=None,
limit:int=0
) -> models.Control|List[models.Control]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.Control)
# by control type
match control_type:
case models.ControlType():
logger.debug(f"Looking up control by control type: {control_type}")
query = query.join(models.ControlType).filter(models.ControlType==control_type)
case str():
logger.debug(f"Looking up control by control type: {control_type}")
query = query.join(models.ControlType).filter(models.ControlType.name==control_type)
case _:
pass
# by date range
if start_date != None and end_date == None:
logger.warning(f"Start date with no end date, using today.")
end_date = date.today()
if end_date != None and start_date == None:
logger.warning(f"End date with no start date, using Jan 1, 2023")
start_date = date(2023, 1, 1)
if start_date != None:
match start_date:
case date():
start_date = start_date.strftime("%Y-%m-%d")
case int():
start_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d")
case _:
start_date = parse(start_date).strftime("%Y-%m-%d")
match end_date:
case date():
end_date = end_date.strftime("%Y-%m-%d")
case int():
end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime("%Y-%m-%d")
case _:
end_date = parse(end_date).strftime("%Y-%m-%d")
logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}")
query = query.filter(models.Control.submitted_date.between(start_date, end_date))
match control_name:
case str():
query = query.filter(models.Control.name.startswith(control_name))
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
def lookup_control_types(ctx:Settings, limit:int=0) -> models.ControlType|List[models.ControlType]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.ControlType)
return query_return(query=query, limit=limit)
def lookup_samples(ctx:Settings,
submitter_id:str|None=None,
sample_type:str|None=None,
limit:int=0,
**kwargs
) -> models.BasicSample|models.WastewaterSample|List[models.BasicSample]:
logger.debug(f"Length of kwargs: {len(kwargs)}")
# model = models.find_subclasses(parent=models.BasicSample, attrs=kwargs)
model = models.BasicSample.find_subclasses(ctx=ctx, attrs=kwargs)
query = setup_lookup(ctx=ctx, locals=locals()).query(model)
match submitter_id:
case str():
logger.debug(f"Looking up {model} with submitter id: {submitter_id}")
query = query.filter(models.BasicSample.submitter_id==submitter_id)
limit = 1
case _:
pass
match sample_type:
case str():
logger.debug(f"Looking up {model} with sample type: {sample_type}")
query = query.filter(models.BasicSample.sample_type==sample_type)
case _:
pass
for k, v in kwargs.items():
attr = getattr(model, k)
logger.debug(f"Got attr: {attr}")
query = query.filter(attr==v)
if len(kwargs) > 0:
limit = 1
return query_return(query=query, limit=limit)
def lookup_reagenttype_kittype_association(ctx:Settings,
kit_type:models.KitType|str|None,
reagent_type:models.ReagentType|str|None,
limit:int=0
) -> models.KitTypeReagentTypeAssociation|List[models.KitTypeReagentTypeAssociation]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.KitTypeReagentTypeAssociation)
match kit_type:
case models.KitType():
query = query.filter(models.KitTypeReagentTypeAssociation.kit_type==kit_type)
case str():
query = query.join(models.KitType).filter(models.KitType.name==kit_type)
case _:
pass
match reagent_type:
case models.ReagentType():
query = query.filter(models.KitTypeReagentTypeAssociation.reagent_type==reagent_type)
case str():
query = query.join(models.ReagentType).filter(models.ReagentType.name==reagent_type)
case _:
pass
if kit_type != None and reagent_type != None:
limit = 1
return query_return(query=query, limit=limit)
def lookup_submission_sample_association(ctx:Settings,
submission:models.BasicSubmission|str|None=None,
sample:models.BasicSample|str|None=None,
row:int=0,
column:int=0,
limit:int=0,
chronologic:bool=False
) -> models.SubmissionSampleAssociation|List[models.SubmissionSampleAssociation]:
query = setup_lookup(ctx=ctx, locals=locals()).query(models.SubmissionSampleAssociation)
match submission:
case models.BasicSubmission():
query = query.filter(models.SubmissionSampleAssociation.submission==submission)
case str():
query = query.join(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==submission)
case _:
pass
match sample:
case models.BasicSample():
query = query.filter(models.SubmissionSampleAssociation.sample==sample)
case str():
query = query.join(models.BasicSample).filter(models.BasicSample.submitter_id==sample)
case _:
pass
if row > 0:
query = query.filter(models.SubmissionSampleAssociation.row==row)
if column > 0:
query = query.filter(models.SubmissionSampleAssociation.column==column)
logger.debug(f"Query count: {query.count()}")
if chronologic:
query.join(models.BasicSubmission).order_by(models.BasicSubmission.submitted_date)
if query.count() <= 1:
limit = 1
return query_return(query=query, limit=limit)
def lookup_modes(ctx:Settings) -> List[str]:
rel = ctx.database_session.query(models.Control).first()
try:
cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)]
except AttributeError as e:
logger.debug(f"Failed to get available modes from db: {e}")
cols = []
return cols
############### Complex Lookups ###################################
def lookup_sub_samp_association_by_plate_sample(ctx:Settings, rsl_plate_num:str|models.BasicSample, rsl_sample_num:str|models.BasicSubmission) -> models.WastewaterAssociation:
"""
_summary_
Args:
ctx (Settings): _description_
rsl_plate_num (str): _description_
sample_submitter_id (_type_): _description_
Returns:
models.SubmissionSampleAssociation: _description_
"""
# logger.debug(f"{type(rsl_plate_num)}, {type(rsl_sample_num)}")
match rsl_plate_num:
case models.BasicSubmission()|models.Wastewater():
# logger.debug(f"Model for rsl_plate_num: {rsl_plate_num}")
first_query = ctx.database_session.query(models.SubmissionSampleAssociation)\
.filter(models.SubmissionSampleAssociation.submission==rsl_plate_num)
case str():
# logger.debug(f"String for rsl_plate_num: {rsl_plate_num}")
first_query = ctx.database_session.query(models.SubmissionSampleAssociation)\
.join(models.BasicSubmission)\
.filter(models.BasicSubmission.rsl_plate_num==rsl_plate_num)
case _:
logger.error(f"Unknown case for rsl_plate_num {rsl_plate_num}")
match rsl_sample_num:
case models.BasicSample()|models.WastewaterSample():
# logger.debug(f"Model for rsl_sample_num: {rsl_sample_num}")
second_query = first_query.filter(models.SubmissionSampleAssociation.sample==rsl_sample_num)
# case models.WastewaterSample:
# second_query = first_query.filter(models.SubmissionSampleAssociation.sample==rsl_sample_num)
case str():
# logger.debug(f"String for rsl_sample_num: {rsl_sample_num}")
second_query = first_query.join(models.BasicSample)\
.filter(models.BasicSample.submitter_id==rsl_sample_num)
case _:
logger.error(f"Unknown case for rsl_sample_num {rsl_sample_num}")
try:
return second_query.first()
except UnboundLocalError:
logger.error(f"Couldn't construct second query")
return None

View File

@@ -1,45 +1,10 @@
''' '''
Contains all models for sqlalchemy Contains all models for sqlalchemy
''' '''
from sqlalchemy.orm import declarative_base, DeclarativeMeta
import logging
Base: DeclarativeMeta = declarative_base()
metadata = Base.metadata
logger = logging.getLogger(f"submissions.{__name__}")
# def find_subclasses(parent:Any, attrs:dict|None=None, rsl_number:str|None=None) -> Any:
# """
# Finds subclasses of a parent that does contain all
# attributes if the parent does not.
# NOTE: Depreciated, moved to classmethods in individual base models.
# Args:
# parent (_type_): Parent class.
# attrs (dict): Key:Value dictionary of attributes
# Raises:
# AttributeError: Raised if no subclass is found.
# Returns:
# _type_: Parent or subclass.
# """
# if len(attrs) == 0 or attrs == None:
# return parent
# if any([not hasattr(parent, attr) for attr in attrs]):
# # looks for first model that has all included kwargs
# try:
# model = [subclass for subclass in parent.__subclasses__() if all([hasattr(subclass, attr) for attr in attrs])][0]
# except IndexError as e:
# raise AttributeError(f"Couldn't find existing class/subclass of {parent} with all attributes:\n{pformat(attrs)}")
# else:
# model = parent
# logger.debug(f"Using model: {model}")
# return model
from .controls import Control, ControlType from .controls import Control, ControlType
from .kits import KitType, ReagentType, Reagent, Discount, KitTypeReagentTypeAssociation, SubmissionType, SubmissionTypeKitTypeAssociation # import order must go: orgs, kit, subs due to circular import issues
from .organizations import Organization, Contact from .organizations import Organization, Contact
from .submissions import BasicSubmission, BacterialCulture, Wastewater, WastewaterArtic, WastewaterSample, BacterialCultureSample, BasicSample, SubmissionSampleAssociation, WastewaterAssociation from .kits import KitType, ReagentType, Reagent, Discount, KitTypeReagentTypeAssociation, SubmissionType, SubmissionTypeKitTypeAssociation
from .submissions import (BasicSubmission, BacterialCulture, Wastewater, WastewaterArtic, WastewaterSample, BacterialCultureSample,
BasicSample, SubmissionSampleAssociation, WastewaterAssociation)

View File

@@ -1,12 +1,16 @@
''' '''
All control related models. All control related models.
''' '''
from . import Base from __future__ import annotations
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship, Query
import logging import logging
from operator import itemgetter from operator import itemgetter
import json import json
from tools import Base, setup_lookup, query_return
from datetime import date, datetime
from typing import List
from dateutil.parser import parse
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
@@ -21,6 +25,31 @@ class ControlType(Base):
targets = Column(JSON) #: organisms checked for targets = Column(JSON) #: organisms checked for
instances = relationship("Control", back_populates="controltype") #: control samples created of this type. instances = relationship("Control", back_populates="controltype") #: control samples created of this type.
@classmethod
@setup_lookup
def query(cls,
name:str=None,
limit:int=0
) -> ControlType|List[ControlType]:
"""
Lookup control archetypes in the database
Args:
ctx (Settings): Settings object passed down from gui.
name (str, optional): Control type name (limits results to 1). Defaults to None.
limit (int, optional): Maximum number of results to return. Defaults to 0.
Returns:
models.ControlType|List[models.ControlType]: ControlType(s) of interest.
"""
query = cls.metadata.session.query(cls)
match name:
case str():
query = query.filter(cls.name==name)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
class Control(Base): class Control(Base):
""" """
@@ -136,3 +165,88 @@ class Control(Base):
data = {} data = {}
return data return data
@classmethod
@setup_lookup
def query(cls,
control_type:ControlType|str|None=None,
start_date:date|str|int|None=None,
end_date:date|str|int|None=None,
control_name:str|None=None,
limit:int=0
) -> Control|List[Control]:
"""
Lookup control objects in the database based on a number of parameters.
Args:
control_type (models.ControlType | str | None, optional): Control archetype. Defaults to None.
start_date (date | str | int | None, optional): Beginning date to search by. Defaults to 2023-01-01 if end_date not None.
end_date (date | str | int | None, optional): End date to search by. Defaults to today if start_date not None.
control_name (str | None, optional): Name of control. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
models.Control|List[models.Control]: Control object of interest.
"""
query: Query = cls.metadata.session.query(cls)
# by control type
match control_type:
case ControlType():
logger.debug(f"Looking up control by control type: {control_type}")
# query = query.join(models.ControlType).filter(models.ControlType==control_type)
query = query.filter(cls.controltype==control_type)
case str():
logger.debug(f"Looking up control by control type: {control_type}")
query = query.join(ControlType).filter(ControlType.name==control_type)
case _:
pass
# by date range
if start_date != None and end_date == None:
logger.warning(f"Start date with no end date, using today.")
end_date = date.today()
if end_date != None and start_date == None:
logger.warning(f"End date with no start date, using Jan 1, 2023")
start_date = date(2023, 1, 1)
if start_date != None:
match start_date:
case date():
start_date = start_date.strftime("%Y-%m-%d")
case int():
start_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d")
case _:
start_date = parse(start_date).strftime("%Y-%m-%d")
match end_date:
case date():
end_date = end_date.strftime("%Y-%m-%d")
case int():
end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime("%Y-%m-%d")
case _:
end_date = parse(end_date).strftime("%Y-%m-%d")
logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}")
query = query.filter(cls.submitted_date.between(start_date, end_date))
match control_name:
case str():
query = query.filter(cls.name.startswith(control_name))
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
@classmethod
def get_modes(cls):
"""
Get all control modes from database
Args:
ctx (Settings): Settings object passed down from gui.
Returns:
List[str]: List of control mode names.
"""
rel = cls.metadata.session.query(cls).first()
try:
cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)]
except AttributeError as e:
logger.debug(f"Failed to get available modes from db: {e}")
cols = []
return cols

View File

@@ -1,19 +1,20 @@
''' '''
All kit and reagent related models All kit and reagent related models
''' '''
from . import Base from __future__ import annotations
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT
from sqlalchemy.orm import relationship, validates from sqlalchemy.orm import relationship, validates, Query
from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.associationproxy import association_proxy
from datetime import date from datetime import date
import logging import logging
from tools import Settings, check_authorization from tools import Settings, check_authorization, Base, setup_lookup, query_return
from typing import List
from . import Organization
logger = logging.getLogger(f'submissions.{__name__}') logger = logging.getLogger(f'submissions.{__name__}')
reagenttypes_reagents = Table("_reagenttypes_reagents", Base.metadata, Column("reagent_id", INTEGER, ForeignKey("_reagents.id")), Column("reagenttype_id", INTEGER, ForeignKey("_reagent_types.id"))) reagenttypes_reagents = Table("_reagenttypes_reagents", Base.metadata, Column("reagent_id", INTEGER, ForeignKey("_reagents.id")), Column("reagenttype_id", INTEGER, ForeignKey("_reagent_types.id")))
class KitType(Base): class KitType(Base):
""" """
Base of kits used in submission processing Base of kits used in submission processing
@@ -102,9 +103,59 @@ class KitType(Base):
return map return map
@check_authorization @check_authorization
def save(self, ctx:Settings): def save(self):
ctx.database_session.add(self) self.metadata.session.add(self)
ctx.database_session.commit() self.metadata.session.commit()
@classmethod
@setup_lookup
def query(cls,
name:str=None,
used_for:str|SubmissionType|None=None,
id:int|None=None,
limit:int=0
) -> KitType|List[KitType]:
"""
Lookup a list of or single KitType.
Args:
ctx (Settings): Settings object passed down from gui
name (str, optional): Name of desired kit (returns single instance). Defaults to None.
used_for (str | models.Submissiontype | None, optional): Submission type the kit is used for. Defaults to None.
id (int | None, optional): Kit id in the database. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
models.KitType|List[models.KitType]: KitType(s) of interest.
"""
query: Query = cls.metadata.session.query(cls)
match used_for:
case str():
logger.debug(f"Looking up kit type by use: {used_for}")
query = query.filter(cls.used_for.any(name=used_for))
case SubmissionType():
query = query.filter(cls.used_for.contains(used_for))
case _:
pass
match name:
case str():
logger.debug(f"Looking up kit type by name: {name}")
query = query.filter(cls.name==name)
limit = 1
case _:
pass
match id:
case int():
logger.debug(f"Looking up kit type by id: {id}")
query = query.filter(cls.id==id)
limit = 1
case str():
logger.debug(f"Looking up kit type by id: {id}")
query = query.filter(cls.id==int(id))
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
class ReagentType(Base): class ReagentType(Base):
""" """
@@ -140,6 +191,60 @@ class ReagentType(Base):
def __repr__(self): def __repr__(self):
return f"ReagentType({self.name})" return f"ReagentType({self.name})"
@classmethod
@setup_lookup
def query(cls,
name: str|None=None,
kit_type: KitType|str|None=None,
reagent: Reagent|str|None=None,
limit:int=0,
) -> ReagentType|List[ReagentType]:
"""
Lookup reagent types in the database.
Args:
ctx (Settings): Settings object passed down from gui.
name (str | None, optional): Reagent type name. Defaults to None.
limit (int, optional): maxmimum number of results to return (0 = all). Defaults to 0.
Returns:
models.ReagentType|List[models.ReagentType]: ReagentType or list of ReagentTypes matching filter.
"""
query: Query = cls.metadata.session.query(cls)
if (kit_type != None and reagent == None) or (reagent != None and kit_type == None):
raise ValueError("Cannot filter without both reagent and kit type.")
elif kit_type == None and reagent == None:
pass
else:
match kit_type:
case str():
kit_type = KitType.query(name=kit_type)
case _:
pass
match reagent:
case str():
reagent = Reagent.query(lot_number=reagent)
case _:
pass
assert reagent.type != []
logger.debug(f"Looking up reagent type for {type(kit_type)} {kit_type} and {type(reagent)} {reagent}")
logger.debug(f"Kit reagent types: {kit_type.reagent_types}")
# logger.debug(f"Reagent reagent types: {reagent._sa_instance_state}")
result = list(set(kit_type.reagent_types).intersection(reagent.type))
logger.debug(f"Result: {result}")
try:
return result[0]
except IndexError:
return None
match name:
case str():
logger.debug(f"Looking up reagent type by name: {name}")
query = query.filter(cls.name==name)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
class KitTypeReagentTypeAssociation(Base): class KitTypeReagentTypeAssociation(Base):
""" """
table containing reagenttype/kittype associations table containing reagenttype/kittype associations
@@ -179,6 +284,49 @@ class KitTypeReagentTypeAssociation(Base):
raise ValueError(f'{value} is not a reagenttype') raise ValueError(f'{value} is not a reagenttype')
return value return value
@classmethod
@setup_lookup
def query(cls,
kit_type:KitType|str|None,
reagent_type:ReagentType|str|None,
limit:int=0
) -> KitTypeReagentTypeAssociation|List[KitTypeReagentTypeAssociation]:
"""
Lookup junction of ReagentType and KitType
Args:
ctx (Settings): Settings object passed down from gui.
kit_type (models.KitType | str | None): KitType of interest.
reagent_type (models.ReagentType | str | None): ReagentType of interest.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
models.KitTypeReagentTypeAssociation|List[models.KitTypeReagentTypeAssociation]: Junction of interest.
"""
query: Query = cls.metadata.session.query(cls)
match kit_type:
case KitType():
query = query.filter(cls.kit_type==kit_type)
case str():
query = query.join(KitType).filter(KitType.name==kit_type)
case _:
pass
match reagent_type:
case ReagentType():
query = query.filter(cls.reagent_type==reagent_type)
case str():
query = query.join(ReagentType).filter(ReagentType.name==reagent_type)
case _:
pass
if kit_type != None and reagent_type != None:
limit = 1
return query_return(query=query, limit=limit)
def save(self):
self.metadata.session.add(self)
self.metadata.session.commit()
return None
class Reagent(Base): class Reagent(Base):
""" """
Concrete reagent instance Concrete reagent instance
@@ -199,7 +347,6 @@ class Reagent(Base):
else: else:
return f"<Reagent({self.type.name}-{self.lot})>" return f"<Reagent({self.type.name}-{self.lot})>"
def __str__(self) -> str: def __str__(self) -> str:
""" """
string representing this object string representing this object
@@ -209,7 +356,6 @@ class Reagent(Base):
""" """
return str(self.lot) return str(self.lot)
def to_sub_dict(self, extraction_kit:KitType=None) -> dict: def to_sub_dict(self, extraction_kit:KitType=None) -> dict:
""" """
dictionary containing values necessary for gui dictionary containing values necessary for gui
@@ -282,10 +428,47 @@ class Reagent(Base):
"expiry": self.expiry.strftime("%Y-%m-%d") "expiry": self.expiry.strftime("%Y-%m-%d")
} }
def save(self, ctx:Settings): def save(self):
ctx.database_session.add(self) self.metadata.session.add(self)
ctx.database_session.commit() self.metadata.session.commit()
@classmethod
@setup_lookup
def query(cls, reagent_type:str|ReagentType|None=None,
lot_number:str|None=None,
limit:int=0
) -> Reagent|List[Reagent]:
"""
Lookup a list of reagents from the database.
Args:
ctx (Settings): Settings object passed down from gui
reagent_type (str | models.ReagentType | None, optional): Reagent type. Defaults to None.
lot_number (str | None, optional): Reagent lot number. Defaults to None.
limit (int, optional): limit of results returned. Defaults to 0.
Returns:
models.Reagent | List[models.Reagent]: reagent or list of reagents matching filter.
"""
query: Query = cls.metadata.session.query(cls)
match reagent_type:
case str():
logger.debug(f"Looking up reagents by reagent type: {reagent_type}")
query = query.join(cls.type, aliased=True).filter(ReagentType.name==reagent_type)
case ReagentType():
logger.debug(f"Looking up reagents by reagent type: {reagent_type}")
query = query.filter(cls.type.contains(reagent_type))
case _:
pass
match lot_number:
case str():
logger.debug(f"Looking up reagent by lot number: {lot_number}")
query = query.filter(cls.lot==lot_number)
# In this case limit number returned.
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
class Discount(Base): class Discount(Base):
""" """
@@ -304,6 +487,56 @@ class Discount(Base):
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<Discount({self.name})>" return f"<Discount({self.name})>"
@classmethod
@setup_lookup
def query(cls,
organization:Organization|str|int|None=None,
kit_type:KitType|str|int|None=None,
) -> Discount|List[Discount]:
"""
Lookup discount objects (union of kit and organization)
Args:
ctx (Settings): Settings object passed down from the gui.
organization (models.Organization | str | int): Organization receiving discount.
kit_type (models.KitType | str | int): Kit discount received on.
Raises:
ValueError: Invalid Organization
ValueError: Invalid kit.
Returns:
models.Discount|List[models.Discount]: Discount(s) of interest.
"""
query: Query = cls.metadata.session.query(cls)
match organization:
case Organization():
logger.debug(f"Looking up discount with organization: {organization}")
query = query.filter(cls.client==Organization)
case str():
logger.debug(f"Looking up discount with organization: {organization}")
query = query.join(Organization).filter(Organization.name==organization)
case int():
logger.debug(f"Looking up discount with organization id: {organization}")
query = query.join(Organization).filter(Organization.id==organization)
case _:
# raise ValueError(f"Invalid value for organization: {organization}")
pass
match kit_type:
case KitType():
logger.debug(f"Looking up discount with kit type: {kit_type}")
query = query.filter(cls.kit==kit_type)
case str():
logger.debug(f"Looking up discount with kit type: {kit_type}")
query = query.join(KitType).filter(KitType.name==kit_type)
case int():
logger.debug(f"Looking up discount with kit type id: {organization}")
query = query.join(KitType).filter(KitType.id==kit_type)
case _:
# raise ValueError(f"Invalid value for kit type: {kit_type}")
pass
return query.all()
class SubmissionType(Base): class SubmissionType(Base):
""" """
Abstract of types of submissions. Abstract of types of submissions.
@@ -327,6 +560,32 @@ class SubmissionType(Base):
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<SubmissionType({self.name})>" return f"<SubmissionType({self.name})>"
@classmethod
@setup_lookup
def query(cls,
name:str|None=None,
limit:int=0
) -> SubmissionType|List[SubmissionType]:
"""
Lookup submission type in the database by a number of parameters
Args:
ctx (Settings): Settings object passed down from gui
name (str | None, optional): Name of submission type. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
models.SubmissionType|List[models.SubmissionType]: SubmissionType(s) of interest.
"""
query: Query = cls.metadata.session.query(cls)
match name:
case str():
logger.debug(f"Looking up submission type by name: {name}")
query = query.filter(cls.name==name)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
class SubmissionTypeKitTypeAssociation(Base): class SubmissionTypeKitTypeAssociation(Base):
""" """
@@ -352,7 +611,39 @@ class SubmissionTypeKitTypeAssociation(Base):
self.constant_cost = 0.00 self.constant_cost = 0.00
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<SubmissionTypeKitTypeAssociation({self.submission_type.name})" return f"<SubmissionTypeKitTypeAssociation({self.submission_type.name})>"
def set_attrib(self, name, value): def set_attrib(self, name, value):
self.__setattr__(name, value) self.__setattr__(name, value)
@classmethod
@setup_lookup
def query(cls,
submission_type:SubmissionType|str|int|None=None,
kit_type:KitType|str|int|None=None,
limit:int=0
):
query: Query = cls.metadata.session.query(cls)
match submission_type:
case SubmissionType():
logger.debug(f"Looking up {cls.__name__} by SubmissionType {submission_type}")
query = query.filter(cls.submission_type==submission_type)
case str():
logger.debug(f"Looking up {cls.__name__} by name {submission_type}")
query = query.join(SubmissionType).filter(SubmissionType.name==submission_type)
case int():
logger.debug(f"Looking up {cls.__name__} by id {submission_type}")
query = query.join(SubmissionType).filter(SubmissionType.id==submission_type)
match kit_type:
case KitType():
logger.debug(f"Looking up {cls.__name__} by KitType {kit_type}")
query = query.filter(cls.kit_type==kit_type)
case str():
logger.debug(f"Looking up {cls.__name__} by name {kit_type}")
query = query.join(KitType).filter(KitType.name==kit_type)
case int():
logger.debug(f"Looking up {cls.__name__} by id {kit_type}")
query = query.join(KitType).filter(KitType.id==kit_type)
limit = query.count()
return query_return(query=query, limit=limit)

View File

@@ -1,14 +1,18 @@
''' '''
All client organization related models. All client organization related models.
''' '''
from . import Base from __future__ import annotations
from sqlalchemy import Column, String, INTEGER, ForeignKey, Table from sqlalchemy import Column, String, INTEGER, ForeignKey, Table
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship, Query
from tools import Base, check_authorization, setup_lookup, query_return
from typing import List
import logging
logger = logging.getLogger(f"submissions.{__name__}")
# table containing organization/contact relationship # table containing organization/contact relationship
orgs_contacts = Table("_orgs_contacts", Base.metadata, Column("org_id", INTEGER, ForeignKey("_organizations.id")), Column("contact_id", INTEGER, ForeignKey("_contacts.id"))) orgs_contacts = Table("_orgs_contacts", Base.metadata, Column("org_id", INTEGER, ForeignKey("_organizations.id")), Column("contact_id", INTEGER, ForeignKey("_contacts.id")))
class Organization(Base): class Organization(Base):
""" """
Base of organization Base of organization
@@ -33,6 +37,7 @@ class Organization(Base):
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<Organization({self.name})>" return f"<Organization({self.name})>"
@check_authorization
def save(self, ctx): def save(self, ctx):
ctx.database_session.add(self) ctx.database_session.add(self)
ctx.database_session.commit() ctx.database_session.commit()
@@ -40,6 +45,31 @@ class Organization(Base):
def set_attribute(self, name:str, value): def set_attribute(self, name:str, value):
setattr(self, name, value) setattr(self, name, value)
@classmethod
@setup_lookup
def query(cls,
name:str|None=None,
limit:int=0,
) -> Organization|List[Organization]:
"""
Lookup organizations in the database by a number of parameters.
Args:
name (str | None, optional): Name of the organization. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
Organization|List[Organization]: _description_
"""
query: Query = cls.metadata.session.query(cls)
match name:
case str():
logger.debug(f"Looking up organization with name: {name}")
query = query.filter(cls.name==name)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
class Contact(Base): class Contact(Base):
""" """
@@ -56,3 +86,44 @@ class Contact(Base):
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<Contact({self.name})>" return f"<Contact({self.name})>"
@classmethod
@setup_lookup
def query(cls,
name:str|None=None,
email:str|None=None,
phone:str|None=None,
limit:int=0,
) -> Contact|List[Contact]:
"""
Lookup contacts in the database by a number of parameters.
Args:
name (str | None, optional): Name of the contact. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
Contact|List[Contact]: _description_
"""
query: Query = cls.metadata.session.query(cls)
match name:
case str():
logger.debug(f"Looking up contact with name: {name}")
query = query.filter(cls.name==name)
limit = 1
case _:
pass
match email:
case str():
logger.debug(f"Looking up contact with email: {name}")
query = query.filter(cls.email==email)
limit = 1
case _:
pass
match phone:
case str():
logger.debug(f"Looking up contact with phone: {name}")
query = query.filter(cls.phone==phone)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)

View File

@@ -1,25 +1,29 @@
''' '''
Models for the main submission types. Models for the main submission types.
''' '''
from __future__ import annotations
from getpass import getuser from getpass import getuser
import math import math
from pprint import pformat from pprint import pformat
from . import Base from . import Reagent, SubmissionType
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, Table, JSON, FLOAT, case from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, Table, JSON, FLOAT, case
from sqlalchemy.orm import relationship, validates from sqlalchemy.orm import relationship, validates, Query
import logging import logging
import json import json
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
from math import ceil from math import ceil
from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.associationproxy import association_proxy
import uuid import uuid
from dateutil.parser import parse
import re import re
import pandas as pd import pandas as pd
from openpyxl import Workbook from openpyxl import Workbook
from tools import check_not_nan, row_map, Settings from tools import check_not_nan, row_map, Base, query_return, setup_lookup
from pathlib import Path from datetime import datetime, date
from datetime import datetime from typing import List
from dateutil.parser import parse
import yaml
from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError
from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
@@ -299,7 +303,7 @@ class BasicSubmission(Base):
return input_excel return input_excel
@classmethod @classmethod
def enforce_name(cls, ctx:Settings, instr:str) -> str: def enforce_name(cls, instr:str) -> str:
logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} Enforcer!") logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} Enforcer!")
logger.debug(f"Attempting enforcement on {instr}") logger.debug(f"Attempting enforcement on {instr}")
return instr return instr
@@ -311,7 +315,7 @@ class BasicSubmission(Base):
return regex return regex
@classmethod @classmethod
def find_subclasses(cls, ctx:Settings, attrs:dict|None=None, submission_type:str|None=None): def find_subclasses(cls, attrs:dict|None=None, submission_type:str|None=None):
if submission_type != None: if submission_type != None:
return cls.find_polymorphic_subclass(submission_type) return cls.find_polymorphic_subclass(submission_type)
if len(attrs) == 0 or attrs == None: if len(attrs) == 0 or attrs == None:
@@ -331,24 +335,156 @@ class BasicSubmission(Base):
def find_polymorphic_subclass(cls, polymorphic_identity:str|None=None): def find_polymorphic_subclass(cls, polymorphic_identity:str|None=None):
if isinstance(polymorphic_identity, dict): if isinstance(polymorphic_identity, dict):
polymorphic_identity = polymorphic_identity['value'] polymorphic_identity = polymorphic_identity['value']
if polymorphic_identity == None: if polymorphic_identity != None:
return cls
else:
try: try:
return [item for item in cls.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0] cls = [item for item in cls.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0]
logger.info(f"Recruiting: {cls}")
except Exception as e: except Exception as e:
logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}") logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}")
return cls return cls
@classmethod @classmethod
def parse_pcr(cls, xl:pd.DataFrame, rsl_number:str) -> list: def parse_pcr(cls, xl:pd.DataFrame, rsl_number:str) -> list:
logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} PCR parser!") logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} PCR parser!")
return [] return []
def save(self, ctx:Settings): def save(self):
self.uploaded_by = getuser() self.uploaded_by = getuser()
ctx.database_session.add(self) self.metadata.session.add(self)
ctx.database_session.commit() self.metadata.session.commit()
return None
def delete(self):
backup = self.to_dict()
try:
with open(self.metadata.backup_path.joinpath(f"{self.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f:
yaml.dump(backup, f)
except KeyError:
pass
self.metadata.database_session.delete(self)
try:
self.metadata.session.commit()
except (SQLIntegrityError, SQLOperationalError, AlcIntegrityError, AlcOperationalError) as e:
self.metadata.session.rollback()
raise e
@classmethod
@setup_lookup
def query(cls,
submission_type:str|SubmissionType|None=None,
id:int|str|None=None,
rsl_number:str|None=None,
start_date:date|str|int|None=None,
end_date:date|str|int|None=None,
reagent:Reagent|str|None=None,
chronologic:bool=False, limit:int=0,
**kwargs
) -> BasicSubmission | List[BasicSubmission]:
"""
Lookup submissions based on a number of parameters.
Args:
submission_type (str | models.SubmissionType | None, optional): Submission type of interest. Defaults to None.
id (int | str | None, optional): Submission id in the database (limits results to 1). Defaults to None.
rsl_number (str | None, optional): Submission name in the database (limits results to 1). Defaults to None.
start_date (date | str | int | None, optional): Beginning date to search by. Defaults to None.
end_date (date | str | int | None, optional): Ending date to search by. Defaults to None.
reagent (models.Reagent | str | None, optional): A reagent used in the submission. Defaults to None.
chronologic (bool, optional): Return results in chronologic order. Defaults to False.
limit (int, optional): Maximum number of results to return. Defaults to 0.
Returns:
models.BasicSubmission | List[models.BasicSubmission]: Submission(s) of interest
"""
# NOTE: if you go back to using 'model' change the appropriate cls to model in the query filters
if submission_type == None:
model = cls.find_subclasses(attrs=kwargs)
else:
if isinstance(submission_type, SubmissionType):
model = cls.find_subclasses(submission_type=submission_type.name)
else:
model = cls.find_subclasses(submission_type=submission_type)
# query: Query = setup_lookup(ctx=ctx, locals=locals()).query(model)
query: Query = cls.metadata.session.query(model)
# by submission type
# match submission_type:
# case SubmissionType():
# logger.debug(f"Looking up BasicSubmission with submission type: {submission_type}")
# query = query.filter(model.submission_type_name==submission_type.name)
# case str():
# logger.debug(f"Looking up BasicSubmission with submission type: {submission_type}")
# query = query.filter(model.submission_type_name==submission_type)
# case _:
# pass
# by date range
if start_date != None and end_date == None:
logger.warning(f"Start date with no end date, using today.")
end_date = date.today()
if end_date != None and start_date == None:
logger.warning(f"End date with no start date, using Jan 1, 2023")
start_date = date(2023, 1, 1)
if start_date != None:
match start_date:
case date():
start_date = start_date.strftime("%Y-%m-%d")
case int():
start_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d")
case _:
start_date = parse(start_date).strftime("%Y-%m-%d")
match end_date:
case date():
end_date = end_date.strftime("%Y-%m-%d")
case int():
end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime("%Y-%m-%d")
case _:
end_date = parse(end_date).strftime("%Y-%m-%d")
logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}")
query = query.filter(cls.submitted_date.between(start_date, end_date))
# by reagent (for some reason)
match reagent:
case str():
logger.debug(f"Looking up BasicSubmission with reagent: {reagent}")
# reagent = Reagent.query(lot_number=reagent)
# query = query.join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id)
query = query.join(cls.reagents).filter(Reagent.lot==reagent)
case Reagent():
logger.debug(f"Looking up BasicSubmission with reagent: {reagent}")
query = query.join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id)
case _:
pass
# by rsl number (returns only a single value)
match rsl_number:
case str():
query = query.filter(cls.rsl_plate_num==rsl_number)
logger.debug(f"At this point the query gets: {query.all()}")
limit = 1
case _:
pass
# by id (returns only a single value)
match id:
case int():
logger.debug(f"Looking up BasicSubmission with id: {id}")
query = query.filter(cls.id==id)
limit = 1
case str():
logger.debug(f"Looking up BasicSubmission with id: {id}")
query = query.filter(cls.id==int(id))
limit = 1
case _:
pass
for k, v in kwargs.items():
attr = getattr(cls, k)
logger.debug(f"Got attr: {attr}")
query = query.filter(attr==v)
if len(kwargs) > 0:
limit = 1
if chronologic:
query.order_by(cls.submitted_date)
return query_return(query=query, limit=limit)
@classmethod
def filename_template(cls):
return "{{ rsl_plate_num }}"
# Below are the custom submission types # Below are the custom submission types
@@ -415,9 +551,9 @@ class BacterialCulture(BasicSubmission):
return input_excel return input_excel
@classmethod @classmethod
def enforce_name(cls, ctx:Settings, instr:str) -> str: def enforce_name(cls, instr:str) -> str:
outstr = super().enforce_name(ctx=ctx, instr=instr) outstr = super().enforce_name(instr=instr)
def construct(ctx) -> str: def construct() -> str:
""" """
DEPRECIATED due to slowness. Search for the largest rsl number and increment by 1 DEPRECIATED due to slowness. Search for the largest rsl number and increment by 1
@@ -426,7 +562,8 @@ class BacterialCulture(BasicSubmission):
""" """
logger.debug(f"Attempting to construct RSL number from scratch...") logger.debug(f"Attempting to construct RSL number from scratch...")
# directory = Path(self.ctx['directory_path']).joinpath("Bacteria") # directory = Path(self.ctx['directory_path']).joinpath("Bacteria")
directory = Path(ctx.directory_path).joinpath("Bacteria") # directory = Path(ctx.directory_path).joinpath("Bacteria")
directory = cls.metadata.directory_path.joinpath("Bacteria")
year = str(datetime.now().year)[-2:] year = str(datetime.now().year)[-2:]
if directory.exists(): if directory.exists():
logger.debug(f"Year: {year}") logger.debug(f"Year: {year}")
@@ -449,7 +586,7 @@ class BacterialCulture(BasicSubmission):
try: try:
outstr = re.sub(r"RSL(\d{2})", r"RSL-\1", outstr, flags=re.IGNORECASE) outstr = re.sub(r"RSL(\d{2})", r"RSL-\1", outstr, flags=re.IGNORECASE)
except (AttributeError, TypeError) as e: except (AttributeError, TypeError) as e:
outstr = construct(ctx=ctx) outstr = construct()
# year = datetime.now().year # year = datetime.now().year
# self.parsed_name = f"RSL-{str(year)[-2:]}-0000" # self.parsed_name = f"RSL-{str(year)[-2:]}-0000"
return re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", outstr, flags=re.IGNORECASE) return re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", outstr, flags=re.IGNORECASE)
@@ -458,6 +595,12 @@ class BacterialCulture(BasicSubmission):
def get_regex(cls): def get_regex(cls):
return "(?P<Bacterial_Culture>RSL-?\\d{2}-?\\d{4})" return "(?P<Bacterial_Culture>RSL-?\\d{2}-?\\d{4})"
@classmethod
def filename_template(cls):
template = super().filename_template()
template += "_{{ submitting_lab }}_{{ submitter_plate_num }}"
return template
class Wastewater(BasicSubmission): class Wastewater(BasicSubmission):
""" """
derivative submission type from BasicSubmission derivative submission type from BasicSubmission
@@ -537,8 +680,8 @@ class Wastewater(BasicSubmission):
return samples return samples
@classmethod @classmethod
def enforce_name(cls, ctx:Settings, instr:str) -> str: def enforce_name(cls, instr:str) -> str:
outstr = super().enforce_name(ctx=ctx, instr=instr) outstr = super().enforce_name(instr=instr)
def construct(): def construct():
today = datetime.now() today = datetime.now()
return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}"
@@ -620,8 +763,8 @@ class WastewaterArtic(BasicSubmission):
return input_dict return input_dict
@classmethod @classmethod
def enforce_name(cls, ctx:Settings, instr:str) -> str: def enforce_name(cls, instr:str) -> str:
outstr = super().enforce_name(ctx=ctx, instr=instr) outstr = super().enforce_name(instr=instr)
def construct(): def construct():
today = datetime.now() today = datetime.now()
return f"RSL-AR-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" return f"RSL-AR-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}"
@@ -727,7 +870,7 @@ class BasicSample(Base):
return dict(name=self.submitter_id[:10], positive=False, tooltip=tooltip_text) return dict(name=self.submitter_id[:10], positive=False, tooltip=tooltip_text)
@classmethod @classmethod
def find_subclasses(cls, ctx:Settings, attrs:dict|None=None, rsl_number:str|None=None): def find_subclasses(cls, attrs:dict|None=None, rsl_number:str|None=None):
if len(attrs) == 0 or attrs == None: if len(attrs) == 0 or attrs == None:
return cls return cls
if any([not hasattr(cls, attr) for attr in attrs]): if any([not hasattr(cls, attr) for attr in attrs]):
@@ -737,7 +880,7 @@ class BasicSample(Base):
except IndexError as e: except IndexError as e:
raise AttributeError(f"Couldn't find existing class/subclass of {cls} with all attributes:\n{pformat(attrs)}") raise AttributeError(f"Couldn't find existing class/subclass of {cls} with all attributes:\n{pformat(attrs)}")
else: else:
model = cls return cls
logger.debug(f"Using model: {model}") logger.debug(f"Using model: {model}")
return model return model
@@ -759,6 +902,51 @@ class BasicSample(Base):
# logger.debug(f"Called {cls.__name__} sample parser") # logger.debug(f"Called {cls.__name__} sample parser")
return input_dict return input_dict
@classmethod
@setup_lookup
def query(cls,
submitter_id:str|None=None,
# sample_type:str|None=None,
limit:int=0,
**kwargs
) -> BasicSample|List[BasicSample]:
"""
Lookup samples in the database by a number of parameters.
Args:
ctx (Settings): Settings object passed down from gui
submitter_id (str | None, optional): Name of the sample (limits results to 1). Defaults to None.
sample_type (str | None, optional): Sample type. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
models.BasicSample|List[models.BasicSample]: Sample(s) of interest.
"""
logger.debug(f"Length of kwargs: {len(kwargs)}")
# model = models.BasicSample.find_subclasses(ctx=ctx, attrs=kwargs)
# query: Query = setup_lookup(ctx=ctx, locals=locals()).query(model)
query: Query = cls.metadata.session.query(cls)
match submitter_id:
case str():
logger.debug(f"Looking up {cls} with submitter id: {submitter_id}")
query = query.filter(cls.submitter_id==submitter_id)
limit = 1
case _:
pass
# match sample_type:
# case str():
# logger.debug(f"Looking up {model} with sample type: {sample_type}")
# query = query.filter(models.BasicSample.sample_type==sample_type)
# case _:
# pass
for k, v in kwargs.items():
attr = getattr(cls, k)
logger.debug(f"Got attr: {attr}")
query = query.filter(attr==v)
if len(kwargs) > 0:
limit = 1
return query_return(query=query, limit=limit)
class WastewaterSample(BasicSample): class WastewaterSample(BasicSample):
""" """
Derivative wastewater sample Derivative wastewater sample
@@ -772,53 +960,6 @@ class WastewaterSample(BasicSample):
sample_location = Column(String(8)) #: location on 24 well plate sample_location = Column(String(8)) #: location on 24 well plate
__mapper_args__ = {"polymorphic_identity": "Wastewater Sample", "polymorphic_load": "inline"} __mapper_args__ = {"polymorphic_identity": "Wastewater Sample", "polymorphic_load": "inline"}
# @validates("collected-date")
# def convert_cdate_time(self, key, value):
# logger.debug(f"Validating {key}: {value}")
# if isinstance(value, Timestamp):
# return value.date()
# if isinstance(value, str):
# return parse(value)
# return value
# @validates("rsl_number")
# def use_submitter_id(self, key, value):
# logger.debug(f"Validating {key}: {value}")
# return value or self.submitter_id
# def set_attribute(self, name:str, value):
# """
# Set an attribute of this object. Extends parent.
# Args:
# name (str): name of the attribute
# value (_type_): value to be set
# """
# # Due to the plate map being populated with RSL numbers, we have to do some shuffling.
# match name:
# case "submitter_id":
# # If submitter_id already has a value, stop
# if self.submitter_id != None:
# return
# # otherwise also set rsl_number to the same value
# else:
# super().set_attribute("rsl_number", value)
# case "ww_full_sample_id":
# # If value present, set ww_full_sample_id and make this the submitter_id
# if value != None:
# super().set_attribute(name, value)
# name = "submitter_id"
# case 'collection_date':
# # If this is a string use dateutils to parse into date()
# if isinstance(value, str):
# logger.debug(f"collection_date {value} is a string. Attempting parse...")
# value = parse(value)
# case "rsl_number":
# if value == None:
# value = self.submitter_id
# super().set_attribute(name, value)
def to_hitpick(self, submission_rsl:str) -> dict|None: def to_hitpick(self, submission_rsl:str) -> dict|None:
""" """
Outputs a dictionary usable for html plate maps. Extends parent method. Outputs a dictionary usable for html plate maps. Extends parent method.
@@ -925,6 +1066,61 @@ class SubmissionSampleAssociation(Base):
logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}") logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}")
return cls return cls
@classmethod
@setup_lookup
def query(cls,
submission:BasicSubmission|str|None=None,
sample:BasicSample|str|None=None,
row:int=0,
column:int=0,
limit:int=0,
chronologic:bool=False
) -> SubmissionSampleAssociation|List[SubmissionSampleAssociation]:
"""
Lookup junction of Submission and Sample in the database
Args:
submission (models.BasicSubmission | str | None, optional): Submission of interest. Defaults to None.
sample (models.BasicSample | str | None, optional): Sample of interest. Defaults to None.
row (int, optional): Row of the sample location on submission plate. Defaults to 0.
column (int, optional): Column of the sample location on the submission plate. Defaults to 0.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
chronologic (bool, optional): Return results in chronologic order. Defaults to False.
Returns:
models.SubmissionSampleAssociation|List[models.SubmissionSampleAssociation]: Junction(s) of interest
"""
query: Query = cls.metadata.session.query(cls)
match submission:
case BasicSubmission():
query = query.filter(cls.submission==submission)
case str():
query = query.join(BasicSubmission).filter(BasicSubmission.rsl_plate_num==submission)
case _:
pass
match sample:
case BasicSample():
query = query.filter(cls.sample==sample)
case str():
query = query.join(BasicSample).filter(BasicSample.submitter_id==sample)
case _:
pass
if row > 0:
query = query.filter(cls.row==row)
if column > 0:
query = query.filter(cls.column==column)
logger.debug(f"Query count: {query.count()}")
if chronologic:
query.join(BasicSubmission).order_by(BasicSubmission.submitted_date)
if query.count() == 1:
limit = 1
return query_return(query=query, limit=limit)
def save(self):
self.metadata.session.add(self)
self.metadata.session.commit()
return None
class WastewaterAssociation(SubmissionSampleAssociation): class WastewaterAssociation(SubmissionSampleAssociation):
""" """
Derivative custom Wastewater/Submission Association... fancy. Derivative custom Wastewater/Submission Association... fancy.

View File

@@ -110,7 +110,8 @@ class SheetParser(object):
""" """
Enforce that only allowed reagents get into the Pydantic Model Enforce that only allowed reagents get into the Pydantic Model
""" """
kit = lookup_kit_types(ctx=self.ctx, name=self.sub['extraction_kit']['value']) # kit = lookup_kit_types(ctx=self.ctx, name=self.sub['extraction_kit']['value'])
kit = models.KitType.query(name=self.sub['extraction_kit']['value'])
allowed_reagents = [item.name for item in kit.get_reagents()] allowed_reagents = [item.name for item in kit.get_reagents()]
logger.debug(f"List of reagents for comparison with allowed_reagents: {pprint.pformat(self.sub['reagents'])}") logger.debug(f"List of reagents for comparison with allowed_reagents: {pprint.pformat(self.sub['reagents'])}")
# self.sub['reagents'] = [reagent for reagent in self.sub['reagents'] if reagent['value'].type in allowed_reagents] # self.sub['reagents'] = [reagent for reagent in self.sub['reagents'] if reagent['value'].type in allowed_reagents]
@@ -151,7 +152,8 @@ class InfoParser(object):
if isinstance(submission_type, str): if isinstance(submission_type, str):
submission_type = dict(value=submission_type, missing=True) submission_type = dict(value=submission_type, missing=True)
logger.debug(f"Looking up submission type: {submission_type['value']}") logger.debug(f"Looking up submission type: {submission_type['value']}")
submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type['value']) # submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type['value'])
submission_type = models.SubmissionType.query(name=submission_type['value'])
info_map = submission_type.info_map info_map = submission_type.info_map
# Get the parse_info method from the submission type specified # Get the parse_info method from the submission type specified
self.custom_parser = models.BasicSubmission.find_polymorphic_subclass(polymorphic_identity=submission_type.name).parse_info self.custom_parser = models.BasicSubmission.find_polymorphic_subclass(polymorphic_identity=submission_type.name).parse_info
@@ -212,7 +214,8 @@ class ReagentParser(object):
def fetch_kit_info_map(self, extraction_kit:dict, submission_type:str): def fetch_kit_info_map(self, extraction_kit:dict, submission_type:str):
if isinstance(extraction_kit, dict): if isinstance(extraction_kit, dict):
extraction_kit = extraction_kit['value'] extraction_kit = extraction_kit['value']
kit = lookup_kit_types(ctx=self.ctx, name=extraction_kit) # kit = lookup_kit_types(ctx=self.ctx, name=extraction_kit)
kit = models.KitType.query(name=extraction_kit)
if isinstance(submission_type, dict): if isinstance(submission_type, dict):
submission_type = submission_type['value'] submission_type = submission_type['value']
reagent_map = kit.construct_xl_map_for_use(submission_type.title()) reagent_map = kit.construct_xl_map_for_use(submission_type.title())
@@ -289,7 +292,8 @@ class SampleParser(object):
dict: Info locations. dict: Info locations.
""" """
logger.debug(f"Looking up submission type: {submission_type}") logger.debug(f"Looking up submission type: {submission_type}")
submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type) # submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type)
submission_type = models.SubmissionType.query(name=submission_type)
logger.debug(f"info_map: {pprint.pformat(submission_type.info_map)}") logger.debug(f"info_map: {pprint.pformat(submission_type.info_map)}")
sample_info_map = submission_type.info_map['samples'] sample_info_map = submission_type.info_map['samples']
# self.custom_parser = get_polymorphic_subclass(models.BasicSubmission, submission_type.name).parse_samples # self.custom_parser = get_polymorphic_subclass(models.BasicSubmission, submission_type.name).parse_samples
@@ -458,7 +462,8 @@ class SampleParser(object):
logger.error(f"Could not find the model {query}. Using generic.") logger.error(f"Could not find the model {query}. Using generic.")
database_obj = models.BasicSample database_obj = models.BasicSample
logger.debug(f"Searching database for {input_dict['submitter_id']}...") logger.debug(f"Searching database for {input_dict['submitter_id']}...")
instance = lookup_samples(ctx=self.ctx, submitter_id=str(input_dict['submitter_id'])) # instance = lookup_samples(ctx=self.ctx, submitter_id=str(input_dict['submitter_id']))
instance = models.BasicSample.query(submitter_id=str(input_dict['submitter_id']))
if instance == None: if instance == None:
logger.debug(f"Couldn't find sample {input_dict['submitter_id']}. Creating new sample.") logger.debug(f"Couldn't find sample {input_dict['submitter_id']}. Creating new sample.")
instance = database_obj() instance = database_obj()

View File

@@ -22,7 +22,7 @@ class RSLNamer(object):
if self.submission_type != None: if self.submission_type != None:
enforcer = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type) enforcer = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
self.parsed_name = self.retrieve_rsl_number(instr=instr, regex=enforcer.get_regex()) self.parsed_name = self.retrieve_rsl_number(instr=instr, regex=enforcer.get_regex())
self.parsed_name = enforcer.enforce_name(ctx=ctx, instr=self.parsed_name) self.parsed_name = enforcer.enforce_name(instr=self.parsed_name)
@classmethod @classmethod
def retrieve_submission_type(cls, ctx:Settings, instr:str|Path) -> str: def retrieve_submission_type(cls, ctx:Settings, instr:str|Path) -> str:

View File

@@ -2,7 +2,6 @@
Contains pydantic models and accompanying validators Contains pydantic models and accompanying validators
''' '''
import uuid import uuid
from PyQt6 import QtCore
from pydantic import BaseModel, field_validator, Field from pydantic import BaseModel, field_validator, Field
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from dateutil.parser import parse from dateutil.parser import parse
@@ -12,14 +11,10 @@ from . import RSLNamer
from pathlib import Path from pathlib import Path
import re import re
import logging import logging
from tools import check_not_nan, convert_nans_to_nones, Settings from tools import check_not_nan, convert_nans_to_nones, Settings, jinja_template_loading
from backend.db.functions import (lookup_submissions, lookup_reagent_types, lookup_reagents, lookup_kit_types,
lookup_organizations, lookup_submission_type, lookup_discounts, lookup_samples, lookup_submission_sample_association,
lookup_reagenttype_kittype_association
)
from backend.db.models import * from backend.db.models import *
from sqlalchemy.exc import InvalidRequestError, StatementError from sqlalchemy.exc import InvalidRequestError, StatementError
from PyQt6.QtWidgets import QComboBox, QWidget, QLabel, QVBoxLayout from PyQt6.QtWidgets import QComboBox, QWidget
from pprint import pformat from pprint import pformat
from openpyxl import load_workbook from openpyxl import load_workbook
@@ -47,7 +42,8 @@ class PydReagent(BaseModel):
def rescue_type_with_lookup(cls, value, values): def rescue_type_with_lookup(cls, value, values):
if value == None and values.data['lot'] != None: if value == None and values.data['lot'] != None:
try: try:
return lookup_reagents(ctx=values.data['ctx'], lot_number=values.data['lot']).name # return lookup_reagents(ctx=values.data['ctx'], lot_number=values.data['lot']).name
return Reagent.query(lot_number=values.data['lot'].name)
except AttributeError: except AttributeError:
return value return value
return value return value
@@ -94,7 +90,8 @@ class PydReagent(BaseModel):
def toSQL(self) -> Tuple[Reagent, dict]: def toSQL(self) -> Tuple[Reagent, dict]:
result = None result = None
logger.debug(f"Reagent SQL constructor is looking up type: {self.type}, lot: {self.lot}") logger.debug(f"Reagent SQL constructor is looking up type: {self.type}, lot: {self.lot}")
reagent = lookup_reagents(ctx=self.ctx, lot_number=self.lot) # reagent = lookup_reagents(ctx=self.ctx, lot_number=self.lot)
reagent = Reagent.query(lot_number=self.lot)
logger.debug(f"Result: {reagent}") logger.debug(f"Result: {reagent}")
if reagent == None: if reagent == None:
reagent = Reagent() reagent = Reagent()
@@ -109,7 +106,8 @@ class PydReagent(BaseModel):
case "expiry": case "expiry":
reagent.expiry = value reagent.expiry = value
case "type": case "type":
reagent_type = lookup_reagent_types(ctx=self.ctx, name=value) # reagent_type = lookup_reagent_types(ctx=self.ctx, name=value)
reagent_type = ReagentType.query(name=value)
if reagent_type != None: if reagent_type != None:
reagent.type.append(reagent_type) reagent.type.append(reagent_type)
case "name": case "name":
@@ -145,7 +143,8 @@ class PydSample(BaseModel, extra='allow'):
result = None result = None
self.__dict__.update(self.model_extra) self.__dict__.update(self.model_extra)
logger.debug(f"Here is the incoming sample dict: \n{self.__dict__}") logger.debug(f"Here is the incoming sample dict: \n{self.__dict__}")
instance = lookup_samples(ctx=ctx, submitter_id=self.submitter_id) # instance = lookup_samples(ctx=ctx, submitter_id=self.submitter_id)
instance = BasicSample.query(submitter_id=self.submitter_id)
if instance == None: if instance == None:
logger.debug(f"Sample {self.submitter_id} doesn't exist yet. Looking up sample object with polymorphic identity: {self.sample_type}") logger.debug(f"Sample {self.submitter_id} doesn't exist yet. Looking up sample object with polymorphic identity: {self.sample_type}")
instance = BasicSample.find_polymorphic_subclass(polymorphic_identity=self.sample_type)() instance = BasicSample.find_polymorphic_subclass(polymorphic_identity=self.sample_type)()
@@ -158,7 +157,8 @@ class PydSample(BaseModel, extra='allow'):
instance.set_attribute(name=key, value=value) instance.set_attribute(name=key, value=value)
for row, column in zip(self.row, self.column): for row, column in zip(self.row, self.column):
logger.debug(f"Looking up association with identity: ({submission.submission_type_name} Association)") logger.debug(f"Looking up association with identity: ({submission.submission_type_name} Association)")
association = lookup_submission_sample_association(ctx=ctx, submission=submission, row=row, column=column) # association = lookup_submission_sample_association(ctx=ctx, submission=submission, row=row, column=column)
association = SubmissionSampleAssociation.query(submission=submission, row=row, column=column)
logger.debug(f"Returned association: {association}") logger.debug(f"Returned association: {association}")
if association == None or association == []: if association == None or association == []:
logger.debug(f"Looked up association at row {row}, column {column} didn't exist, creating new association.") logger.debug(f"Looked up association at row {row}, column {column} didn't exist, creating new association.")
@@ -239,7 +239,8 @@ class PydSubmission(BaseModel, extra='allow'):
logger.debug(f"RSL-plate initial value: {value['value']}") logger.debug(f"RSL-plate initial value: {value['value']}")
sub_type = values.data['submission_type']['value'] sub_type = values.data['submission_type']['value']
if check_not_nan(value['value']): if check_not_nan(value['value']):
if lookup_submissions(ctx=values.data['ctx'], rsl_number=value['value']) == None: # if lookup_submissions(ctx=values.data['ctx'], rsl_number=value['value']) == None:
if BasicSubmission.query(rsl_number=value['value']) == None:
return dict(value=value['value'], missing=False) return dict(value=value['value'], missing=False)
else: else:
logger.warning(f"Submission number {value} already exists in DB, attempting salvage with filepath") logger.warning(f"Submission number {value} already exists in DB, attempting salvage with filepath")
@@ -325,9 +326,12 @@ class PydSubmission(BaseModel, extra='allow'):
output.append(dummy) output.append(dummy)
self.samples = output self.samples = output
def improved_dict(self): def improved_dict(self, dictionaries:bool=True):
fields = list(self.model_fields.keys()) + list(self.model_extra.keys()) fields = list(self.model_fields.keys()) + list(self.model_extra.keys())
output = {k:getattr(self, k) for k in fields} if dictionaries:
output = {k:getattr(self, k) for k in fields}
else:
output = {k:(getattr(self, k) if not isinstance(getattr(self, k), dict) else getattr(self, k)['value']) for k in fields}
return output return output
def find_missing(self): def find_missing(self):
@@ -341,7 +345,8 @@ class PydSubmission(BaseModel, extra='allow'):
msg = None msg = None
status = None status = None
self.__dict__.update(self.model_extra) self.__dict__.update(self.model_extra)
instance = lookup_submissions(ctx=self.ctx, rsl_number=self.rsl_plate_num['value']) # instance = lookup_submissions(ctx=self.ctx, rsl_number=self.rsl_plate_num['value'])
instance = BasicSubmission.query(rsl_number=self.rsl_plate_num['value'])
if instance == None: if instance == None:
instance = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type)() instance = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type)()
else: else:
@@ -357,11 +362,13 @@ class PydSubmission(BaseModel, extra='allow'):
match key: match key:
case "extraction_kit": case "extraction_kit":
logger.debug(f"Looking up kit {value}") logger.debug(f"Looking up kit {value}")
field_value = lookup_kit_types(ctx=self.ctx, name=value) # field_value = lookup_kit_types(ctx=self.ctx, name=value)
field_value = KitType.query(name=value)
logger.debug(f"Got {field_value} for kit {value}") logger.debug(f"Got {field_value} for kit {value}")
case "submitting_lab": case "submitting_lab":
logger.debug(f"Looking up organization: {value}") logger.debug(f"Looking up organization: {value}")
field_value = lookup_organizations(ctx=self.ctx, name=value) # field_value = lookup_organizations(ctx=self.ctx, name=value)
field_value = Organization.query(name=value)
logger.debug(f"Got {field_value} for organization {value}") logger.debug(f"Got {field_value} for organization {value}")
case "submitter_plate_num": case "submitter_plate_num":
logger.debug(f"Submitter plate id: {value}") logger.debug(f"Submitter plate id: {value}")
@@ -376,7 +383,8 @@ class PydSubmission(BaseModel, extra='allow'):
case "reagents": case "reagents":
field_value = [reagent['value'].toSQL()[0] if isinstance(reagent, dict) else reagent.toSQL()[0] for reagent in value] field_value = [reagent['value'].toSQL()[0] if isinstance(reagent, dict) else reagent.toSQL()[0] for reagent in value]
case "submission_type": case "submission_type":
field_value = lookup_submission_type(ctx=self.ctx, name=value) # field_value = lookup_submission_type(ctx=self.ctx, name=value)
field_value = SubmissionType.query(name=value)
case "sample_count": case "sample_count":
if value == None: if value == None:
field_value = len(self.samples) field_value = len(self.samples)
@@ -404,7 +412,8 @@ class PydSubmission(BaseModel, extra='allow'):
# Apply any discounts that are applicable for client and kit. # Apply any discounts that are applicable for client and kit.
try: try:
logger.debug("Checking and applying discounts...") logger.debug("Checking and applying discounts...")
discounts = [item.amount for item in lookup_discounts(ctx=self.ctx, kit_type=instance.extraction_kit, organization=instance.submitting_lab)] # discounts = [item.amount for item in lookup_discounts(ctx=self.ctx, kit_type=instance.extraction_kit, organization=instance.submitting_lab)]
discounts = [item.amount for item in Discount.query(kit_type=instance.extraction_kit, organization=instance.submitting_lab)]
logger.debug(f"We got discounts: {discounts}") logger.debug(f"We got discounts: {discounts}")
if len(discounts) > 0: if len(discounts) > 0:
discounts = sum(discounts) discounts = sum(discounts)
@@ -433,7 +442,8 @@ class PydSubmission(BaseModel, extra='allow'):
if len(reagents + list(info.keys())) == 0: if len(reagents + list(info.keys())) == 0:
return None return None
logger.debug(f"We have blank info and/or reagents in the excel sheet.\n\tLet's try to fill them in.") logger.debug(f"We have blank info and/or reagents in the excel sheet.\n\tLet's try to fill them in.")
extraction_kit = lookup_kit_types(ctx=self.ctx, name=self.extraction_kit['value']) # extraction_kit = lookup_kit_types(ctx=self.ctx, name=self.extraction_kit['value'])
extraction_kit = KitType.query(name=self.extraction_kit['value'])
logger.debug(f"We have the extraction kit: {extraction_kit.name}") logger.debug(f"We have the extraction kit: {extraction_kit.name}")
excel_map = extraction_kit.construct_xl_map_for_use(self.submission_type['value']) excel_map = extraction_kit.construct_xl_map_for_use(self.submission_type['value'])
logger.debug(f"Extraction kit map:\n\n{pformat(excel_map)}") logger.debug(f"Extraction kit map:\n\n{pformat(excel_map)}")
@@ -498,6 +508,13 @@ class PydSubmission(BaseModel, extra='allow'):
workbook = custom_parser.custom_autofill(workbook) workbook = custom_parser.custom_autofill(workbook)
return workbook return workbook
def construct_filename(self):
env = jinja_template_loading()
template = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type).filename_template()
logger.debug(f"Using template string: {template}")
template = env.from_string(template)
return template.render(**self.improved_dict(dictionaries=False))
class PydContact(BaseModel): class PydContact(BaseModel):
name: str name: str
@@ -539,12 +556,14 @@ class PydReagentType(BaseModel):
return value return value
def toSQL(self, ctx:Settings, kit:KitType): def toSQL(self, ctx:Settings, kit:KitType):
instance: ReagentType = lookup_reagent_types(ctx=ctx, name=self.name) # instance: ReagentType = lookup_reagent_types(ctx=ctx, name=self.name)
instance: ReagentType = ReagentType.query(name=self.name)
if instance == None: if instance == None:
instance = ReagentType(name=self.name, eol_ext=self.eol_ext) instance = ReagentType(name=self.name, eol_ext=self.eol_ext)
logger.debug(f"This is the reagent type instance: {instance.__dict__}") logger.debug(f"This is the reagent type instance: {instance.__dict__}")
try: try:
assoc = lookup_reagenttype_kittype_association(ctx=ctx, reagent_type=instance, kit_type=kit) # assoc = lookup_reagenttype_kittype_association(ctx=ctx, reagent_type=instance, kit_type=kit)
assoc = KitTypeReagentTypeAssociation.query(reagent_type=instance, kit_type=kit)
except StatementError: except StatementError:
assoc = None assoc = None
if assoc == None: if assoc == None:
@@ -559,7 +578,8 @@ class PydKit(BaseModel):
def toSQL(self, ctx): def toSQL(self, ctx):
result = dict(message=None, status='Information') result = dict(message=None, status='Information')
instance = lookup_kit_types(ctx=ctx, name=self.name) # instance = lookup_kit_types(ctx=ctx, name=self.name)
instance = KitType.query(name=self.name)
if instance == None: if instance == None:
instance = KitType(name=self.name) instance = KitType(name=self.name)
# instance.reagent_types = [item.toSQL(ctx, instance) for item in self.reagent_types] # instance.reagent_types = [item.toSQL(ctx, instance) for item in self.reagent_types]

View File

@@ -1,7 +1,6 @@
''' '''
Constructs main application. Constructs main application.
''' '''
from pprint import pformat
import sys import sys
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
QMainWindow, QToolBar, QMainWindow, QToolBar,
@@ -16,9 +15,10 @@ from pathlib import Path
from backend.db.functions import ( from backend.db.functions import (
lookup_control_types, lookup_modes lookup_control_types, lookup_modes
) )
from backend.db.models import ControlType, Control
from backend.validators import PydSubmission, PydReagent from backend.validators import PydSubmission, PydReagent
from tools import check_if_app, Settings from tools import check_if_app, Settings
from frontend.custom_widgets import SubmissionsSheet, AlertPop, AddReagentForm, KitAdder, ControlsDatePicker, ImportReagent, ReagentFormWidget from frontend.custom_widgets import SubmissionsSheet, AlertPop, AddReagentForm, KitAdder, ControlsDatePicker, ReagentFormWidget
import logging import logging
from datetime import date from datetime import date
import webbrowser import webbrowser
@@ -228,7 +228,7 @@ class App(QMainWindow):
# send reagent to db # send reagent to db
# store_reagent(ctx=self.ctx, reagent=reagent) # store_reagent(ctx=self.ctx, reagent=reagent)
sqlobj, result = reagent.toSQL() sqlobj, result = reagent.toSQL()
sqlobj.save(ctx=self.ctx) sqlobj.save()
# result = store_object(ctx=self.ctx, object=reagent.toSQL()[0]) # result = store_object(ctx=self.ctx, object=reagent.toSQL()[0])
self.result_reporter(result=result) self.result_reporter(result=result)
return reagent return reagent
@@ -369,12 +369,14 @@ class AddSubForm(QWidget):
self.control_typer = QComboBox() self.control_typer = QComboBox()
# fetch types of controls # fetch types of controls
# con_types = get_all_Control_Types_names(ctx=parent.ctx) # con_types = get_all_Control_Types_names(ctx=parent.ctx)
con_types = [item.name for item in lookup_control_types(ctx=parent.ctx)] # con_types = [item.name for item in lookup_control_types(ctx=parent.ctx)]
con_types = [item.name for item in ControlType.query()]
self.control_typer.addItems(con_types) self.control_typer.addItems(con_types)
# create custom widget to get types of analysis # create custom widget to get types of analysis
self.mode_typer = QComboBox() self.mode_typer = QComboBox()
# mode_types = get_all_available_modes(ctx=parent.ctx) # mode_types = get_all_available_modes(ctx=parent.ctx)
mode_types = lookup_modes(ctx=parent.ctx) # mode_types = lookup_modes(ctx=parent.ctx)
mode_types = Control.get_modes()
self.mode_typer.addItems(mode_types) self.mode_typer.addItems(mode_types)
# create custom widget to get subtypes of analysis # create custom widget to get subtypes of analysis
self.sub_typer = QComboBox() self.sub_typer = QComboBox()

View File

@@ -13,10 +13,9 @@ from PyQt6.QtWidgets import (
) )
from PyQt6.QtCore import Qt, QDate, QSize, pyqtSignal from PyQt6.QtCore import Qt, QDate, QSize, pyqtSignal
from tools import check_not_nan, jinja_template_loading, Settings from tools import check_not_nan, jinja_template_loading, Settings
from backend.db.functions import \ from backend.db.functions import (lookup_reagent_types, lookup_reagents, lookup_submission_type, lookup_reagenttype_kittype_association, \
lookup_reagent_types, lookup_reagents, lookup_submission_type, lookup_reagenttype_kittype_association, \ lookup_submissions, lookup_organizations, lookup_kit_types)
lookup_submissions, lookup_organizations, lookup_kit_types from backend.db.models import *
from backend.db.models import SubmissionTypeKitTypeAssociation
from sqlalchemy import FLOAT, INTEGER from sqlalchemy import FLOAT, INTEGER
import logging import logging
import numpy as np import numpy as np
@@ -26,6 +25,7 @@ from typing import Tuple, List
from pprint import pformat from pprint import pformat
import difflib import difflib
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
env = jinja_template_loading() env = jinja_template_loading()
@@ -67,7 +67,8 @@ class AddReagentForm(QDialog):
# widget to get reagent type info # widget to get reagent type info
self.type_input = QComboBox() self.type_input = QComboBox()
self.type_input.setObjectName('type') self.type_input.setObjectName('type')
self.type_input.addItems([item.name for item in lookup_reagent_types(ctx=ctx)]) # self.type_input.addItems([item.name for item in lookup_reagent_types(ctx=ctx)])
self.type_input.addItems([item.name for item in ReagentType.query()])
logger.debug(f"Trying to find index of {reagent_type}") logger.debug(f"Trying to find index of {reagent_type}")
# convert input to user friendly string? # convert input to user friendly string?
try: try:
@@ -103,7 +104,8 @@ class AddReagentForm(QDialog):
""" """
logger.debug(self.type_input.currentText()) logger.debug(self.type_input.currentText())
self.name_input.clear() self.name_input.clear()
lookup = lookup_reagents(ctx=self.ctx, reagent_type=self.type_input.currentText()) # lookup = lookup_reagents(ctx=self.ctx, reagent_type=self.type_input.currentText())
lookup = Reagent.query(reagent_type=self.type_input.currentText())
self.name_input.addItems(list(set([item.name for item in lookup]))) self.name_input.addItems(list(set([item.name for item in lookup])))
class ReportDatePicker(QDialog): class ReportDatePicker(QDialog):
@@ -165,7 +167,8 @@ class KitAdder(QWidget):
used_for = QComboBox() used_for = QComboBox()
used_for.setObjectName("used_for") used_for.setObjectName("used_for")
# Insert all existing sample types # Insert all existing sample types
used_for.addItems([item.name for item in lookup_submission_type(ctx=parent_ctx)]) # used_for.addItems([item.name for item in lookup_submission_type(ctx=parent_ctx)])
used_for.addItems([item.name for item in SubmissionType.query()])
used_for.setEditable(True) used_for.setEditable(True)
self.grid.addWidget(used_for,3,1) self.grid.addWidget(used_for,3,1)
# Get all fields in SubmissionTypeKitTypeAssociation # Get all fields in SubmissionTypeKitTypeAssociation
@@ -188,28 +191,6 @@ class KitAdder(QWidget):
add_widget = QLineEdit() add_widget = QLineEdit()
add_widget.setObjectName(column.name) add_widget.setObjectName(column.name)
self.grid.addWidget(add_widget, idx,1) self.grid.addWidget(add_widget, idx,1)
# self.grid.addWidget(QLabel("Constant cost per full plate (plates, work hours, etc.):"),4,0)
# # widget to get constant cost
# const_cost = QDoubleSpinBox() #QSpinBox()
# const_cost.setObjectName("const_cost")
# const_cost.setMinimum(0)
# const_cost.setMaximum(9999)
# self.grid.addWidget(const_cost,4,1)
# self.grid.addWidget(QLabel("Cost per column (multidrop reagents, etc.):"),5,0)
# # widget to get mutable costs per column
# mut_cost_col = QDoubleSpinBox() #QSpinBox()
# mut_cost_col.setObjectName("mut_cost_col")
# mut_cost_col.setMinimum(0)
# mut_cost_col.setMaximum(9999)
# self.grid.addWidget(mut_cost_col,5,1)
# self.grid.addWidget(QLabel("Cost per sample (tips, reagents, etc.):"),6,0)
# # widget to get mutable costs per column
# mut_cost_samp = QDoubleSpinBox() #QSpinBox()
# mut_cost_samp.setObjectName("mut_cost_samp")
# mut_cost_samp.setMinimum(0)
# mut_cost_samp.setMaximum(9999)
# self.grid.addWidget(mut_cost_samp,6,1)
# button to add additional reagent types
self.add_RT_btn = QPushButton("Add Reagent Type") self.add_RT_btn = QPushButton("Add Reagent Type")
self.grid.addWidget(self.add_RT_btn) self.grid.addWidget(self.add_RT_btn)
self.add_RT_btn.clicked.connect(self.add_RT) self.add_RT_btn.clicked.connect(self.add_RT)
@@ -259,7 +240,7 @@ class KitAdder(QWidget):
logger.debug(f"Output pyd object: {kit.__dict__}") logger.debug(f"Output pyd object: {kit.__dict__}")
# result = construct_kit_from_yaml(ctx=self.ctx, kit_dict=info) # result = construct_kit_from_yaml(ctx=self.ctx, kit_dict=info)
sqlobj, result = kit.toSQL(self.ctx) sqlobj, result = kit.toSQL(self.ctx)
sqlobj.save(ctx=self.ctx) sqlobj.save()
msg = AlertPop(message=result['message'], status=result['status']) msg = AlertPop(message=result['message'], status=result['status'])
msg.exec() msg.exec()
self.__init__(self.ctx) self.__init__(self.ctx)
@@ -295,7 +276,8 @@ class ReagentTypeForm(QWidget):
self.reagent_getter = QComboBox() self.reagent_getter = QComboBox()
self.reagent_getter.setObjectName("rtname") self.reagent_getter.setObjectName("rtname")
# lookup all reagent type names from db # lookup all reagent type names from db
lookup = lookup_reagent_types(ctx=ctx) # lookup = lookup_reagent_types(ctx=ctx)
lookup = ReagentType.query()
logger.debug(f"Looked up ReagentType names: {lookup}") logger.debug(f"Looked up ReagentType names: {lookup}")
self.reagent_getter.addItems([item.__str__() for item in lookup]) self.reagent_getter.addItems([item.__str__() for item in lookup])
self.reagent_getter.setEditable(True) self.reagent_getter.setEditable(True)
@@ -387,66 +369,6 @@ class ControlsDatePicker(QWidget):
def sizeHint(self) -> QSize: def sizeHint(self) -> QSize:
return QSize(80,20) return QSize(80,20)
class ImportReagent(QComboBox):
"""
NOTE: Depreciated in favour of ReagentFormWidget
"""
def __init__(self, ctx:Settings, reagent:dict|PydReagent, extraction_kit:str):
super().__init__()
self.setEditable(True)
if isinstance(reagent, dict):
reagent = PydReagent(ctx=ctx, **reagent)
# Ensure that all reagenttypes have a name that matches the items in the excel parser
query_var = reagent.type
logger.debug(f"Import Reagent is looking at: {reagent.lot} for {query_var}")
if isinstance(reagent.lot, np.float64):
logger.debug(f"{reagent.lot} is a numpy float!")
try:
reagent.lot = int(reagent.lot)
except ValueError:
pass
# query for reagents using type name from sheet and kit from sheet
logger.debug(f"Attempting lookup of reagents by type: {query_var}")
# below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work.
lookup = lookup_reagents(ctx=ctx, reagent_type=query_var)
relevant_reagents = [item.__str__() for item in lookup]
output_reg = []
for rel_reagent in relevant_reagents:
# extract strings from any sets.
if isinstance(rel_reagent, set):
for thing in rel_reagent:
output_reg.append(thing)
elif isinstance(rel_reagent, str):
output_reg.append(rel_reagent)
relevant_reagents = output_reg
# if reagent in sheet is not found insert it into the front of relevant reagents so it shows
logger.debug(f"Relevant reagents for {reagent.lot}: {relevant_reagents}")
if str(reagent.lot) not in relevant_reagents:
if check_not_nan(reagent.lot):
relevant_reagents.insert(0, str(reagent.lot))
else:
# TODO: look up the last used reagent of this type in the database
looked_up_rt = lookup_reagenttype_kittype_association(ctx=ctx, reagent_type=reagent.type, kit_type=extraction_kit)
looked_up_reg = lookup_reagents(ctx=ctx, lot_number=looked_up_rt.last_used)
logger.debug(f"Because there was no reagent listed for {reagent}, we will insert the last lot used: {looked_up_reg}")
if looked_up_reg != None:
relevant_reagents.remove(str(looked_up_reg.lot))
relevant_reagents.insert(0, str(looked_up_reg.lot))
else:
if len(relevant_reagents) > 1:
logger.debug(f"Found {reagent.lot} in relevant reagents: {relevant_reagents}. Moving to front of list.")
idx = relevant_reagents.index(str(reagent.lot))
logger.debug(f"The index we got for {reagent.lot} in {relevant_reagents} was {idx}")
moved_reag = relevant_reagents.pop(idx)
relevant_reagents.insert(0, moved_reag)
else:
logger.debug(f"Found {reagent.lot} in relevant reagents: {relevant_reagents}. But no need to move due to short list.")
logger.debug(f"New relevant reagents: {relevant_reagents}")
self.setObjectName(f"lot_{reagent.type}")
self.addItems(relevant_reagents)
class FirstStrandSalvage(QDialog): class FirstStrandSalvage(QDialog):
def __init__(self, ctx:Settings, submitter_id:str, rsl_plate_num:str|None=None) -> None: def __init__(self, ctx:Settings, submitter_id:str, rsl_plate_num:str|None=None) -> None:
@@ -492,7 +414,8 @@ class FirstStrandPlateList(QDialog):
self.buttonBox = QDialogButtonBox(QBtn) self.buttonBox = QDialogButtonBox(QBtn)
self.buttonBox.accepted.connect(self.accept) self.buttonBox.accepted.connect(self.accept)
self.buttonBox.rejected.connect(self.reject) self.buttonBox.rejected.connect(self.reject)
ww = [item.rsl_plate_num for item in lookup_submissions(ctx=ctx, submission_type="Wastewater")] # ww = [item.rsl_plate_num for item in lookup_submissions(ctx=ctx, submission_type="Wastewater")]
ww = [item.rsl_plate_num for item in BasicSubmission.query(submission_type="Wastewater")]
self.plate1 = QComboBox() self.plate1 = QComboBox()
self.plate2 = QComboBox() self.plate2 = QComboBox()
self.plate3 = QComboBox() self.plate3 = QComboBox()
@@ -532,7 +455,8 @@ class ReagentFormWidget(QWidget):
def parse_form(self) -> Tuple[PydReagent, dict]: def parse_form(self) -> Tuple[PydReagent, dict]:
lot = self.lot.currentText() lot = self.lot.currentText()
wanted_reagent = lookup_reagents(ctx=self.ctx, lot_number=lot, reagent_type=self.reagent.type) # wanted_reagent = lookup_reagents(ctx=self.ctx, lot_number=lot, reagent_type=self.reagent.type)
wanted_reagent = Reagent.query(lot_number=lot, reagent_type=self.reagent.type)
# if reagent doesn't exist in database, off to add it (uses App.add_reagent) # if reagent doesn't exist in database, off to add it (uses App.add_reagent)
if wanted_reagent == None: if wanted_reagent == None:
dlg = QuestionAsker(title=f"Add {lot}?", message=f"Couldn't find reagent type {self.reagent.type}: {lot} in the database.\n\nWould you like to add it?") dlg = QuestionAsker(title=f"Add {lot}?", message=f"Couldn't find reagent type {self.reagent.type}: {lot} in the database.\n\nWould you like to add it?")
@@ -546,10 +470,12 @@ class ReagentFormWidget(QWidget):
else: else:
# Since this now gets passed in directly from the parser -> pyd -> form and the parser gets the name # Since this now gets passed in directly from the parser -> pyd -> form and the parser gets the name
# from the db, it should no longer be necessary to query the db with reagent/kit, but with rt name directly. # from the db, it should no longer be necessary to query the db with reagent/kit, but with rt name directly.
rt = lookup_reagent_types(ctx=self.ctx, name=self.reagent.type) # rt = lookup_reagent_types(ctx=self.ctx, name=self.reagent.type)
# rt = lookup_reagent_types(ctx=self.ctx, kit_type=self.extraction_kit, reagent=wanted_reagent) # rt = lookup_reagent_types(ctx=self.ctx, kit_type=self.extraction_kit, reagent=wanted_reagent)
rt = ReagentType.query(name=self.reagent.type)
if rt == None: if rt == None:
rt = lookup_reagent_types(ctx=self.ctx, kit_type=self.extraction_kit, reagent=wanted_reagent) # rt = lookup_reagent_types(ctx=self.ctx, kit_type=self.extraction_kit, reagent=wanted_reagent)
rt = ReagentType.query(kit_type=self.extraction_kit, reagent=wanted_reagent)
return PydReagent(ctx=self.ctx, name=wanted_reagent.name, lot=wanted_reagent.lot, type=rt.name, expiry=wanted_reagent.expiry, parsed=not self.missing), None return PydReagent(ctx=self.ctx, name=wanted_reagent.name, lot=wanted_reagent.lot, type=rt.name, expiry=wanted_reagent.expiry, parsed=not self.missing), None
def updated(self): def updated(self):
@@ -584,7 +510,8 @@ class ReagentFormWidget(QWidget):
# pass # pass
logger.debug(f"Attempting lookup of reagents by type: {reagent.type}") logger.debug(f"Attempting lookup of reagents by type: {reagent.type}")
# below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work. # below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work.
lookup = lookup_reagents(ctx=self.ctx, reagent_type=reagent.type) # lookup = lookup_reagents(ctx=self.ctx, reagent_type=reagent.type)
lookup = Reagent.query(reagent_type=reagent.type)
relevant_reagents = [item.__str__() for item in lookup] relevant_reagents = [item.__str__() for item in lookup]
output_reg = [] output_reg = []
for rel_reagent in relevant_reagents: for rel_reagent in relevant_reagents:
@@ -602,9 +529,11 @@ class ReagentFormWidget(QWidget):
relevant_reagents.insert(0, str(reagent.lot)) relevant_reagents.insert(0, str(reagent.lot))
else: else:
# TODO: look up the last used reagent of this type in the database # TODO: look up the last used reagent of this type in the database
looked_up_rt = lookup_reagenttype_kittype_association(ctx=self.ctx, reagent_type=reagent.type, kit_type=extraction_kit) # looked_up_rt = lookup_reagenttype_kittype_association(ctx=self.ctx, reagent_type=reagent.type, kit_type=extraction_kit)
looked_up_rt = KitTypeReagentTypeAssociation.query(reagent_type=reagent.type, kit_type=extraction_kit)
try: try:
looked_up_reg = lookup_reagents(ctx=self.ctx, lot_number=looked_up_rt.last_used) # looked_up_reg = lookup_reagents(ctx=self.ctx, lot_number=looked_up_rt.last_used)
looked_up_reg = Reagent.query(lot_number=looked_up_rt.last_used)
except AttributeError: except AttributeError:
looked_up_reg = None looked_up_reg = None
logger.debug(f"Because there was no reagent listed for {reagent.lot}, we will insert the last lot used: {looked_up_reg}") logger.debug(f"Because there was no reagent listed for {reagent.lot}, we will insert the last lot used: {looked_up_reg}")
@@ -743,7 +672,8 @@ class SubmissionFormWidget(QWidget):
case 'submitting_lab': case 'submitting_lab':
add_widget = QComboBox() add_widget = QComboBox()
# lookup organizations suitable for submitting_lab (ctx: self.InfoItem.SubmissionFormWidget.SubmissionFormContainer.AddSubForm ) # lookup organizations suitable for submitting_lab (ctx: self.InfoItem.SubmissionFormWidget.SubmissionFormContainer.AddSubForm )
labs = [item.__str__() for item in lookup_organizations(ctx=obj.ctx)] # labs = [item.__str__() for item in lookup_organizations(ctx=obj.ctx)]
labs = [item.__str__() for item in Organization.query()]
# try to set closest match to top of list # try to set closest match to top of list
try: try:
labs = difflib.get_close_matches(value, labs, len(labs), 0) labs = difflib.get_close_matches(value, labs, len(labs), 0)
@@ -760,7 +690,8 @@ class SubmissionFormWidget(QWidget):
add_widget = QComboBox() add_widget = QComboBox()
# lookup existing kits by 'submission_type' decided on by sheetparser # lookup existing kits by 'submission_type' decided on by sheetparser
logger.debug(f"Looking up kits used for {submission_type}") logger.debug(f"Looking up kits used for {submission_type}")
uses = [item.__str__() for item in lookup_kit_types(ctx=obj.ctx, used_for=submission_type)] # uses = [item.__str__() for item in lookup_kit_types(ctx=obj.ctx, used_for=submission_type)]
uses = [item.__str__() for item in KitType.query(used_for=submission_type)]
obj.uses = uses obj.uses = uses
logger.debug(f"Kits received for {submission_type}: {uses}") logger.debug(f"Kits received for {submission_type}: {uses}")
if check_not_nan(value): if check_not_nan(value):
@@ -786,7 +717,8 @@ class SubmissionFormWidget(QWidget):
case 'submission_category': case 'submission_category':
add_widget = QComboBox() add_widget = QComboBox()
cats = ['Diagnostic', "Surveillance", "Research"] cats = ['Diagnostic', "Surveillance", "Research"]
cats += [item.name for item in lookup_submission_type(ctx=obj.ctx)] # cats += [item.name for item in lookup_submission_type(ctx=obj.ctx)]
cats += [item.name for item in SubmissionType.query()]
try: try:
cats.insert(0, cats.pop(cats.index(value))) cats.insert(0, cats.pop(cats.index(value)))
except ValueError: except ValueError:

View File

@@ -8,6 +8,7 @@ from PyQt6.QtWidgets import (
from tools import jinja_template_loading from tools import jinja_template_loading
import logging import logging
from backend.db.functions import lookup_kit_types, lookup_submission_type from backend.db.functions import lookup_kit_types, lookup_submission_type
from backend.db.models import KitType, SubmissionType
from typing import Literal from typing import Literal
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
@@ -53,7 +54,8 @@ class KitSelector(QDialog):
super().__init__() super().__init__()
self.setWindowTitle(title) self.setWindowTitle(title)
self.widget = QComboBox() self.widget = QComboBox()
kits = [item.__str__() for item in lookup_kit_types(ctx=ctx)] # kits = [item.__str__() for item in lookup_kit_types(ctx=ctx)]
kits = [item.__str__() for item in KitType.query()]
self.widget.addItems(kits) self.widget.addItems(kits)
self.widget.setEditable(False) self.widget.setEditable(False)
# set yes/no buttons # set yes/no buttons
@@ -80,7 +82,8 @@ class SubmissionTypeSelector(QDialog):
super().__init__() super().__init__()
self.setWindowTitle(title) self.setWindowTitle(title)
self.widget = QComboBox() self.widget = QComboBox()
sub_type = [item.name for item in lookup_submission_type(ctx=ctx)] # sub_type = [item.name for item in lookup_submission_type(ctx=ctx)]
sub_type = [item.name for item in SubmissionType.query()]
self.widget.addItems(sub_type) self.widget.addItems(sub_type)
self.widget.setEditable(False) self.widget.setEditable(False)
# set yes/no buttons # set yes/no buttons

View File

@@ -15,7 +15,8 @@ from PyQt6.QtWidgets import (
from PyQt6.QtWebEngineWidgets import QWebEngineView from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtCore import Qt, QAbstractTableModel, QSortFilterProxyModel from PyQt6.QtCore import Qt, QAbstractTableModel, QSortFilterProxyModel
from PyQt6.QtGui import QAction, QCursor, QPixmap, QPainter from PyQt6.QtGui import QAction, QCursor, QPixmap, QPainter
from backend.db.functions import submissions_to_df, delete_submission, lookup_submissions from backend.db.functions import submissions_to_df
from backend.db.models import BasicSubmission
from backend.excel import make_hitpicks from backend.excel import make_hitpicks
from tools import check_if_app, Settings from tools import check_if_app, Settings
from tools import jinja_template_loading from tools import jinja_template_loading
@@ -98,7 +99,7 @@ class SubmissionsSheet(QTableView):
""" """
sets data in model sets data in model
""" """
self.data = submissions_to_df(ctx=self.ctx) self.data = submissions_to_df()
try: try:
self.data['id'] = self.data['id'].apply(str) self.data['id'] = self.data['id'].apply(str)
self.data['id'] = self.data['id'].str.zfill(3) self.data['id'] = self.data['id'].str.zfill(3)
@@ -180,7 +181,8 @@ class SubmissionsSheet(QTableView):
logger.debug(index) logger.debug(index)
msg = QuestionAsker(title="Delete?", message=f"Are you sure you want to delete {index.sibling(index.row(),1).data()}?\n") msg = QuestionAsker(title="Delete?", message=f"Are you sure you want to delete {index.sibling(index.row(),1).data()}?\n")
if msg.exec(): if msg.exec():
delete_submission(ctx=self.ctx, id=value) # delete_submission(id=value)
BasicSubmission.query(id=value).delete()
else: else:
return return
self.setData() self.setData()
@@ -199,7 +201,8 @@ class SubmissionsSheet(QTableView):
logger.error(f"Error: Had to truncate number of plates to 4.") logger.error(f"Error: Had to truncate number of plates to 4.")
indices = indices[:4] indices = indices[:4]
# lookup ids in the database # lookup ids in the database
subs = [lookup_submissions(ctx=self.ctx, id=id) for id in indices] # subs = [lookup_submissions(ctx=self.ctx, id=id) for id in indices]
subs = [BasicSubmission.query(id=id) for id in indices]
# full list of samples # full list of samples
dicto = [] dicto = []
# list to contain plate images # list to contain plate images
@@ -256,7 +259,8 @@ class SubmissionDetails(QDialog):
interior = QScrollArea() interior = QScrollArea()
interior.setParent(self) interior.setParent(self)
# get submision from db # get submision from db
sub = lookup_submissions(ctx=ctx, id=id) # sub = lookup_submissions(ctx=ctx, id=id)
sub = BasicSubmission.query(id=id)
logger.debug(f"Submission details data:\n{pprint.pformat(sub.to_dict())}") logger.debug(f"Submission details data:\n{pprint.pformat(sub.to_dict())}")
self.base_dict = sub.to_dict(full_data=True) self.base_dict = sub.to_dict(full_data=True)
# don't want id # don't want id
@@ -427,7 +431,8 @@ class SubmissionComment(QDialog):
full_comment = {"name":commenter, "time": dt, "text": comment} full_comment = {"name":commenter, "time": dt, "text": comment}
logger.debug(f"Full comment: {full_comment}") logger.debug(f"Full comment: {full_comment}")
# sub = lookup_submission_by_rsl_num(ctx = self.ctx, rsl_num=self.rsl) # sub = lookup_submission_by_rsl_num(ctx = self.ctx, rsl_num=self.rsl)
sub = lookup_submissions(ctx = self.ctx, rsl_number=self.rsl) # sub = lookup_submissions(ctx = self.ctx, rsl_number=self.rsl)
sub = BasicSubmission.query(rsl_number=self.rsl)
try: try:
sub.comment.append(full_comment) sub.comment.append(full_comment)
except AttributeError: except AttributeError:

View File

@@ -281,7 +281,7 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
obj.pyd: PydSubmission = obj.form.parse_form() obj.pyd: PydSubmission = obj.form.parse_form()
logger.debug(f"Submission: {pprint.pformat(obj.pyd)}") logger.debug(f"Submission: {pprint.pformat(obj.pyd)}")
logger.debug("Checking kit integrity...") logger.debug("Checking kit integrity...")
kit_integrity = check_kit_integrity(ctx=obj.ctx, sub=obj.pyd) kit_integrity = check_kit_integrity(sub=obj.pyd)
if kit_integrity != None: if kit_integrity != None:
return obj, dict(message=kit_integrity['message'], status="critical") return obj, dict(message=kit_integrity['message'], status="critical")
base_submission, result = obj.pyd.toSQL() base_submission, result = obj.pyd.toSQL()
@@ -307,11 +307,11 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
pass pass
# add reagents to submission object # add reagents to submission object
for reagent in base_submission.reagents: for reagent in base_submission.reagents:
update_last_used(ctx=obj.ctx, reagent=reagent, kit=base_submission.extraction_kit) update_last_used(reagent=reagent, kit=base_submission.extraction_kit)
logger.debug(f"Here is the final submission: {pprint.pformat(base_submission.__dict__)}") logger.debug(f"Here is the final submission: {pprint.pformat(base_submission.__dict__)}")
logger.debug(f"Parsed reagents: {pprint.pformat(base_submission.reagents)}") logger.debug(f"Parsed reagents: {pprint.pformat(base_submission.reagents)}")
logger.debug(f"Sending submission: {base_submission.rsl_plate_num} to database.") logger.debug(f"Sending submission: {base_submission.rsl_plate_num} to database.")
base_submission.save(ctx=obj.ctx) base_submission.save()
# update summary sheet # update summary sheet
obj.table_widget.sub_wid.setData() obj.table_widget.sub_wid.setData()
# reset form # reset form
@@ -319,12 +319,12 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
logger.debug(f"All attributes of obj: {pprint.pformat(obj.__dict__)}") logger.debug(f"All attributes of obj: {pprint.pformat(obj.__dict__)}")
wkb = obj.pyd.autofill_excel() wkb = obj.pyd.autofill_excel()
if wkb != None: if wkb != None:
fname = select_save_file(obj=obj, default_name=obj.pyd.rsl_plate_num['value'], extension="xlsx") fname = select_save_file(obj=obj, default_name=obj.pyd.construct_filename(), extension="xlsx")
wkb.save(filename=fname.__str__()) wkb.save(filename=fname.__str__())
if hasattr(obj.pyd, 'csv'): if hasattr(obj.pyd, 'csv'):
dlg = QuestionAsker("Export CSV?", "Would you like to export the csv file?") dlg = QuestionAsker("Export CSV?", "Would you like to export the csv file?")
if dlg.exec(): if dlg.exec():
fname = select_save_file(obj, f"{base_submission.rsl_plate_num}.csv", extension="csv") fname = select_save_file(obj, f"{obj.pyd.rsl_plate_num['value']}.csv", extension="csv")
try: try:
obj.csv.to_csv(fname.__str__(), index=False) obj.csv.to_csv(fname.__str__(), index=False)
except PermissionError: except PermissionError:
@@ -348,7 +348,8 @@ def generate_report_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
info = dlg.parse_form() info = dlg.parse_form()
logger.debug(f"Report info: {info}") logger.debug(f"Report info: {info}")
# find submissions based on date range # find submissions based on date range
subs = lookup_submissions(ctx=obj.ctx, start_date=info['start_date'], end_date=info['end_date']) # subs = lookup_submissions(ctx=obj.ctx, start_date=info['start_date'], end_date=info['end_date'])
subs = BasicSubmission.query(start_date=info['start_date'], end_date=info['end_date'])
# convert each object to dict # convert each object to dict
records = [item.report_dict() for item in subs] records = [item.report_dict() for item in subs]
# make dataframe from record dictionaries # make dataframe from record dictionaries
@@ -468,7 +469,7 @@ def controls_getter_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
obj.mode = obj.table_widget.mode_typer.currentText() obj.mode = obj.table_widget.mode_typer.currentText()
obj.table_widget.sub_typer.clear() obj.table_widget.sub_typer.clear()
# lookup subtypes # lookup subtypes
sub_types = get_control_subtypes(ctx=obj.ctx, type=obj.con_type, mode=obj.mode) sub_types = get_control_subtypes(type=obj.con_type, mode=obj.mode)
# sub_types = lookup_controls(ctx=obj.ctx, control_type=obj.con_type) # sub_types = lookup_controls(ctx=obj.ctx, control_type=obj.con_type)
if sub_types != []: if sub_types != []:
# block signal that will rerun controls getter and update sub_typer # block signal that will rerun controls getter and update sub_typer
@@ -502,7 +503,8 @@ def chart_maker_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
logger.debug(f"Subtype: {obj.subtype}") logger.debug(f"Subtype: {obj.subtype}")
# query all controls using the type/start and end dates from the gui # query all controls using the type/start and end dates from the gui
# controls = get_all_controls_by_type(ctx=obj.ctx, con_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date) # controls = get_all_controls_by_type(ctx=obj.ctx, con_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date)
controls = lookup_controls(ctx=obj.ctx, control_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date) # controls = lookup_controls(ctx=obj.ctx, control_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date)
controls = Control.query(control_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date)
# if no data found from query set fig to none for reporting in webview # if no data found from query set fig to none for reporting in webview
if controls == None: if controls == None:
fig = None fig = None
@@ -544,10 +546,12 @@ def link_controls_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
""" """
result = None result = None
# all_bcs = lookup_all_submissions_by_type(obj.ctx, "Bacterial Culture") # all_bcs = lookup_all_submissions_by_type(obj.ctx, "Bacterial Culture")
all_bcs = lookup_submissions(ctx=obj.ctx, submission_type="Bacterial Culture") # all_bcs = lookup_submissions(ctx=obj.ctx, submission_type="Bacterial Culture")
all_bcs = BasicSubmission.query(submission_type="Bacterial Culture")
logger.debug(all_bcs) logger.debug(all_bcs)
# all_controls = get_all_controls(obj.ctx) # all_controls = get_all_controls(obj.ctx)
all_controls = lookup_controls(ctx=obj.ctx) # all_controls = lookup_controls(ctx=obj.ctx)
all_controls = Control.query()
ac_list = [control.name for control in all_controls] ac_list = [control.name for control in all_controls]
count = 0 count = 0
for bcs in all_bcs: for bcs in all_bcs:
@@ -615,7 +619,8 @@ def link_extractions_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
new_run[f"column{str(ii-5)}_vol"] = run[ii] new_run[f"column{str(ii-5)}_vol"] = run[ii]
# Lookup imported submissions # Lookup imported submissions
# sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num']) # sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num'])
sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num']) # sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num'])
sub = BasicSubmission.query(rsl_number=new_run['rsl_plate_num'])
# If no such submission exists, move onto the next run # If no such submission exists, move onto the next run
if sub == None: if sub == None:
continue continue
@@ -680,7 +685,8 @@ def link_pcr_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
) )
# lookup imported submission # lookup imported submission
# sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num']) # sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num'])
sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num']) # sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num'])
sub = BasicSubmission.query(rsl_number=new_run['rsl_plate_num'])
# if imported submission doesn't exist move on to next run # if imported submission doesn't exist move on to next run
if sub == None: if sub == None:
continue continue
@@ -734,7 +740,7 @@ def import_pcr_results_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
parser = PCRParser(ctx=obj.ctx, filepath=fname) parser = PCRParser(ctx=obj.ctx, filepath=fname)
logger.debug(f"Attempting lookup for {parser.plate_num}") logger.debug(f"Attempting lookup for {parser.plate_num}")
# sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num) # sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num)
sub = lookup_submissions(ctx=obj.ctx, rsl_number=parser.plate_num) sub = BasicSubmission.query(rsl_number=parser.plate_num)
try: try:
logger.debug(f"Found submission: {sub.rsl_plate_num}") logger.debug(f"Found submission: {sub.rsl_plate_num}")
except AttributeError: except AttributeError:
@@ -742,7 +748,8 @@ def import_pcr_results_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
logger.error(f"Submission of number {parser.plate_num} not found. Attempting rescue of plate repeat.") logger.error(f"Submission of number {parser.plate_num} not found. Attempting rescue of plate repeat.")
parser.plate_num = "-".join(parser.plate_num.split("-")[:-1]) parser.plate_num = "-".join(parser.plate_num.split("-")[:-1])
# sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num) # sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num)
sub = lookup_submissions(ctx=obj.ctx, rsl_number=parser.plate_num) # sub = lookup_submissions(ctx=obj.ctx, rsl_number=parser.plate_num)
sub = BasicSubmission.query(rsl_number=parser.plate_num)
try: try:
logger.debug(f"Found submission: {sub.rsl_plate_num}") logger.debug(f"Found submission: {sub.rsl_plate_num}")
except AttributeError: except AttributeError:
@@ -779,7 +786,7 @@ def import_pcr_results_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
sample_dict = [item for item in parser.samples if item['sample']==sample.rsl_number][0] sample_dict = [item for item in parser.samples if item['sample']==sample.rsl_number][0]
except IndexError: except IndexError:
continue continue
update_subsampassoc_with_pcr(ctx=obj.ctx, submission=sub, sample=sample, input_dict=sample_dict) update_subsampassoc_with_pcr(submission=sub, sample=sample, input_dict=sample_dict)
result = dict(message=f"We added PCR info to {sub.rsl_plate_num}.", status='information') result = dict(message=f"We added PCR info to {sub.rsl_plate_num}.", status='information')
return obj, result return obj, result
@@ -877,8 +884,6 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re
fname = select_save_file(obj=obj, default_name=info['rsl_plate_num'], extension="xlsx") fname = select_save_file(obj=obj, default_name=info['rsl_plate_num'], extension="xlsx")
workbook.save(filename=fname.__str__()) workbook.save(filename=fname.__str__())
def construct_first_strand_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: def construct_first_strand_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
""" """
Generates a csv file from client submitted xlsx file. Generates a csv file from client submitted xlsx file.
@@ -891,13 +896,16 @@ def construct_first_strand_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]
""" """
def get_plates(input_sample_number:str, plates:list) -> Tuple[int, str]: def get_plates(input_sample_number:str, plates:list) -> Tuple[int, str]:
logger.debug(f"Looking up {input_sample_number} in {plates}") logger.debug(f"Looking up {input_sample_number} in {plates}")
samp = lookup_samples(ctx=obj.ctx, ww_processing_num=input_sample_number) # samp = lookup_samples(ctx=obj.ctx, ww_processing_num=input_sample_number)
samp = BasicSample.query(ww_processing_num=input_sample_number)
if samp == None: if samp == None:
samp = lookup_samples(ctx=obj.ctx, submitter_id=input_sample_number) # samp = lookup_samples(ctx=obj.ctx, submitter_id=input_sample_number)
samp = BasicSample.query(submitter_id=input_sample_number)
if samp == None: if samp == None:
return None, None return None, None
logger.debug(f"Got sample: {samp}") logger.debug(f"Got sample: {samp}")
new_plates = [(iii+1, lookup_submission_sample_association(ctx=obj.ctx, sample=samp, submission=plate)) for iii, plate in enumerate(plates)] # new_plates = [(iii+1, lookup_submission_sample_association(ctx=obj.ctx, sample=samp, submission=plate)) for iii, plate in enumerate(plates)]
new_plates = [(iii+1, SubmissionSampleAssociation.query(sample=samp, submission=plate)) for iii, plate in enumerate(plates)]
logger.debug(f"Associations: {pprint.pformat(new_plates)}") logger.debug(f"Associations: {pprint.pformat(new_plates)}")
try: try:
plate_num, plate = next(assoc for assoc in new_plates if assoc[1]) plate_num, plate = next(assoc for assoc in new_plates if assoc[1])

View File

@@ -12,7 +12,7 @@ import sys, os, stat, platform, getpass
import logging import logging
from logging import handlers from logging import handlers
from pathlib import Path from pathlib import Path
from sqlalchemy.orm import Session from sqlalchemy.orm import Session, declarative_base, DeclarativeMeta, Query
from sqlalchemy import create_engine from sqlalchemy import create_engine
from pydantic import field_validator from pydantic import field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -37,6 +37,9 @@ LOGDIR = main_aux_dir.joinpath("logs")
row_map = {1:"A", 2:"B", 3:"C", 4:"D", 5:"E", 6:"F", 7:"G", 8:"H"} row_map = {1:"A", 2:"B", 3:"C", 4:"D", 5:"E", 6:"F", 7:"G", 8:"H"}
Base: DeclarativeMeta = declarative_base()
metadata = Base.metadata
def check_not_nan(cell_contents) -> bool: def check_not_nan(cell_contents) -> bool:
""" """
Check to ensure excel sheet cell contents are not blank. Check to ensure excel sheet cell contents are not blank.
@@ -160,12 +163,21 @@ class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file_encoding='utf-8') model_config = SettingsConfigDict(env_file_encoding='utf-8')
@field_validator('backup_path')
@classmethod
def set_backup_path(cls, value):
if isinstance(value, str):
value = Path(value)
metadata.backup_path = value
return value
@field_validator('directory_path', mode="before") @field_validator('directory_path', mode="before")
@classmethod @classmethod
def ensure_directory_exists(cls, value): def ensure_directory_exists(cls, value):
if isinstance(value, str): if isinstance(value, str):
value = Path(value) value = Path(value)
if value.exists(): if value.exists():
metadata.directory_path = value
return value return value
else: else:
raise FileNotFoundError(f"Couldn't find settings file {value}") raise FileNotFoundError(f"Couldn't find settings file {value}")
@@ -210,6 +222,8 @@ class Settings(BaseSettings):
logger.debug(f"Using {database_path} for database file.") logger.debug(f"Using {database_path} for database file.")
engine = create_engine(f"sqlite:///{database_path}")#, echo=True, future=True) engine = create_engine(f"sqlite:///{database_path}")#, echo=True, future=True)
session = Session(engine) session = Session(engine)
metadata.session = session
return session return session
@field_validator('package', mode="before") @field_validator('package', mode="before")
@@ -287,43 +301,6 @@ def get_config(settings_path: Path|str|None=None) -> Settings:
settings = yaml.load(stream, Loader=yaml.Loader) settings = yaml.load(stream, Loader=yaml.Loader)
return Settings(**settings) return Settings(**settings)
def create_database_session(database_path: Path|str|None=None) -> Session:
"""
Creates a session to sqlite3 database from path or default database if database_path is blank.
DEPRECIATED: THIS IS NOW HANDLED BY THE PYDANTIC SETTINGS OBJECT.
Args:
database_path (Path | str | None, optional): path to sqlite database. Defaults to None.
Returns:
Session: database session
"""
# convert string to path object
if isinstance(database_path, str):
database_path = Path(database_path)
# check if database path defined by user
if database_path == None:
# check in user's .submissions directory for submissions.db
if Path.home().joinpath(".submissions", "submissions.db").exists():
database_path = Path.home().joinpath(".submissions", "submissions.db")
# finally, look in the local dir
else:
database_path = package_dir.joinpath("submissions.db")
else:
# check if user defined path is directory
if database_path.is_dir():
database_path = database_path.joinpath("submissions.db")
# check if user defined path is a file
elif database_path.is_file():
database_path = database_path
else:
logger.error("No database file found. Exiting program.")
sys.exit()
logger.debug(f"Using {database_path} for database file.")
engine = create_engine(f"sqlite:///{database_path}")
session = Session(engine)
return session
def setup_logger(verbosity:int=3): def setup_logger(verbosity:int=3):
""" """
Set logger levels using settings. Set logger levels using settings.
@@ -454,3 +431,33 @@ def convert_well_to_row_column(input_str:str) -> Tuple[int, int]:
except IndexError: except IndexError:
return None, None return None, None
return row, column return row, column
def query_return(query:Query, limit:int=0):
"""
Execute sqlalchemy query.
Args:
query (Query): Query object
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
_type_: Query result.
"""
with query.session.no_autoflush:
match limit:
case 0:
return query.all()
case 1:
return query.first()
case _:
return query.limit(limit).all()
def setup_lookup(func):
def wrapper(*args, **kwargs):
for k, v in locals().items():
if k == "kwargs":
continue
if isinstance(v, dict):
raise ValueError("Cannot use dictionary in query. Make sure you parse it first.")
return func(*args, **kwargs)
return wrapper