From 5570d87b7c2c6c3f6a79ca458498a4cc4c72c989 Mon Sep 17 00:00:00 2001 From: Landon Wark Date: Fri, 3 Nov 2023 10:49:37 -0500 Subject: [PATCH] Pre-major refactor --- TODO.md | 7 +- src/submissions/backend/db/__init__.py | 3 +- .../db/{functions/misc.py => functions.py} | 218 ++++---- .../backend/db/functions/__init__.py | 91 ---- .../backend/db/functions/lookups.py | 515 ------------------ src/submissions/backend/db/models/__init__.py | 43 +- src/submissions/backend/db/models/controls.py | 118 +++- src/submissions/backend/db/models/kits.py | 321 ++++++++++- .../backend/db/models/organizations.py | 77 ++- .../backend/db/models/submissions.py | 344 +++++++++--- src/submissions/backend/excel/parser.py | 15 +- .../backend/validators/__init__.py | 2 +- src/submissions/backend/validators/pydant.py | 68 ++- src/submissions/frontend/__init__.py | 12 +- .../frontend/custom_widgets/misc.py | 134 ++--- .../frontend/custom_widgets/pop_ups.py | 7 +- .../frontend/custom_widgets/sub_details.py | 17 +- .../frontend/main_window_functions.py | 48 +- src/submissions/tools/__init__.py | 83 +-- 19 files changed, 1078 insertions(+), 1045 deletions(-) rename src/submissions/backend/db/{functions/misc.py => functions.py} (54%) delete mode 100644 src/submissions/backend/db/functions/__init__.py delete mode 100644 src/submissions/backend/db/functions/lookups.py diff --git a/TODO.md b/TODO.md index 29e5a49..d574505 100644 --- a/TODO.md +++ b/TODO.md @@ -1,4 +1,9 @@ -- [ ] Update artic submission type database entry to +- [ ] Clear out any unnecessary ctx passes now that queries are improved. +- [ ] Make a 'query or create' method in all db objects to go with new query. +- [ ] Ensure Bacterial plates end up with RSL_YY_###_{submitterName}_{submitterPlateID}.xlsx format. +- [x] Move lookup functions into class methods of db objects? + - Not sure if will work for associations. +- [x] Update artic submission type database entry to add more technicians. - [ ] Document code - [x] Rewrite tests... again. - [x] Have InfoItem change status self.missing to True if value changed. diff --git a/src/submissions/backend/db/__init__.py b/src/submissions/backend/db/__init__.py index e4b232c..b167430 100644 --- a/src/submissions/backend/db/__init__.py +++ b/src/submissions/backend/db/__init__.py @@ -1,4 +1,5 @@ ''' All database related operations. ''' -# from .functions import * \ No newline at end of file +from .models import * +from .functions import * diff --git a/src/submissions/backend/db/functions/misc.py b/src/submissions/backend/db/functions.py similarity index 54% rename from src/submissions/backend/db/functions/misc.py rename to src/submissions/backend/db/functions.py index 1d78777..6fe3f61 100644 --- a/src/submissions/backend/db/functions/misc.py +++ b/src/submissions/backend/db/functions.py @@ -1,25 +1,36 @@ -''' -Contains convenience functions for using database -''' -import sys +'''Contains or imports all database convenience functions''' from tools import Settings -from .lookups import * +from sqlalchemy import event +from sqlalchemy.engine import Engine +from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError +from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError +import logging import pandas as pd import json from pathlib import Path -import yaml -from .. import models -from . import store_object -from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError -from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError -from pprint import pformat +from .models import * +# from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError +# from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError import logging -from backend.validators import pydant +from backend.validators.pydant import * +logger = logging.getLogger(f"Submissions_{__name__}") -logger = logging.getLogger(f"submissions.{__name__}") +@event.listens_for(Engine, "connect") +def set_sqlite_pragma(dbapi_connection, connection_record): + """ + *should* allow automatic creation of foreign keys in the database + I have no idea how it actually works. -def submissions_to_df(ctx:Settings, submission_type:str|None=None, limit:int=0) -> pd.DataFrame: + Args: + dbapi_connection (_type_): _description_ + connection_record (_type_): _description_ + """ + cursor = dbapi_connection.cursor() + cursor.execute("PRAGMA foreign_keys=ON") + cursor.close() + +def submissions_to_df(submission_type:str|None=None, limit:int=0) -> pd.DataFrame: """ Convert submissions looked up by type to dataframe @@ -34,7 +45,8 @@ def submissions_to_df(ctx:Settings, submission_type:str|None=None, limit:int=0) logger.debug(f"Querying Type: {submission_type}") logger.debug(f"Using limit: {limit}") # use lookup function to create list of dicts - subs = [item.to_dict() for item in lookup_submissions(ctx=ctx, submission_type=submission_type, limit=limit)] + # subs = [item.to_dict() for item in lookup_submissions(ctx=ctx, submission_type=submission_type, limit=limit)] + subs = [item.to_dict() for item in BasicSubmission.query(submission_type=submission_type, limit=limit)] logger.debug(f"Got {len(subs)} results.") # make df from dicts (records) in list df = pd.DataFrame.from_records(subs) @@ -66,7 +78,7 @@ def submissions_to_df(ctx:Settings, submission_type:str|None=None, limit:int=0) pass return df -def get_control_subtypes(ctx:Settings, type:str, mode:str) -> list[str]: +def get_control_subtypes(type:str, mode:str) -> list[str]: """ Get subtypes for a control analysis mode @@ -80,7 +92,8 @@ def get_control_subtypes(ctx:Settings, type:str, mode:str) -> list[str]: """ # Only the first control of type is necessary since they all share subtypes try: - outs = lookup_controls(ctx=ctx, control_type=type, limit=1) + # outs = lookup_controls(ctx=ctx, control_type=type, limit=1) + outs = Control.query(control_type=type, limit=1) except (TypeError, IndexError): return [] # Get analysis mode data as dict @@ -93,7 +106,7 @@ def get_control_subtypes(ctx:Settings, type:str, mode:str) -> list[str]: subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item] return subtypes -def update_last_used(ctx:Settings, reagent:models.Reagent, kit:models.KitType): +def update_last_used(reagent:Reagent, kit:KitType): """ Updates the 'last_used' field in kittypes/reagenttypes @@ -104,78 +117,73 @@ def update_last_used(ctx:Settings, reagent:models.Reagent, kit:models.KitType): """ # rt = list(set(reagent.type).intersection(kit.reagent_types))[0] logger.debug(f"Attempting update of reagent type at intersection of ({reagent}), ({kit})") - rt = lookup_reagent_types(ctx=ctx, kit_type=kit, reagent=reagent) + # rt = lookup_reagent_types(ctx=ctx, kit_type=kit, reagent=reagent) + rt = ReagentType.query(kit_type=kit, reagent=reagent) if rt != None: - assoc = lookup_reagenttype_kittype_association(ctx=ctx, kit_type=kit, reagent_type=rt) + # assoc = lookup_reagenttype_kittype_association(ctx=ctx, kit_type=kit, reagent_type=rt) + assoc = KitTypeReagentTypeAssociation.query(kit_type=kit, reagent_type=rt) if assoc != None: if assoc.last_used != reagent.lot: logger.debug(f"Updating {assoc} last used to {reagent.lot}") assoc.last_used = reagent.lot # ctx.database_session.merge(assoc) # ctx.database_session.commit() - result = store_object(ctx=ctx, object=assoc) + # result = store_object(ctx=ctx, object=assoc) + result = assoc.save() return result return dict(message=f"Updating last used {rt} was not performed.") -def delete_submission(ctx:Settings, id:int) -> dict|None: - """ - Deletes a submission and its associated samples from the database. +# def delete_submission(id:int) -> dict|None: +# """ +# Deletes a submission and its associated samples from the database. - Args: - ctx (Settings): settings object passed down from gui - id (int): id of submission to be deleted. - """ - # In order to properly do this Im' going to have to delete all of the secondary table stuff as well. - # Retrieve submission - sub = lookup_submissions(ctx=ctx, id=id) - # Convert to dict for storing backup as a yml - backup = sub.to_dict() - try: - with open(Path(ctx.backup_path).joinpath(f"{sub.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f: - yaml.dump(backup, f) - except KeyError: - pass - ctx.database_session.delete(sub) - try: - ctx.database_session.commit() - except (SQLIntegrityError, SQLOperationalError, AlcIntegrityError, AlcOperationalError) as e: - ctx.database_session.rollback() - raise e - return None +# Args: +# ctx (Settings): settings object passed down from gui +# id (int): id of submission to be deleted. +# """ +# # In order to properly do this Im' going to have to delete all of the secondary table stuff as well. +# # Retrieve submission +# # sub = lookup_submissions(ctx=ctx, id=id) +# sub = models.BasicSubmission.query(id=id) +# # Convert to dict for storing backup as a yml +# sub.delete() +# return None -def update_ww_sample(ctx:Settings, sample_obj:dict) -> dict|None: - """ - Retrieves wastewater sample by rsl number (sample_obj['sample']) and updates values from constructed dictionary +# def update_ww_sample(sample_obj:dict) -> dict|None: +# """ +# Retrieves wastewater sample by rsl number (sample_obj['sample']) and updates values from constructed dictionary - Args: - ctx (Settings): settings object passed down from gui - sample_obj (dict): dictionary representing new values for database object - """ - logger.debug(f"dictionary to use for update: {pformat(sample_obj)}") - logger.debug(f"Looking up {sample_obj['sample']} in plate {sample_obj['plate_rsl']}") - assoc = lookup_submission_sample_association(ctx=ctx, submission=sample_obj['plate_rsl'], sample=sample_obj['sample']) - if assoc != None: - for key, value in sample_obj.items(): - # set attribute 'key' to 'value' - try: - check = getattr(assoc, key) - except AttributeError as e: - logger.error(f"Item doesn't have field {key} due to {e}") - continue - if check != value: - logger.debug(f"Setting association key: {key} to {value}") - try: - setattr(assoc, key, value) - except AttributeError as e: - logger.error(f"Can't set field {key} to {value} due to {e}") - continue - else: - logger.error(f"Unable to find sample {sample_obj['sample']}") - return - result = store_object(ctx=ctx, object=assoc) - return result +# Args: +# ctx (Settings): settings object passed down from gui +# sample_obj (dict): dictionary representing new values for database object +# """ +# logger.debug(f"dictionary to use for update: {pformat(sample_obj)}") +# logger.debug(f"Looking up {sample_obj['sample']} in plate {sample_obj['plate_rsl']}") +# # assoc = lookup_submission_sample_association(ctx=ctx, submission=sample_obj['plate_rsl'], sample=sample_obj['sample']) +# assoc = models.SubmissionSampleAssociation.query(submission=sample_obj['plate_rsl'], sample=sample_obj['sample']) +# if assoc != None: +# for key, value in sample_obj.items(): +# # set attribute 'key' to 'value' +# try: +# check = getattr(assoc, key) +# except AttributeError as e: +# logger.error(f"Item doesn't have field {key} due to {e}") +# continue +# if check != value: +# logger.debug(f"Setting association key: {key} to {value}") +# try: +# setattr(assoc, key, value) +# except AttributeError as e: +# logger.error(f"Can't set field {key} to {value} due to {e}") +# continue +# else: +# logger.error(f"Unable to find sample {sample_obj['sample']}") +# return +# # result = store_object(ctx=ctx, object=assoc) +# result = assoc.save() +# return result -def check_kit_integrity(ctx:Settings, sub:models.BasicSubmission|models.KitType|pydant.PydSubmission, reagenttypes:list=[]) -> dict|None: +def check_kit_integrity(sub:BasicSubmission|KitType|PydSubmission, reagenttypes:list=[]) -> dict|None: """ Ensures all reagents expected in kit are listed in Submission @@ -190,11 +198,12 @@ def check_kit_integrity(ctx:Settings, sub:models.BasicSubmission|models.KitType| # What type is sub? # reagenttypes = [] match sub: - case pydant.PydSubmission(): - ext_kit = lookup_kit_types(ctx=ctx, name=sub.extraction_kit['value']) + case PydSubmission(): + # ext_kit = lookup_kit_types(ctx=ctx, name=sub.extraction_kit['value']) + ext_kit = KitType.query(name=sub.extraction_kit['value']) ext_kit_rtypes = [item.name for item in ext_kit.get_reagents(required=True, submission_type=sub.submission_type['value'])] reagenttypes = [item.type for item in sub.reagents] - case models.BasicSubmission(): + case BasicSubmission(): # Get all required reagent types for this kit. ext_kit_rtypes = [item.name for item in sub.extraction_kit.get_reagents(required=True, submission_type=sub.submission_type_name)] # Overwrite function parameter reagenttypes @@ -202,9 +211,10 @@ def check_kit_integrity(ctx:Settings, sub:models.BasicSubmission|models.KitType| logger.debug(f"For kit integrity, looking up reagent: {reagent}") try: # rt = list(set(reagent.type).intersection(sub.extraction_kit.reagent_types))[0].name - rt = lookup_reagent_types(ctx=ctx, kit_type=sub.extraction_kit, reagent=reagent) + # rt = lookup_reagent_types(ctx=ctx, kit_type=sub.extraction_kit, reagent=reagent) + rt = ReagentType.query(kit_type=sub.extraction_kit, reagent=reagent) logger.debug(f"Got reagent type: {rt}") - if isinstance(rt, models.ReagentType): + if isinstance(rt, ReagentType): reagenttypes.append(rt.name) except AttributeError as e: logger.error(f"Problem parsing reagents: {[f'{reagent.lot}, {reagent.type}' for reagent in sub.reagents]}") @@ -212,7 +222,7 @@ def check_kit_integrity(ctx:Settings, sub:models.BasicSubmission|models.KitType| except IndexError: logger.error(f"No intersection of {reagent} type {reagent.type} and {sub.extraction_kit.reagent_types}") raise ValueError(f"No intersection of {reagent} type {reagent.type} and {sub.extraction_kit.reagent_types}") - case models.KitType(): + case KitType(): ext_kit_rtypes = [item.name for item in sub.get_reagents(required=True)] case _: raise ValueError(f"There was no match for the integrity object.\n\nCheck to make sure they are imported from the same place because it matters.") @@ -231,7 +241,7 @@ def check_kit_integrity(ctx:Settings, sub:models.BasicSubmission|models.KitType| result = {'message' : f"The submission you are importing is missing some reagents expected by the kit.\n\nIt looks like you are missing: {[item.upper() for item in missing]}\n\nAlternatively, you may have set the wrong extraction kit.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", 'missing': missing} return result -def update_subsampassoc_with_pcr(ctx:Settings, submission:models.BasicSubmission, sample:models.BasicSample, input_dict:dict) -> dict|None: +def update_subsampassoc_with_pcr(submission:BasicSubmission, sample:BasicSample, input_dict:dict) -> dict|None: """ Inserts PCR results into wastewater submission/sample association @@ -244,35 +254,39 @@ def update_subsampassoc_with_pcr(ctx:Settings, submission:models.BasicSubmission Returns: dict|None: result object """ - assoc = lookup_submission_sample_association(ctx, submission=submission, sample=sample) + # assoc = lookup_submission_sample_association(ctx, submission=submission, sample=sample) + assoc = SubmissionSampleAssociation.query(submission=submission, sample=sample) for k,v in input_dict.items(): try: setattr(assoc, k, v) except AttributeError: logger.error(f"Can't set {k} to {v}") - result = store_object(ctx=ctx, object=assoc) + # result = store_object(ctx=ctx, object=assoc) + result = assoc.save() return result -# def get_polymorphic_subclass(base:object|models.BasicSubmission=models.BasicSubmission, polymorphic_identity:str|None=None): + +# def store_object(ctx:Settings, object) -> dict|None: # """ -# Retrieves any subclasses of given base class whose polymorphic identity matches the string input. -# NOTE: Depreciated in favour of class based finders in 'submissions.py' +# Store an object in the database # Args: -# base (object): Base (parent) class -# polymorphic_identity (str | None): Name of subclass of interest. (Defaults to None) +# ctx (Settings): Settings object passed down from gui +# object (_type_): Object to be stored # Returns: -# _type_: Subclass, or parent class on +# dict|None: Result of action # """ -# if isinstance(polymorphic_identity, dict): -# polymorphic_identity = polymorphic_identity['value'] -# if polymorphic_identity == None: -# return base -# else: -# try: -# return [item for item in base.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0] -# except Exception as e: -# logger.error(f"Could not get polymorph {polymorphic_identity} of {base} due to {e}") -# return base - +# dbs = ctx.database_session +# dbs.merge(object) +# try: +# dbs.commit() +# except (SQLIntegrityError, AlcIntegrityError) as e: +# logger.debug(f"Hit an integrity error : {e}") +# dbs.rollback() +# return {"message":f"This object {object} already exists, so we can't add it.\n{e}", "status":"Critical"} +# except (SQLOperationalError, AlcOperationalError): +# logger.error(f"Hit an operational error: {e}") +# dbs.rollback() +# return {"message":"The database is locked for editing."} +# return None diff --git a/src/submissions/backend/db/functions/__init__.py b/src/submissions/backend/db/functions/__init__.py deleted file mode 100644 index 691846f..0000000 --- a/src/submissions/backend/db/functions/__init__.py +++ /dev/null @@ -1,91 +0,0 @@ -'''Contains or imports all database convenience functions''' -from tools import Settings, package_dir -from sqlalchemy.orm import Session -from sqlalchemy import create_engine, event -from sqlalchemy.engine import Engine -from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError -from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError -from pathlib import Path -import logging - -logger = logging.getLogger(f"Submissions_{__name__}") - -@event.listens_for(Engine, "connect") -def set_sqlite_pragma(dbapi_connection, connection_record): - """ - *should* allow automatic creation of foreign keys in the database - I have no idea how it actually works. - - Args: - dbapi_connection (_type_): _description_ - connection_record (_type_): _description_ - """ - cursor = dbapi_connection.cursor() - cursor.execute("PRAGMA foreign_keys=ON") - cursor.close() - -def create_database_session(ctx:Settings) -> Session: - """ - Create database session for app. - - Args: - ctx (Settings): settings passed down from gui - - Raises: - FileNotFoundError: Raised if sqlite file not found - - Returns: - Session: Sqlalchemy session object. - """ - database_path = ctx.database_path - if database_path == None: - # check in user's .submissions directory for submissions.db - if Path.home().joinpath(".submissions", "submissions.db").exists(): - database_path = Path.home().joinpath(".submissions", "submissions.db") - # finally, look in the local dir - else: - database_path = package_dir.joinpath("submissions.db") - else: - if database_path == ":memory:": - pass - # check if user defined path is directory - elif database_path.is_dir(): - database_path = database_path.joinpath("submissions.db") - # check if user defined path is a file - elif database_path.is_file(): - database_path = database_path - else: - raise FileNotFoundError("No database file found. Exiting program.") - logger.debug(f"Using {database_path} for database file.") - engine = create_engine(f"sqlite:///{database_path}", echo=True, future=True) - session = Session(engine) - return session - -def store_object(ctx:Settings, object) -> dict|None: - """ - Store an object in the database - - Args: - ctx (Settings): Settings object passed down from gui - object (_type_): Object to be stored - - Returns: - dict|None: Result of action - """ - dbs = ctx.database_session - dbs.merge(object) - try: - dbs.commit() - except (SQLIntegrityError, AlcIntegrityError) as e: - logger.debug(f"Hit an integrity error : {e}") - dbs.rollback() - return {"message":f"This object {object} already exists, so we can't add it.\n{e}", "status":"Critical"} - except (SQLOperationalError, AlcOperationalError): - logger.error(f"Hit an operational error: {e}") - dbs.rollback() - return {"message":"The database is locked for editing."} - return None - -from .lookups import * -# from .constructions import * -from .misc import * diff --git a/src/submissions/backend/db/functions/lookups.py b/src/submissions/backend/db/functions/lookups.py deleted file mode 100644 index a42be60..0000000 --- a/src/submissions/backend/db/functions/lookups.py +++ /dev/null @@ -1,515 +0,0 @@ -from .. import models -from tools import Settings -# from backend.namer import RSLNamer -from typing import List -import logging -from datetime import date, datetime -from dateutil.parser import parse -from sqlalchemy.orm.query import Query -from sqlalchemy import and_, JSON -from sqlalchemy.orm import Session - -logger = logging.getLogger(f"submissions.{__name__}") - -def query_return(query:Query, limit:int=0): - with query.session.no_autoflush: - match limit: - case 0: - return query.all() - case 1: - return query.first() - case _: - return query.limit(limit).all() - -def setup_lookup(ctx:Settings, locals:dict) -> Session: - for k, v in locals.items(): - if k == "kwargs": - continue - if isinstance(v, dict): - raise ValueError("Cannot use dictionary in query. Make sure you parse it first.") - # return create_database_session(ctx=ctx) - return ctx.database_session - -################## Basic Lookups #################################### - -def lookup_reagents(ctx:Settings, - reagent_type:str|models.ReagentType|None=None, - lot_number:str|None=None, - limit:int=0 - ) -> models.Reagent|List[models.Reagent]: - """ - Lookup a list of reagents from the database. - - Args: - ctx (Settings): Settings object passed down from gui - reagent_type (str | models.ReagentType | None, optional): Reagent type. Defaults to None. - lot_number (str | None, optional): Reagent lot number. Defaults to None. - limit (int, optional): limit of results returned. Defaults to 0. - - Returns: - models.Reagent | List[models.Reagent]: reagent or list of reagents matching filter. - """ - query = setup_lookup(ctx=ctx, locals=locals()).query(models.Reagent) - match reagent_type: - case str(): - logger.debug(f"Looking up reagents by reagent type: {reagent_type}") - query = query.join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==reagent_type) - case models.ReagentType(): - logger.debug(f"Looking up reagents by reagent type: {reagent_type}") - query = query.filter(models.Reagent.type.contains(reagent_type)) - case _: - pass - match lot_number: - case str(): - logger.debug(f"Looking up reagent by lot number: {lot_number}") - query = query.filter(models.Reagent.lot==lot_number) - # In this case limit number returned. - limit = 1 - case _: - pass - return query_return(query=query, limit=limit) - -def lookup_kit_types(ctx:Settings, - name:str=None, - used_for:str|None=None, - id:int|None=None, - limit:int=0 - ) -> models.KitType|List[models.KitType]: - query = setup_lookup(ctx=ctx, locals=locals()).query(models.KitType) - match used_for: - case str(): - logger.debug(f"Looking up kit type by use: {used_for}") - query = query.filter(models.KitType.used_for.any(name=used_for)) - case _: - pass - match name: - case str(): - logger.debug(f"Looking up kit type by name: {name}") - query = query.filter(models.KitType.name==name) - limit = 1 - case _: - pass - match id: - case int(): - logger.debug(f"Looking up kit type by id: {id}") - query = query.filter(models.KitType.id==id) - limit = 1 - case str(): - logger.debug(f"Looking up kit type by id: {id}") - query = query.filter(models.KitType.id==int(id)) - limit = 1 - case _: - pass - return query_return(query=query, limit=limit) - -def lookup_reagent_types(ctx:Settings, - name: str|None=None, - kit_type: models.KitType|str|None=None, - reagent: models.Reagent|str|None=None, - limit:int=0, - ) -> models.ReagentType|List[models.ReagentType]: - """ - _summary_ - - Args: - ctx (Settings): Settings object passed down from gui. - name (str | None, optional): Reagent type name. Defaults to None. - limit (int, optional): limit of results to return. Defaults to 0. - - Returns: - models.ReagentType|List[models.ReagentType]: ReagentType or list of ReagentTypes matching filter. - """ - query = setup_lookup(ctx=ctx, locals=locals()).query(models.ReagentType) - if (kit_type != None and reagent == None) or (reagent != None and kit_type == None): - raise ValueError("Cannot filter without both reagent and kit type.") - elif kit_type == None and reagent == None: - pass - else: - match kit_type: - case str(): - kit_type = lookup_kit_types(ctx=ctx, name=kit_type) - case _: - pass - match reagent: - case str(): - reagent = lookup_reagents(ctx=ctx, lot_number=reagent) - case _: - pass - assert reagent.type != [] - logger.debug(f"Looking up reagent type for {type(kit_type)} {kit_type} and {type(reagent)} {reagent}") - logger.debug(f"Kit reagent types: {kit_type.reagent_types}") - # logger.debug(f"Reagent reagent types: {reagent._sa_instance_state}") - result = list(set(kit_type.reagent_types).intersection(reagent.type)) - logger.debug(f"Result: {result}") - try: - return result[0] - except IndexError: - return result - match name: - case str(): - logger.debug(f"Looking up reagent type by name: {name}") - query = query.filter(models.ReagentType.name==name) - limit = 1 - case _: - pass - return query_return(query=query, limit=limit) - -def lookup_submissions(ctx:Settings, - submission_type:str|models.SubmissionType|None=None, - id:int|str|None=None, - rsl_number:str|None=None, - start_date:date|str|int|None=None, - end_date:date|str|int|None=None, - reagent:models.Reagent|str|None=None, - chronologic:bool=False, limit:int=0, - **kwargs - ) -> models.BasicSubmission | List[models.BasicSubmission]: - if submission_type == None: - model = models.BasicSubmission.find_subclasses(ctx=ctx, attrs=kwargs) - else: - if isinstance(submission_type, models.SubmissionType): - model = models.BasicSubmission.find_subclasses(ctx=ctx, submission_type=submission_type.name) - else: - model = models.BasicSubmission.find_subclasses(ctx=ctx, submission_type=submission_type) - query = setup_lookup(ctx=ctx, locals=locals()).query(model) - # by submission type - match submission_type: - case models.SubmissionType(): - logger.debug(f"Looking up BasicSubmission with submission type: {submission_type}") - # query = query.filter(models.BasicSubmission.submission_type_name==submission_type.name) - query = query.filter(model.submission_type_name==submission_type.name) - case str(): - logger.debug(f"Looking up BasicSubmission with submission type: {submission_type}") - # query = query.filter(models.BasicSubmission.submission_type_name==submission_type) - query = query.filter(model.submission_type_name==submission_type) - case _: - pass - # by date range - if start_date != None and end_date == None: - logger.warning(f"Start date with no end date, using today.") - end_date = date.today() - if end_date != None and start_date == None: - logger.warning(f"End date with no start date, using Jan 1, 2023") - start_date = date(2023, 1, 1) - if start_date != None: - match start_date: - case date(): - start_date = start_date.strftime("%Y-%m-%d") - case int(): - start_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d") - case _: - start_date = parse(start_date).strftime("%Y-%m-%d") - match end_date: - case date(): - end_date = end_date.strftime("%Y-%m-%d") - case int(): - end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime("%Y-%m-%d") - case _: - end_date = parse(end_date).strftime("%Y-%m-%d") - logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}") - # query = query.filter(models.BasicSubmission.submitted_date.between(start_date, end_date)) - query = query.filter(model.submitted_date.between(start_date, end_date)) - # by reagent (for some reason) - match reagent: - case str(): - logger.debug(f"Looking up BasicSubmission with reagent: {reagent}") - reagent = lookup_reagents(ctx=ctx, lot_number=reagent) - query = query.join(models.submissions.reagents_submissions).filter(models.submissions.reagents_submissions.c.reagent_id==reagent.id).all() - case models.Reagent: - logger.debug(f"Looking up BasicSubmission with reagent: {reagent}") - query = query.join(models.submissions.reagents_submissions).filter(models.submissions.reagents_submissions.c.reagent_id==reagent.id).all() - case _: - pass - # by rsl number (returns only a single value) - match rsl_number: - case str(): - # query = query.filter(models.BasicSubmission.rsl_plate_num==rsl_number) - query = query.filter(model.rsl_plate_num==rsl_number) - logger.debug(f"At this point the query gets: {query.all()}") - limit = 1 - case _: - pass - # by id (returns only a single value) - match id: - case int(): - logger.debug(f"Looking up BasicSubmission with id: {id}") - # query = query.filter(models.BasicSubmission.id==id) - query = query.filter(model.id==id) - limit = 1 - case str(): - logger.debug(f"Looking up BasicSubmission with id: {id}") - # query = query.filter(models.BasicSubmission.id==int(id)) - query = query.filter(model.id==int(id)) - limit = 1 - case _: - pass - for k, v in kwargs.items(): - attr = getattr(model, k) - logger.debug(f"Got attr: {attr}") - query = query.filter(attr==v) - if len(kwargs) > 0: - limit = 1 - if chronologic: - # query.order_by(models.BasicSubmission.submitted_date) - query.order_by(model.submitted_date) - # logger.debug(f"At the end of the search, the query gets: {query.all()}") - return query_return(query=query, limit=limit) - -def lookup_submission_type(ctx:Settings, - name:str|None=None, - limit:int=0 - ) -> models.SubmissionType|List[models.SubmissionType]: - query = setup_lookup(ctx=ctx, locals=locals()).query(models.SubmissionType) - match name: - case str(): - logger.debug(f"Looking up submission type by name: {name}") - query = query.filter(models.SubmissionType.name==name) - limit = 1 - case _: - pass - return query_return(query=query, limit=limit) - -def lookup_organizations(ctx:Settings, - name:str|None=None, - limit:int=0, - ) -> models.Organization|List[models.Organization]: - query = setup_lookup(ctx=ctx, locals=locals()).query(models.Organization) - match name: - case str(): - logger.debug(f"Looking up organization with name: {name}") - query = query.filter(models.Organization.name==name) - limit = 1 - case _: - pass - return query_return(query=query, limit=limit) - -def lookup_discounts(ctx:Settings, - organization:models.Organization|str|int, - kit_type:models.KitType|str|int, - ) -> models.Discount|List[models.Discount]: - query = setup_lookup(ctx=ctx, locals=locals()).query(models.Discount) - match organization: - case models.Organization(): - logger.debug(f"Looking up discount with organization: {organization}") - organization = organization.id - case str(): - logger.debug(f"Looking up discount with organization: {organization}") - organization = lookup_organizations(ctx=ctx, name=organization).id - case int(): - logger.debug(f"Looking up discount with organization id: {organization}") - pass - case _: - raise ValueError(f"Invalid value for organization: {organization}") - match kit_type: - case models.KitType(): - logger.debug(f"Looking up discount with kit type: {kit_type}") - kit_type = kit_type.id - case str(): - logger.debug(f"Looking up discount with kit type: {kit_type}") - kit_type = lookup_kit_types(ctx=ctx, name=kit_type).id - case int(): - logger.debug(f"Looking up discount with kit type id: {organization}") - pass - case _: - raise ValueError(f"Invalid value for kit type: {kit_type}") - return query.join(models.KitType).join(models.Organization).filter(and_( - models.KitType.id==kit_type, - models.Organization.id==organization - )).all() - -def lookup_controls(ctx:Settings, - control_type:models.ControlType|str|None=None, - start_date:date|str|int|None=None, - end_date:date|str|int|None=None, - control_name:str|None=None, - limit:int=0 - ) -> models.Control|List[models.Control]: - query = setup_lookup(ctx=ctx, locals=locals()).query(models.Control) - # by control type - match control_type: - case models.ControlType(): - logger.debug(f"Looking up control by control type: {control_type}") - query = query.join(models.ControlType).filter(models.ControlType==control_type) - case str(): - logger.debug(f"Looking up control by control type: {control_type}") - query = query.join(models.ControlType).filter(models.ControlType.name==control_type) - case _: - pass - # by date range - if start_date != None and end_date == None: - logger.warning(f"Start date with no end date, using today.") - end_date = date.today() - if end_date != None and start_date == None: - logger.warning(f"End date with no start date, using Jan 1, 2023") - start_date = date(2023, 1, 1) - if start_date != None: - match start_date: - case date(): - start_date = start_date.strftime("%Y-%m-%d") - case int(): - start_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d") - case _: - start_date = parse(start_date).strftime("%Y-%m-%d") - match end_date: - case date(): - end_date = end_date.strftime("%Y-%m-%d") - case int(): - end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime("%Y-%m-%d") - case _: - end_date = parse(end_date).strftime("%Y-%m-%d") - logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}") - query = query.filter(models.Control.submitted_date.between(start_date, end_date)) - match control_name: - case str(): - query = query.filter(models.Control.name.startswith(control_name)) - limit = 1 - case _: - pass - return query_return(query=query, limit=limit) - -def lookup_control_types(ctx:Settings, limit:int=0) -> models.ControlType|List[models.ControlType]: - query = setup_lookup(ctx=ctx, locals=locals()).query(models.ControlType) - return query_return(query=query, limit=limit) - -def lookup_samples(ctx:Settings, - submitter_id:str|None=None, - sample_type:str|None=None, - limit:int=0, - **kwargs - ) -> models.BasicSample|models.WastewaterSample|List[models.BasicSample]: - logger.debug(f"Length of kwargs: {len(kwargs)}") - # model = models.find_subclasses(parent=models.BasicSample, attrs=kwargs) - model = models.BasicSample.find_subclasses(ctx=ctx, attrs=kwargs) - query = setup_lookup(ctx=ctx, locals=locals()).query(model) - match submitter_id: - case str(): - logger.debug(f"Looking up {model} with submitter id: {submitter_id}") - query = query.filter(models.BasicSample.submitter_id==submitter_id) - limit = 1 - case _: - pass - match sample_type: - case str(): - logger.debug(f"Looking up {model} with sample type: {sample_type}") - query = query.filter(models.BasicSample.sample_type==sample_type) - case _: - pass - for k, v in kwargs.items(): - attr = getattr(model, k) - logger.debug(f"Got attr: {attr}") - query = query.filter(attr==v) - if len(kwargs) > 0: - limit = 1 - return query_return(query=query, limit=limit) - -def lookup_reagenttype_kittype_association(ctx:Settings, - kit_type:models.KitType|str|None, - reagent_type:models.ReagentType|str|None, - limit:int=0 - ) -> models.KitTypeReagentTypeAssociation|List[models.KitTypeReagentTypeAssociation]: - query = setup_lookup(ctx=ctx, locals=locals()).query(models.KitTypeReagentTypeAssociation) - match kit_type: - case models.KitType(): - query = query.filter(models.KitTypeReagentTypeAssociation.kit_type==kit_type) - case str(): - query = query.join(models.KitType).filter(models.KitType.name==kit_type) - case _: - pass - match reagent_type: - case models.ReagentType(): - query = query.filter(models.KitTypeReagentTypeAssociation.reagent_type==reagent_type) - case str(): - query = query.join(models.ReagentType).filter(models.ReagentType.name==reagent_type) - case _: - pass - if kit_type != None and reagent_type != None: - limit = 1 - return query_return(query=query, limit=limit) - -def lookup_submission_sample_association(ctx:Settings, - submission:models.BasicSubmission|str|None=None, - sample:models.BasicSample|str|None=None, - row:int=0, - column:int=0, - limit:int=0, - chronologic:bool=False - ) -> models.SubmissionSampleAssociation|List[models.SubmissionSampleAssociation]: - query = setup_lookup(ctx=ctx, locals=locals()).query(models.SubmissionSampleAssociation) - match submission: - case models.BasicSubmission(): - query = query.filter(models.SubmissionSampleAssociation.submission==submission) - case str(): - query = query.join(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==submission) - case _: - pass - match sample: - case models.BasicSample(): - query = query.filter(models.SubmissionSampleAssociation.sample==sample) - case str(): - query = query.join(models.BasicSample).filter(models.BasicSample.submitter_id==sample) - case _: - pass - if row > 0: - query = query.filter(models.SubmissionSampleAssociation.row==row) - if column > 0: - query = query.filter(models.SubmissionSampleAssociation.column==column) - logger.debug(f"Query count: {query.count()}") - if chronologic: - query.join(models.BasicSubmission).order_by(models.BasicSubmission.submitted_date) - if query.count() <= 1: - limit = 1 - return query_return(query=query, limit=limit) - -def lookup_modes(ctx:Settings) -> List[str]: - rel = ctx.database_session.query(models.Control).first() - try: - cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)] - except AttributeError as e: - logger.debug(f"Failed to get available modes from db: {e}") - cols = [] - return cols - -############### Complex Lookups ################################### - -def lookup_sub_samp_association_by_plate_sample(ctx:Settings, rsl_plate_num:str|models.BasicSample, rsl_sample_num:str|models.BasicSubmission) -> models.WastewaterAssociation: - """ - _summary_ - - Args: - ctx (Settings): _description_ - rsl_plate_num (str): _description_ - sample_submitter_id (_type_): _description_ - - Returns: - models.SubmissionSampleAssociation: _description_ - """ - # logger.debug(f"{type(rsl_plate_num)}, {type(rsl_sample_num)}") - match rsl_plate_num: - case models.BasicSubmission()|models.Wastewater(): - # logger.debug(f"Model for rsl_plate_num: {rsl_plate_num}") - first_query = ctx.database_session.query(models.SubmissionSampleAssociation)\ - .filter(models.SubmissionSampleAssociation.submission==rsl_plate_num) - case str(): - # logger.debug(f"String for rsl_plate_num: {rsl_plate_num}") - first_query = ctx.database_session.query(models.SubmissionSampleAssociation)\ - .join(models.BasicSubmission)\ - .filter(models.BasicSubmission.rsl_plate_num==rsl_plate_num) - case _: - logger.error(f"Unknown case for rsl_plate_num {rsl_plate_num}") - match rsl_sample_num: - case models.BasicSample()|models.WastewaterSample(): - # logger.debug(f"Model for rsl_sample_num: {rsl_sample_num}") - second_query = first_query.filter(models.SubmissionSampleAssociation.sample==rsl_sample_num) - # case models.WastewaterSample: - # second_query = first_query.filter(models.SubmissionSampleAssociation.sample==rsl_sample_num) - case str(): - # logger.debug(f"String for rsl_sample_num: {rsl_sample_num}") - second_query = first_query.join(models.BasicSample)\ - .filter(models.BasicSample.submitter_id==rsl_sample_num) - case _: - logger.error(f"Unknown case for rsl_sample_num {rsl_sample_num}") - try: - return second_query.first() - except UnboundLocalError: - logger.error(f"Couldn't construct second query") - return None \ No newline at end of file diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py index 3ad4a23..22128d6 100644 --- a/src/submissions/backend/db/models/__init__.py +++ b/src/submissions/backend/db/models/__init__.py @@ -1,45 +1,10 @@ ''' Contains all models for sqlalchemy ''' -from sqlalchemy.orm import declarative_base, DeclarativeMeta -import logging - -Base: DeclarativeMeta = declarative_base() -metadata = Base.metadata - -logger = logging.getLogger(f"submissions.{__name__}") - -# def find_subclasses(parent:Any, attrs:dict|None=None, rsl_number:str|None=None) -> Any: -# """ -# Finds subclasses of a parent that does contain all -# attributes if the parent does not. -# NOTE: Depreciated, moved to classmethods in individual base models. - -# Args: -# parent (_type_): Parent class. -# attrs (dict): Key:Value dictionary of attributes - -# Raises: -# AttributeError: Raised if no subclass is found. - -# Returns: -# _type_: Parent or subclass. -# """ -# if len(attrs) == 0 or attrs == None: -# return parent -# if any([not hasattr(parent, attr) for attr in attrs]): -# # looks for first model that has all included kwargs -# try: -# model = [subclass for subclass in parent.__subclasses__() if all([hasattr(subclass, attr) for attr in attrs])][0] -# except IndexError as e: -# raise AttributeError(f"Couldn't find existing class/subclass of {parent} with all attributes:\n{pformat(attrs)}") -# else: -# model = parent -# logger.debug(f"Using model: {model}") -# return model - from .controls import Control, ControlType -from .kits import KitType, ReagentType, Reagent, Discount, KitTypeReagentTypeAssociation, SubmissionType, SubmissionTypeKitTypeAssociation +# import order must go: orgs, kit, subs due to circular import issues from .organizations import Organization, Contact -from .submissions import BasicSubmission, BacterialCulture, Wastewater, WastewaterArtic, WastewaterSample, BacterialCultureSample, BasicSample, SubmissionSampleAssociation, WastewaterAssociation +from .kits import KitType, ReagentType, Reagent, Discount, KitTypeReagentTypeAssociation, SubmissionType, SubmissionTypeKitTypeAssociation +from .submissions import (BasicSubmission, BacterialCulture, Wastewater, WastewaterArtic, WastewaterSample, BacterialCultureSample, + BasicSample, SubmissionSampleAssociation, WastewaterAssociation) diff --git a/src/submissions/backend/db/models/controls.py b/src/submissions/backend/db/models/controls.py index 99aec75..a92568e 100644 --- a/src/submissions/backend/db/models/controls.py +++ b/src/submissions/backend/db/models/controls.py @@ -1,12 +1,16 @@ ''' All control related models. ''' -from . import Base +from __future__ import annotations from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey -from sqlalchemy.orm import relationship +from sqlalchemy.orm import relationship, Query import logging from operator import itemgetter import json +from tools import Base, setup_lookup, query_return +from datetime import date, datetime +from typing import List +from dateutil.parser import parse logger = logging.getLogger(f"submissions.{__name__}") @@ -21,6 +25,31 @@ class ControlType(Base): targets = Column(JSON) #: organisms checked for instances = relationship("Control", back_populates="controltype") #: control samples created of this type. + @classmethod + @setup_lookup + def query(cls, + name:str=None, + limit:int=0 + ) -> ControlType|List[ControlType]: + """ + Lookup control archetypes in the database + + Args: + ctx (Settings): Settings object passed down from gui. + name (str, optional): Control type name (limits results to 1). Defaults to None. + limit (int, optional): Maximum number of results to return. Defaults to 0. + + Returns: + models.ControlType|List[models.ControlType]: ControlType(s) of interest. + """ + query = cls.metadata.session.query(cls) + match name: + case str(): + query = query.filter(cls.name==name) + limit = 1 + case _: + pass + return query_return(query=query, limit=limit) class Control(Base): """ @@ -136,3 +165,88 @@ class Control(Base): data = {} return data + @classmethod + @setup_lookup + def query(cls, + control_type:ControlType|str|None=None, + start_date:date|str|int|None=None, + end_date:date|str|int|None=None, + control_name:str|None=None, + limit:int=0 + ) -> Control|List[Control]: + """ + Lookup control objects in the database based on a number of parameters. + + Args: + control_type (models.ControlType | str | None, optional): Control archetype. Defaults to None. + start_date (date | str | int | None, optional): Beginning date to search by. Defaults to 2023-01-01 if end_date not None. + end_date (date | str | int | None, optional): End date to search by. Defaults to today if start_date not None. + control_name (str | None, optional): Name of control. Defaults to None. + limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. + + Returns: + models.Control|List[models.Control]: Control object of interest. + """ + query: Query = cls.metadata.session.query(cls) + # by control type + match control_type: + case ControlType(): + logger.debug(f"Looking up control by control type: {control_type}") + # query = query.join(models.ControlType).filter(models.ControlType==control_type) + query = query.filter(cls.controltype==control_type) + case str(): + logger.debug(f"Looking up control by control type: {control_type}") + query = query.join(ControlType).filter(ControlType.name==control_type) + case _: + pass + # by date range + if start_date != None and end_date == None: + logger.warning(f"Start date with no end date, using today.") + end_date = date.today() + if end_date != None and start_date == None: + logger.warning(f"End date with no start date, using Jan 1, 2023") + start_date = date(2023, 1, 1) + if start_date != None: + match start_date: + case date(): + start_date = start_date.strftime("%Y-%m-%d") + case int(): + start_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d") + case _: + start_date = parse(start_date).strftime("%Y-%m-%d") + match end_date: + case date(): + end_date = end_date.strftime("%Y-%m-%d") + case int(): + end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime("%Y-%m-%d") + case _: + end_date = parse(end_date).strftime("%Y-%m-%d") + logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}") + query = query.filter(cls.submitted_date.between(start_date, end_date)) + match control_name: + case str(): + query = query.filter(cls.name.startswith(control_name)) + limit = 1 + case _: + pass + return query_return(query=query, limit=limit) + + @classmethod + def get_modes(cls): + """ + Get all control modes from database + + Args: + ctx (Settings): Settings object passed down from gui. + + Returns: + List[str]: List of control mode names. + """ + rel = cls.metadata.session.query(cls).first() + try: + cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)] + except AttributeError as e: + logger.debug(f"Failed to get available modes from db: {e}") + cols = [] + return cols + diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index 10e62df..b9b06e0 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -1,19 +1,20 @@ ''' All kit and reagent related models ''' -from . import Base +from __future__ import annotations from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT -from sqlalchemy.orm import relationship, validates +from sqlalchemy.orm import relationship, validates, Query from sqlalchemy.ext.associationproxy import association_proxy from datetime import date import logging -from tools import Settings, check_authorization +from tools import Settings, check_authorization, Base, setup_lookup, query_return +from typing import List +from . import Organization logger = logging.getLogger(f'submissions.{__name__}') reagenttypes_reagents = Table("_reagenttypes_reagents", Base.metadata, Column("reagent_id", INTEGER, ForeignKey("_reagents.id")), Column("reagenttype_id", INTEGER, ForeignKey("_reagent_types.id"))) - class KitType(Base): """ Base of kits used in submission processing @@ -102,9 +103,59 @@ class KitType(Base): return map @check_authorization - def save(self, ctx:Settings): - ctx.database_session.add(self) - ctx.database_session.commit() + def save(self): + self.metadata.session.add(self) + self.metadata.session.commit() + + @classmethod + @setup_lookup + def query(cls, + name:str=None, + used_for:str|SubmissionType|None=None, + id:int|None=None, + limit:int=0 + ) -> KitType|List[KitType]: + """ + Lookup a list of or single KitType. + + Args: + ctx (Settings): Settings object passed down from gui + name (str, optional): Name of desired kit (returns single instance). Defaults to None. + used_for (str | models.Submissiontype | None, optional): Submission type the kit is used for. Defaults to None. + id (int | None, optional): Kit id in the database. Defaults to None. + limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. + + Returns: + models.KitType|List[models.KitType]: KitType(s) of interest. + """ + query: Query = cls.metadata.session.query(cls) + match used_for: + case str(): + logger.debug(f"Looking up kit type by use: {used_for}") + query = query.filter(cls.used_for.any(name=used_for)) + case SubmissionType(): + query = query.filter(cls.used_for.contains(used_for)) + case _: + pass + match name: + case str(): + logger.debug(f"Looking up kit type by name: {name}") + query = query.filter(cls.name==name) + limit = 1 + case _: + pass + match id: + case int(): + logger.debug(f"Looking up kit type by id: {id}") + query = query.filter(cls.id==id) + limit = 1 + case str(): + logger.debug(f"Looking up kit type by id: {id}") + query = query.filter(cls.id==int(id)) + limit = 1 + case _: + pass + return query_return(query=query, limit=limit) class ReagentType(Base): """ @@ -140,6 +191,60 @@ class ReagentType(Base): def __repr__(self): return f"ReagentType({self.name})" + @classmethod + @setup_lookup + def query(cls, + name: str|None=None, + kit_type: KitType|str|None=None, + reagent: Reagent|str|None=None, + limit:int=0, + ) -> ReagentType|List[ReagentType]: + """ + Lookup reagent types in the database. + + Args: + ctx (Settings): Settings object passed down from gui. + name (str | None, optional): Reagent type name. Defaults to None. + limit (int, optional): maxmimum number of results to return (0 = all). Defaults to 0. + + Returns: + models.ReagentType|List[models.ReagentType]: ReagentType or list of ReagentTypes matching filter. + """ + query: Query = cls.metadata.session.query(cls) + if (kit_type != None and reagent == None) or (reagent != None and kit_type == None): + raise ValueError("Cannot filter without both reagent and kit type.") + elif kit_type == None and reagent == None: + pass + else: + match kit_type: + case str(): + kit_type = KitType.query(name=kit_type) + case _: + pass + match reagent: + case str(): + reagent = Reagent.query(lot_number=reagent) + case _: + pass + assert reagent.type != [] + logger.debug(f"Looking up reagent type for {type(kit_type)} {kit_type} and {type(reagent)} {reagent}") + logger.debug(f"Kit reagent types: {kit_type.reagent_types}") + # logger.debug(f"Reagent reagent types: {reagent._sa_instance_state}") + result = list(set(kit_type.reagent_types).intersection(reagent.type)) + logger.debug(f"Result: {result}") + try: + return result[0] + except IndexError: + return None + match name: + case str(): + logger.debug(f"Looking up reagent type by name: {name}") + query = query.filter(cls.name==name) + limit = 1 + case _: + pass + return query_return(query=query, limit=limit) + class KitTypeReagentTypeAssociation(Base): """ table containing reagenttype/kittype associations @@ -178,6 +283,49 @@ class KitTypeReagentTypeAssociation(Base): if not isinstance(value, ReagentType): raise ValueError(f'{value} is not a reagenttype') return value + + @classmethod + @setup_lookup + def query(cls, + kit_type:KitType|str|None, + reagent_type:ReagentType|str|None, + limit:int=0 + ) -> KitTypeReagentTypeAssociation|List[KitTypeReagentTypeAssociation]: + """ + Lookup junction of ReagentType and KitType + + Args: + ctx (Settings): Settings object passed down from gui. + kit_type (models.KitType | str | None): KitType of interest. + reagent_type (models.ReagentType | str | None): ReagentType of interest. + limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. + + Returns: + models.KitTypeReagentTypeAssociation|List[models.KitTypeReagentTypeAssociation]: Junction of interest. + """ + query: Query = cls.metadata.session.query(cls) + match kit_type: + case KitType(): + query = query.filter(cls.kit_type==kit_type) + case str(): + query = query.join(KitType).filter(KitType.name==kit_type) + case _: + pass + match reagent_type: + case ReagentType(): + query = query.filter(cls.reagent_type==reagent_type) + case str(): + query = query.join(ReagentType).filter(ReagentType.name==reagent_type) + case _: + pass + if kit_type != None and reagent_type != None: + limit = 1 + return query_return(query=query, limit=limit) + + def save(self): + self.metadata.session.add(self) + self.metadata.session.commit() + return None class Reagent(Base): """ @@ -199,7 +347,6 @@ class Reagent(Base): else: return f"" - def __str__(self) -> str: """ string representing this object @@ -209,7 +356,6 @@ class Reagent(Base): """ return str(self.lot) - def to_sub_dict(self, extraction_kit:KitType=None) -> dict: """ dictionary containing values necessary for gui @@ -282,10 +428,47 @@ class Reagent(Base): "expiry": self.expiry.strftime("%Y-%m-%d") } - def save(self, ctx:Settings): - ctx.database_session.add(self) - ctx.database_session.commit() + def save(self): + self.metadata.session.add(self) + self.metadata.session.commit() + @classmethod + @setup_lookup + def query(cls, reagent_type:str|ReagentType|None=None, + lot_number:str|None=None, + limit:int=0 + ) -> Reagent|List[Reagent]: + """ + Lookup a list of reagents from the database. + + Args: + ctx (Settings): Settings object passed down from gui + reagent_type (str | models.ReagentType | None, optional): Reagent type. Defaults to None. + lot_number (str | None, optional): Reagent lot number. Defaults to None. + limit (int, optional): limit of results returned. Defaults to 0. + + Returns: + models.Reagent | List[models.Reagent]: reagent or list of reagents matching filter. + """ + query: Query = cls.metadata.session.query(cls) + match reagent_type: + case str(): + logger.debug(f"Looking up reagents by reagent type: {reagent_type}") + query = query.join(cls.type, aliased=True).filter(ReagentType.name==reagent_type) + case ReagentType(): + logger.debug(f"Looking up reagents by reagent type: {reagent_type}") + query = query.filter(cls.type.contains(reagent_type)) + case _: + pass + match lot_number: + case str(): + logger.debug(f"Looking up reagent by lot number: {lot_number}") + query = query.filter(cls.lot==lot_number) + # In this case limit number returned. + limit = 1 + case _: + pass + return query_return(query=query, limit=limit) class Discount(Base): """ @@ -303,6 +486,56 @@ class Discount(Base): def __repr__(self) -> str: return f"" + + @classmethod + @setup_lookup + def query(cls, + organization:Organization|str|int|None=None, + kit_type:KitType|str|int|None=None, + ) -> Discount|List[Discount]: + """ + Lookup discount objects (union of kit and organization) + + Args: + ctx (Settings): Settings object passed down from the gui. + organization (models.Organization | str | int): Organization receiving discount. + kit_type (models.KitType | str | int): Kit discount received on. + + Raises: + ValueError: Invalid Organization + ValueError: Invalid kit. + + Returns: + models.Discount|List[models.Discount]: Discount(s) of interest. + """ + query: Query = cls.metadata.session.query(cls) + match organization: + case Organization(): + logger.debug(f"Looking up discount with organization: {organization}") + query = query.filter(cls.client==Organization) + case str(): + logger.debug(f"Looking up discount with organization: {organization}") + query = query.join(Organization).filter(Organization.name==organization) + case int(): + logger.debug(f"Looking up discount with organization id: {organization}") + query = query.join(Organization).filter(Organization.id==organization) + case _: + # raise ValueError(f"Invalid value for organization: {organization}") + pass + match kit_type: + case KitType(): + logger.debug(f"Looking up discount with kit type: {kit_type}") + query = query.filter(cls.kit==kit_type) + case str(): + logger.debug(f"Looking up discount with kit type: {kit_type}") + query = query.join(KitType).filter(KitType.name==kit_type) + case int(): + logger.debug(f"Looking up discount with kit type id: {organization}") + query = query.join(KitType).filter(KitType.id==kit_type) + case _: + # raise ValueError(f"Invalid value for kit type: {kit_type}") + pass + return query.all() class SubmissionType(Base): """ @@ -326,8 +559,34 @@ class SubmissionType(Base): def __repr__(self) -> str: return f"" - + @classmethod + @setup_lookup + def query(cls, + name:str|None=None, + limit:int=0 + ) -> SubmissionType|List[SubmissionType]: + """ + Lookup submission type in the database by a number of parameters + + Args: + ctx (Settings): Settings object passed down from gui + name (str | None, optional): Name of submission type. Defaults to None. + limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. + + Returns: + models.SubmissionType|List[models.SubmissionType]: SubmissionType(s) of interest. + """ + query: Query = cls.metadata.session.query(cls) + match name: + case str(): + logger.debug(f"Looking up submission type by name: {name}") + query = query.filter(cls.name==name) + limit = 1 + case _: + pass + return query_return(query=query, limit=limit) + class SubmissionTypeKitTypeAssociation(Base): """ Abstract of relationship between kits and their submission type. @@ -352,7 +611,39 @@ class SubmissionTypeKitTypeAssociation(Base): self.constant_cost = 0.00 def __repr__(self) -> str: - return f"" def set_attrib(self, name, value): - self.__setattr__(name, value) \ No newline at end of file + self.__setattr__(name, value) + + @classmethod + @setup_lookup + def query(cls, + submission_type:SubmissionType|str|int|None=None, + kit_type:KitType|str|int|None=None, + limit:int=0 + ): + query: Query = cls.metadata.session.query(cls) + match submission_type: + case SubmissionType(): + logger.debug(f"Looking up {cls.__name__} by SubmissionType {submission_type}") + query = query.filter(cls.submission_type==submission_type) + case str(): + logger.debug(f"Looking up {cls.__name__} by name {submission_type}") + query = query.join(SubmissionType).filter(SubmissionType.name==submission_type) + case int(): + logger.debug(f"Looking up {cls.__name__} by id {submission_type}") + query = query.join(SubmissionType).filter(SubmissionType.id==submission_type) + match kit_type: + case KitType(): + logger.debug(f"Looking up {cls.__name__} by KitType {kit_type}") + query = query.filter(cls.kit_type==kit_type) + case str(): + logger.debug(f"Looking up {cls.__name__} by name {kit_type}") + query = query.join(KitType).filter(KitType.name==kit_type) + case int(): + logger.debug(f"Looking up {cls.__name__} by id {kit_type}") + query = query.join(KitType).filter(KitType.id==kit_type) + limit = query.count() + return query_return(query=query, limit=limit) + diff --git a/src/submissions/backend/db/models/organizations.py b/src/submissions/backend/db/models/organizations.py index e1baa1b..d54e773 100644 --- a/src/submissions/backend/db/models/organizations.py +++ b/src/submissions/backend/db/models/organizations.py @@ -1,14 +1,18 @@ ''' All client organization related models. ''' -from . import Base +from __future__ import annotations from sqlalchemy import Column, String, INTEGER, ForeignKey, Table -from sqlalchemy.orm import relationship +from sqlalchemy.orm import relationship, Query +from tools import Base, check_authorization, setup_lookup, query_return +from typing import List +import logging + +logger = logging.getLogger(f"submissions.{__name__}") # table containing organization/contact relationship orgs_contacts = Table("_orgs_contacts", Base.metadata, Column("org_id", INTEGER, ForeignKey("_organizations.id")), Column("contact_id", INTEGER, ForeignKey("_contacts.id"))) - class Organization(Base): """ Base of organization @@ -33,6 +37,7 @@ class Organization(Base): def __repr__(self) -> str: return f"" + @check_authorization def save(self, ctx): ctx.database_session.add(self) ctx.database_session.commit() @@ -40,6 +45,31 @@ class Organization(Base): def set_attribute(self, name:str, value): setattr(self, name, value) + @classmethod + @setup_lookup + def query(cls, + name:str|None=None, + limit:int=0, + ) -> Organization|List[Organization]: + """ + Lookup organizations in the database by a number of parameters. + + Args: + name (str | None, optional): Name of the organization. Defaults to None. + limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. + + Returns: + Organization|List[Organization]: _description_ + """ + query: Query = cls.metadata.session.query(cls) + match name: + case str(): + logger.debug(f"Looking up organization with name: {name}") + query = query.filter(cls.name==name) + limit = 1 + case _: + pass + return query_return(query=query, limit=limit) class Contact(Base): """ @@ -56,3 +86,44 @@ class Contact(Base): def __repr__(self) -> str: return f"" + @classmethod + @setup_lookup + def query(cls, + name:str|None=None, + email:str|None=None, + phone:str|None=None, + limit:int=0, + ) -> Contact|List[Contact]: + """ + Lookup contacts in the database by a number of parameters. + + Args: + name (str | None, optional): Name of the contact. Defaults to None. + limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. + + Returns: + Contact|List[Contact]: _description_ + """ + query: Query = cls.metadata.session.query(cls) + match name: + case str(): + logger.debug(f"Looking up contact with name: {name}") + query = query.filter(cls.name==name) + limit = 1 + case _: + pass + match email: + case str(): + logger.debug(f"Looking up contact with email: {name}") + query = query.filter(cls.email==email) + limit = 1 + case _: + pass + match phone: + case str(): + logger.debug(f"Looking up contact with phone: {name}") + query = query.filter(cls.phone==phone) + limit = 1 + case _: + pass + return query_return(query=query, limit=limit) \ No newline at end of file diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index aec5071..f6ec668 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -1,25 +1,29 @@ ''' Models for the main submission types. ''' +from __future__ import annotations from getpass import getuser import math from pprint import pformat -from . import Base +from . import Reagent, SubmissionType from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, Table, JSON, FLOAT, case -from sqlalchemy.orm import relationship, validates +from sqlalchemy.orm import relationship, validates, Query import logging import json from json.decoder import JSONDecodeError from math import ceil from sqlalchemy.ext.associationproxy import association_proxy import uuid -from dateutil.parser import parse import re import pandas as pd from openpyxl import Workbook -from tools import check_not_nan, row_map, Settings -from pathlib import Path -from datetime import datetime +from tools import check_not_nan, row_map, Base, query_return, setup_lookup +from datetime import datetime, date +from typing import List +from dateutil.parser import parse +import yaml +from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError +from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError logger = logging.getLogger(f"submissions.{__name__}") @@ -299,7 +303,7 @@ class BasicSubmission(Base): return input_excel @classmethod - def enforce_name(cls, ctx:Settings, instr:str) -> str: + def enforce_name(cls, instr:str) -> str: logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} Enforcer!") logger.debug(f"Attempting enforcement on {instr}") return instr @@ -311,7 +315,7 @@ class BasicSubmission(Base): return regex @classmethod - def find_subclasses(cls, ctx:Settings, attrs:dict|None=None, submission_type:str|None=None): + def find_subclasses(cls, attrs:dict|None=None, submission_type:str|None=None): if submission_type != None: return cls.find_polymorphic_subclass(submission_type) if len(attrs) == 0 or attrs == None: @@ -331,24 +335,156 @@ class BasicSubmission(Base): def find_polymorphic_subclass(cls, polymorphic_identity:str|None=None): if isinstance(polymorphic_identity, dict): polymorphic_identity = polymorphic_identity['value'] - if polymorphic_identity == None: - return cls - else: + if polymorphic_identity != None: try: - return [item for item in cls.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0] + cls = [item for item in cls.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0] + logger.info(f"Recruiting: {cls}") except Exception as e: logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}") - return cls + return cls @classmethod def parse_pcr(cls, xl:pd.DataFrame, rsl_number:str) -> list: logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} PCR parser!") return [] - def save(self, ctx:Settings): + def save(self): self.uploaded_by = getuser() - ctx.database_session.add(self) - ctx.database_session.commit() + self.metadata.session.add(self) + self.metadata.session.commit() + return None + + def delete(self): + backup = self.to_dict() + try: + with open(self.metadata.backup_path.joinpath(f"{self.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f: + yaml.dump(backup, f) + except KeyError: + pass + self.metadata.database_session.delete(self) + try: + self.metadata.session.commit() + except (SQLIntegrityError, SQLOperationalError, AlcIntegrityError, AlcOperationalError) as e: + self.metadata.session.rollback() + raise e + + @classmethod + @setup_lookup + def query(cls, + submission_type:str|SubmissionType|None=None, + id:int|str|None=None, + rsl_number:str|None=None, + start_date:date|str|int|None=None, + end_date:date|str|int|None=None, + reagent:Reagent|str|None=None, + chronologic:bool=False, limit:int=0, + **kwargs + ) -> BasicSubmission | List[BasicSubmission]: + """ + Lookup submissions based on a number of parameters. + + Args: + submission_type (str | models.SubmissionType | None, optional): Submission type of interest. Defaults to None. + id (int | str | None, optional): Submission id in the database (limits results to 1). Defaults to None. + rsl_number (str | None, optional): Submission name in the database (limits results to 1). Defaults to None. + start_date (date | str | int | None, optional): Beginning date to search by. Defaults to None. + end_date (date | str | int | None, optional): Ending date to search by. Defaults to None. + reagent (models.Reagent | str | None, optional): A reagent used in the submission. Defaults to None. + chronologic (bool, optional): Return results in chronologic order. Defaults to False. + limit (int, optional): Maximum number of results to return. Defaults to 0. + + Returns: + models.BasicSubmission | List[models.BasicSubmission]: Submission(s) of interest + """ + # NOTE: if you go back to using 'model' change the appropriate cls to model in the query filters + if submission_type == None: + model = cls.find_subclasses(attrs=kwargs) + else: + if isinstance(submission_type, SubmissionType): + model = cls.find_subclasses(submission_type=submission_type.name) + else: + model = cls.find_subclasses(submission_type=submission_type) + # query: Query = setup_lookup(ctx=ctx, locals=locals()).query(model) + query: Query = cls.metadata.session.query(model) + # by submission type + # match submission_type: + # case SubmissionType(): + # logger.debug(f"Looking up BasicSubmission with submission type: {submission_type}") + # query = query.filter(model.submission_type_name==submission_type.name) + # case str(): + # logger.debug(f"Looking up BasicSubmission with submission type: {submission_type}") + # query = query.filter(model.submission_type_name==submission_type) + # case _: + # pass + # by date range + if start_date != None and end_date == None: + logger.warning(f"Start date with no end date, using today.") + end_date = date.today() + if end_date != None and start_date == None: + logger.warning(f"End date with no start date, using Jan 1, 2023") + start_date = date(2023, 1, 1) + if start_date != None: + match start_date: + case date(): + start_date = start_date.strftime("%Y-%m-%d") + case int(): + start_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d") + case _: + start_date = parse(start_date).strftime("%Y-%m-%d") + match end_date: + case date(): + end_date = end_date.strftime("%Y-%m-%d") + case int(): + end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime("%Y-%m-%d") + case _: + end_date = parse(end_date).strftime("%Y-%m-%d") + logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}") + query = query.filter(cls.submitted_date.between(start_date, end_date)) + # by reagent (for some reason) + match reagent: + case str(): + logger.debug(f"Looking up BasicSubmission with reagent: {reagent}") + # reagent = Reagent.query(lot_number=reagent) + # query = query.join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id) + query = query.join(cls.reagents).filter(Reagent.lot==reagent) + case Reagent(): + logger.debug(f"Looking up BasicSubmission with reagent: {reagent}") + query = query.join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id) + case _: + pass + # by rsl number (returns only a single value) + match rsl_number: + case str(): + query = query.filter(cls.rsl_plate_num==rsl_number) + logger.debug(f"At this point the query gets: {query.all()}") + limit = 1 + case _: + pass + # by id (returns only a single value) + match id: + case int(): + logger.debug(f"Looking up BasicSubmission with id: {id}") + query = query.filter(cls.id==id) + limit = 1 + case str(): + logger.debug(f"Looking up BasicSubmission with id: {id}") + query = query.filter(cls.id==int(id)) + limit = 1 + case _: + pass + for k, v in kwargs.items(): + attr = getattr(cls, k) + logger.debug(f"Got attr: {attr}") + query = query.filter(attr==v) + if len(kwargs) > 0: + limit = 1 + if chronologic: + query.order_by(cls.submitted_date) + return query_return(query=query, limit=limit) + + @classmethod + def filename_template(cls): + return "{{ rsl_plate_num }}" # Below are the custom submission types @@ -415,9 +551,9 @@ class BacterialCulture(BasicSubmission): return input_excel @classmethod - def enforce_name(cls, ctx:Settings, instr:str) -> str: - outstr = super().enforce_name(ctx=ctx, instr=instr) - def construct(ctx) -> str: + def enforce_name(cls, instr:str) -> str: + outstr = super().enforce_name(instr=instr) + def construct() -> str: """ DEPRECIATED due to slowness. Search for the largest rsl number and increment by 1 @@ -426,7 +562,8 @@ class BacterialCulture(BasicSubmission): """ logger.debug(f"Attempting to construct RSL number from scratch...") # directory = Path(self.ctx['directory_path']).joinpath("Bacteria") - directory = Path(ctx.directory_path).joinpath("Bacteria") + # directory = Path(ctx.directory_path).joinpath("Bacteria") + directory = cls.metadata.directory_path.joinpath("Bacteria") year = str(datetime.now().year)[-2:] if directory.exists(): logger.debug(f"Year: {year}") @@ -449,7 +586,7 @@ class BacterialCulture(BasicSubmission): try: outstr = re.sub(r"RSL(\d{2})", r"RSL-\1", outstr, flags=re.IGNORECASE) except (AttributeError, TypeError) as e: - outstr = construct(ctx=ctx) + outstr = construct() # year = datetime.now().year # self.parsed_name = f"RSL-{str(year)[-2:]}-0000" return re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", outstr, flags=re.IGNORECASE) @@ -458,6 +595,12 @@ class BacterialCulture(BasicSubmission): def get_regex(cls): return "(?PRSL-?\\d{2}-?\\d{4})" + @classmethod + def filename_template(cls): + template = super().filename_template() + template += "_{{ submitting_lab }}_{{ submitter_plate_num }}" + return template + class Wastewater(BasicSubmission): """ derivative submission type from BasicSubmission @@ -537,8 +680,8 @@ class Wastewater(BasicSubmission): return samples @classmethod - def enforce_name(cls, ctx:Settings, instr:str) -> str: - outstr = super().enforce_name(ctx=ctx, instr=instr) + def enforce_name(cls, instr:str) -> str: + outstr = super().enforce_name(instr=instr) def construct(): today = datetime.now() return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" @@ -620,8 +763,8 @@ class WastewaterArtic(BasicSubmission): return input_dict @classmethod - def enforce_name(cls, ctx:Settings, instr:str) -> str: - outstr = super().enforce_name(ctx=ctx, instr=instr) + def enforce_name(cls, instr:str) -> str: + outstr = super().enforce_name(instr=instr) def construct(): today = datetime.now() return f"RSL-AR-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" @@ -727,7 +870,7 @@ class BasicSample(Base): return dict(name=self.submitter_id[:10], positive=False, tooltip=tooltip_text) @classmethod - def find_subclasses(cls, ctx:Settings, attrs:dict|None=None, rsl_number:str|None=None): + def find_subclasses(cls, attrs:dict|None=None, rsl_number:str|None=None): if len(attrs) == 0 or attrs == None: return cls if any([not hasattr(cls, attr) for attr in attrs]): @@ -737,7 +880,7 @@ class BasicSample(Base): except IndexError as e: raise AttributeError(f"Couldn't find existing class/subclass of {cls} with all attributes:\n{pformat(attrs)}") else: - model = cls + return cls logger.debug(f"Using model: {model}") return model @@ -758,6 +901,51 @@ class BasicSample(Base): def parse_sample(cls, input_dict:dict) -> dict: # logger.debug(f"Called {cls.__name__} sample parser") return input_dict + + @classmethod + @setup_lookup + def query(cls, + submitter_id:str|None=None, + # sample_type:str|None=None, + limit:int=0, + **kwargs + ) -> BasicSample|List[BasicSample]: + """ + Lookup samples in the database by a number of parameters. + + Args: + ctx (Settings): Settings object passed down from gui + submitter_id (str | None, optional): Name of the sample (limits results to 1). Defaults to None. + sample_type (str | None, optional): Sample type. Defaults to None. + limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. + + Returns: + models.BasicSample|List[models.BasicSample]: Sample(s) of interest. + """ + logger.debug(f"Length of kwargs: {len(kwargs)}") + # model = models.BasicSample.find_subclasses(ctx=ctx, attrs=kwargs) + # query: Query = setup_lookup(ctx=ctx, locals=locals()).query(model) + query: Query = cls.metadata.session.query(cls) + match submitter_id: + case str(): + logger.debug(f"Looking up {cls} with submitter id: {submitter_id}") + query = query.filter(cls.submitter_id==submitter_id) + limit = 1 + case _: + pass + # match sample_type: + # case str(): + # logger.debug(f"Looking up {model} with sample type: {sample_type}") + # query = query.filter(models.BasicSample.sample_type==sample_type) + # case _: + # pass + for k, v in kwargs.items(): + attr = getattr(cls, k) + logger.debug(f"Got attr: {attr}") + query = query.filter(attr==v) + if len(kwargs) > 0: + limit = 1 + return query_return(query=query, limit=limit) class WastewaterSample(BasicSample): """ @@ -772,53 +960,6 @@ class WastewaterSample(BasicSample): sample_location = Column(String(8)) #: location on 24 well plate __mapper_args__ = {"polymorphic_identity": "Wastewater Sample", "polymorphic_load": "inline"} - - # @validates("collected-date") - # def convert_cdate_time(self, key, value): - # logger.debug(f"Validating {key}: {value}") - # if isinstance(value, Timestamp): - # return value.date() - # if isinstance(value, str): - # return parse(value) - # return value - - # @validates("rsl_number") - # def use_submitter_id(self, key, value): - # logger.debug(f"Validating {key}: {value}") - # return value or self.submitter_id - - # def set_attribute(self, name:str, value): - # """ - # Set an attribute of this object. Extends parent. - - # Args: - # name (str): name of the attribute - # value (_type_): value to be set - # """ - # # Due to the plate map being populated with RSL numbers, we have to do some shuffling. - # match name: - # case "submitter_id": - # # If submitter_id already has a value, stop - # if self.submitter_id != None: - # return - # # otherwise also set rsl_number to the same value - # else: - # super().set_attribute("rsl_number", value) - # case "ww_full_sample_id": - # # If value present, set ww_full_sample_id and make this the submitter_id - # if value != None: - # super().set_attribute(name, value) - # name = "submitter_id" - # case 'collection_date': - # # If this is a string use dateutils to parse into date() - # if isinstance(value, str): - # logger.debug(f"collection_date {value} is a string. Attempting parse...") - # value = parse(value) - # case "rsl_number": - # if value == None: - # value = self.submitter_id - # super().set_attribute(name, value) - def to_hitpick(self, submission_rsl:str) -> dict|None: """ Outputs a dictionary usable for html plate maps. Extends parent method. @@ -924,6 +1065,61 @@ class SubmissionSampleAssociation(Base): except Exception as e: logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}") return cls + + @classmethod + @setup_lookup + def query(cls, + submission:BasicSubmission|str|None=None, + sample:BasicSample|str|None=None, + row:int=0, + column:int=0, + limit:int=0, + chronologic:bool=False + ) -> SubmissionSampleAssociation|List[SubmissionSampleAssociation]: + """ + Lookup junction of Submission and Sample in the database + + Args: + submission (models.BasicSubmission | str | None, optional): Submission of interest. Defaults to None. + sample (models.BasicSample | str | None, optional): Sample of interest. Defaults to None. + row (int, optional): Row of the sample location on submission plate. Defaults to 0. + column (int, optional): Column of the sample location on the submission plate. Defaults to 0. + limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. + chronologic (bool, optional): Return results in chronologic order. Defaults to False. + + Returns: + models.SubmissionSampleAssociation|List[models.SubmissionSampleAssociation]: Junction(s) of interest + """ + query: Query = cls.metadata.session.query(cls) + match submission: + case BasicSubmission(): + query = query.filter(cls.submission==submission) + case str(): + query = query.join(BasicSubmission).filter(BasicSubmission.rsl_plate_num==submission) + case _: + pass + match sample: + case BasicSample(): + query = query.filter(cls.sample==sample) + case str(): + query = query.join(BasicSample).filter(BasicSample.submitter_id==sample) + case _: + pass + if row > 0: + query = query.filter(cls.row==row) + if column > 0: + query = query.filter(cls.column==column) + logger.debug(f"Query count: {query.count()}") + if chronologic: + query.join(BasicSubmission).order_by(BasicSubmission.submitted_date) + if query.count() == 1: + limit = 1 + return query_return(query=query, limit=limit) + + def save(self): + self.metadata.session.add(self) + self.metadata.session.commit() + return None class WastewaterAssociation(SubmissionSampleAssociation): """ diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index ec8fe05..fa44384 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -110,7 +110,8 @@ class SheetParser(object): """ Enforce that only allowed reagents get into the Pydantic Model """ - kit = lookup_kit_types(ctx=self.ctx, name=self.sub['extraction_kit']['value']) + # kit = lookup_kit_types(ctx=self.ctx, name=self.sub['extraction_kit']['value']) + kit = models.KitType.query(name=self.sub['extraction_kit']['value']) allowed_reagents = [item.name for item in kit.get_reagents()] logger.debug(f"List of reagents for comparison with allowed_reagents: {pprint.pformat(self.sub['reagents'])}") # self.sub['reagents'] = [reagent for reagent in self.sub['reagents'] if reagent['value'].type in allowed_reagents] @@ -151,7 +152,8 @@ class InfoParser(object): if isinstance(submission_type, str): submission_type = dict(value=submission_type, missing=True) logger.debug(f"Looking up submission type: {submission_type['value']}") - submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type['value']) + # submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type['value']) + submission_type = models.SubmissionType.query(name=submission_type['value']) info_map = submission_type.info_map # Get the parse_info method from the submission type specified self.custom_parser = models.BasicSubmission.find_polymorphic_subclass(polymorphic_identity=submission_type.name).parse_info @@ -212,7 +214,8 @@ class ReagentParser(object): def fetch_kit_info_map(self, extraction_kit:dict, submission_type:str): if isinstance(extraction_kit, dict): extraction_kit = extraction_kit['value'] - kit = lookup_kit_types(ctx=self.ctx, name=extraction_kit) + # kit = lookup_kit_types(ctx=self.ctx, name=extraction_kit) + kit = models.KitType.query(name=extraction_kit) if isinstance(submission_type, dict): submission_type = submission_type['value'] reagent_map = kit.construct_xl_map_for_use(submission_type.title()) @@ -289,7 +292,8 @@ class SampleParser(object): dict: Info locations. """ logger.debug(f"Looking up submission type: {submission_type}") - submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type) + # submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type) + submission_type = models.SubmissionType.query(name=submission_type) logger.debug(f"info_map: {pprint.pformat(submission_type.info_map)}") sample_info_map = submission_type.info_map['samples'] # self.custom_parser = get_polymorphic_subclass(models.BasicSubmission, submission_type.name).parse_samples @@ -458,7 +462,8 @@ class SampleParser(object): logger.error(f"Could not find the model {query}. Using generic.") database_obj = models.BasicSample logger.debug(f"Searching database for {input_dict['submitter_id']}...") - instance = lookup_samples(ctx=self.ctx, submitter_id=str(input_dict['submitter_id'])) + # instance = lookup_samples(ctx=self.ctx, submitter_id=str(input_dict['submitter_id'])) + instance = models.BasicSample.query(submitter_id=str(input_dict['submitter_id'])) if instance == None: logger.debug(f"Couldn't find sample {input_dict['submitter_id']}. Creating new sample.") instance = database_obj() diff --git a/src/submissions/backend/validators/__init__.py b/src/submissions/backend/validators/__init__.py index 0fdd715..a571393 100644 --- a/src/submissions/backend/validators/__init__.py +++ b/src/submissions/backend/validators/__init__.py @@ -22,7 +22,7 @@ class RSLNamer(object): if self.submission_type != None: enforcer = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type) self.parsed_name = self.retrieve_rsl_number(instr=instr, regex=enforcer.get_regex()) - self.parsed_name = enforcer.enforce_name(ctx=ctx, instr=self.parsed_name) + self.parsed_name = enforcer.enforce_name(instr=self.parsed_name) @classmethod def retrieve_submission_type(cls, ctx:Settings, instr:str|Path) -> str: diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py index 7cb134e..2c7cf90 100644 --- a/src/submissions/backend/validators/pydant.py +++ b/src/submissions/backend/validators/pydant.py @@ -2,7 +2,6 @@ Contains pydantic models and accompanying validators ''' import uuid -from PyQt6 import QtCore from pydantic import BaseModel, field_validator, Field from datetime import date, datetime, timedelta from dateutil.parser import parse @@ -12,14 +11,10 @@ from . import RSLNamer from pathlib import Path import re import logging -from tools import check_not_nan, convert_nans_to_nones, Settings -from backend.db.functions import (lookup_submissions, lookup_reagent_types, lookup_reagents, lookup_kit_types, - lookup_organizations, lookup_submission_type, lookup_discounts, lookup_samples, lookup_submission_sample_association, - lookup_reagenttype_kittype_association -) +from tools import check_not_nan, convert_nans_to_nones, Settings, jinja_template_loading from backend.db.models import * from sqlalchemy.exc import InvalidRequestError, StatementError -from PyQt6.QtWidgets import QComboBox, QWidget, QLabel, QVBoxLayout +from PyQt6.QtWidgets import QComboBox, QWidget from pprint import pformat from openpyxl import load_workbook @@ -47,7 +42,8 @@ class PydReagent(BaseModel): def rescue_type_with_lookup(cls, value, values): if value == None and values.data['lot'] != None: try: - return lookup_reagents(ctx=values.data['ctx'], lot_number=values.data['lot']).name + # return lookup_reagents(ctx=values.data['ctx'], lot_number=values.data['lot']).name + return Reagent.query(lot_number=values.data['lot'].name) except AttributeError: return value return value @@ -94,7 +90,8 @@ class PydReagent(BaseModel): def toSQL(self) -> Tuple[Reagent, dict]: result = None logger.debug(f"Reagent SQL constructor is looking up type: {self.type}, lot: {self.lot}") - reagent = lookup_reagents(ctx=self.ctx, lot_number=self.lot) + # reagent = lookup_reagents(ctx=self.ctx, lot_number=self.lot) + reagent = Reagent.query(lot_number=self.lot) logger.debug(f"Result: {reagent}") if reagent == None: reagent = Reagent() @@ -109,7 +106,8 @@ class PydReagent(BaseModel): case "expiry": reagent.expiry = value case "type": - reagent_type = lookup_reagent_types(ctx=self.ctx, name=value) + # reagent_type = lookup_reagent_types(ctx=self.ctx, name=value) + reagent_type = ReagentType.query(name=value) if reagent_type != None: reagent.type.append(reagent_type) case "name": @@ -145,7 +143,8 @@ class PydSample(BaseModel, extra='allow'): result = None self.__dict__.update(self.model_extra) logger.debug(f"Here is the incoming sample dict: \n{self.__dict__}") - instance = lookup_samples(ctx=ctx, submitter_id=self.submitter_id) + # instance = lookup_samples(ctx=ctx, submitter_id=self.submitter_id) + instance = BasicSample.query(submitter_id=self.submitter_id) if instance == None: logger.debug(f"Sample {self.submitter_id} doesn't exist yet. Looking up sample object with polymorphic identity: {self.sample_type}") instance = BasicSample.find_polymorphic_subclass(polymorphic_identity=self.sample_type)() @@ -158,7 +157,8 @@ class PydSample(BaseModel, extra='allow'): instance.set_attribute(name=key, value=value) for row, column in zip(self.row, self.column): logger.debug(f"Looking up association with identity: ({submission.submission_type_name} Association)") - association = lookup_submission_sample_association(ctx=ctx, submission=submission, row=row, column=column) + # association = lookup_submission_sample_association(ctx=ctx, submission=submission, row=row, column=column) + association = SubmissionSampleAssociation.query(submission=submission, row=row, column=column) logger.debug(f"Returned association: {association}") if association == None or association == []: logger.debug(f"Looked up association at row {row}, column {column} didn't exist, creating new association.") @@ -239,7 +239,8 @@ class PydSubmission(BaseModel, extra='allow'): logger.debug(f"RSL-plate initial value: {value['value']}") sub_type = values.data['submission_type']['value'] if check_not_nan(value['value']): - if lookup_submissions(ctx=values.data['ctx'], rsl_number=value['value']) == None: + # if lookup_submissions(ctx=values.data['ctx'], rsl_number=value['value']) == None: + if BasicSubmission.query(rsl_number=value['value']) == None: return dict(value=value['value'], missing=False) else: logger.warning(f"Submission number {value} already exists in DB, attempting salvage with filepath") @@ -325,9 +326,12 @@ class PydSubmission(BaseModel, extra='allow'): output.append(dummy) self.samples = output - def improved_dict(self): + def improved_dict(self, dictionaries:bool=True): fields = list(self.model_fields.keys()) + list(self.model_extra.keys()) - output = {k:getattr(self, k) for k in fields} + if dictionaries: + output = {k:getattr(self, k) for k in fields} + else: + output = {k:(getattr(self, k) if not isinstance(getattr(self, k), dict) else getattr(self, k)['value']) for k in fields} return output def find_missing(self): @@ -341,7 +345,8 @@ class PydSubmission(BaseModel, extra='allow'): msg = None status = None self.__dict__.update(self.model_extra) - instance = lookup_submissions(ctx=self.ctx, rsl_number=self.rsl_plate_num['value']) + # instance = lookup_submissions(ctx=self.ctx, rsl_number=self.rsl_plate_num['value']) + instance = BasicSubmission.query(rsl_number=self.rsl_plate_num['value']) if instance == None: instance = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type)() else: @@ -357,11 +362,13 @@ class PydSubmission(BaseModel, extra='allow'): match key: case "extraction_kit": logger.debug(f"Looking up kit {value}") - field_value = lookup_kit_types(ctx=self.ctx, name=value) + # field_value = lookup_kit_types(ctx=self.ctx, name=value) + field_value = KitType.query(name=value) logger.debug(f"Got {field_value} for kit {value}") case "submitting_lab": logger.debug(f"Looking up organization: {value}") - field_value = lookup_organizations(ctx=self.ctx, name=value) + # field_value = lookup_organizations(ctx=self.ctx, name=value) + field_value = Organization.query(name=value) logger.debug(f"Got {field_value} for organization {value}") case "submitter_plate_num": logger.debug(f"Submitter plate id: {value}") @@ -376,7 +383,8 @@ class PydSubmission(BaseModel, extra='allow'): case "reagents": field_value = [reagent['value'].toSQL()[0] if isinstance(reagent, dict) else reagent.toSQL()[0] for reagent in value] case "submission_type": - field_value = lookup_submission_type(ctx=self.ctx, name=value) + # field_value = lookup_submission_type(ctx=self.ctx, name=value) + field_value = SubmissionType.query(name=value) case "sample_count": if value == None: field_value = len(self.samples) @@ -404,7 +412,8 @@ class PydSubmission(BaseModel, extra='allow'): # Apply any discounts that are applicable for client and kit. try: logger.debug("Checking and applying discounts...") - discounts = [item.amount for item in lookup_discounts(ctx=self.ctx, kit_type=instance.extraction_kit, organization=instance.submitting_lab)] + # discounts = [item.amount for item in lookup_discounts(ctx=self.ctx, kit_type=instance.extraction_kit, organization=instance.submitting_lab)] + discounts = [item.amount for item in Discount.query(kit_type=instance.extraction_kit, organization=instance.submitting_lab)] logger.debug(f"We got discounts: {discounts}") if len(discounts) > 0: discounts = sum(discounts) @@ -433,7 +442,8 @@ class PydSubmission(BaseModel, extra='allow'): if len(reagents + list(info.keys())) == 0: return None logger.debug(f"We have blank info and/or reagents in the excel sheet.\n\tLet's try to fill them in.") - extraction_kit = lookup_kit_types(ctx=self.ctx, name=self.extraction_kit['value']) + # extraction_kit = lookup_kit_types(ctx=self.ctx, name=self.extraction_kit['value']) + extraction_kit = KitType.query(name=self.extraction_kit['value']) logger.debug(f"We have the extraction kit: {extraction_kit.name}") excel_map = extraction_kit.construct_xl_map_for_use(self.submission_type['value']) logger.debug(f"Extraction kit map:\n\n{pformat(excel_map)}") @@ -498,6 +508,13 @@ class PydSubmission(BaseModel, extra='allow'): workbook = custom_parser.custom_autofill(workbook) return workbook + def construct_filename(self): + env = jinja_template_loading() + template = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type).filename_template() + logger.debug(f"Using template string: {template}") + template = env.from_string(template) + return template.render(**self.improved_dict(dictionaries=False)) + class PydContact(BaseModel): name: str @@ -539,12 +556,14 @@ class PydReagentType(BaseModel): return value def toSQL(self, ctx:Settings, kit:KitType): - instance: ReagentType = lookup_reagent_types(ctx=ctx, name=self.name) + # instance: ReagentType = lookup_reagent_types(ctx=ctx, name=self.name) + instance: ReagentType = ReagentType.query(name=self.name) if instance == None: instance = ReagentType(name=self.name, eol_ext=self.eol_ext) logger.debug(f"This is the reagent type instance: {instance.__dict__}") try: - assoc = lookup_reagenttype_kittype_association(ctx=ctx, reagent_type=instance, kit_type=kit) + # assoc = lookup_reagenttype_kittype_association(ctx=ctx, reagent_type=instance, kit_type=kit) + assoc = KitTypeReagentTypeAssociation.query(reagent_type=instance, kit_type=kit) except StatementError: assoc = None if assoc == None: @@ -559,7 +578,8 @@ class PydKit(BaseModel): def toSQL(self, ctx): result = dict(message=None, status='Information') - instance = lookup_kit_types(ctx=ctx, name=self.name) + # instance = lookup_kit_types(ctx=ctx, name=self.name) + instance = KitType.query(name=self.name) if instance == None: instance = KitType(name=self.name) # instance.reagent_types = [item.toSQL(ctx, instance) for item in self.reagent_types] diff --git a/src/submissions/frontend/__init__.py b/src/submissions/frontend/__init__.py index c719b8d..1687932 100644 --- a/src/submissions/frontend/__init__.py +++ b/src/submissions/frontend/__init__.py @@ -1,7 +1,6 @@ ''' Constructs main application. ''' -from pprint import pformat import sys from PyQt6.QtWidgets import ( QMainWindow, QToolBar, @@ -16,9 +15,10 @@ from pathlib import Path from backend.db.functions import ( lookup_control_types, lookup_modes ) +from backend.db.models import ControlType, Control from backend.validators import PydSubmission, PydReagent from tools import check_if_app, Settings -from frontend.custom_widgets import SubmissionsSheet, AlertPop, AddReagentForm, KitAdder, ControlsDatePicker, ImportReagent, ReagentFormWidget +from frontend.custom_widgets import SubmissionsSheet, AlertPop, AddReagentForm, KitAdder, ControlsDatePicker, ReagentFormWidget import logging from datetime import date import webbrowser @@ -228,7 +228,7 @@ class App(QMainWindow): # send reagent to db # store_reagent(ctx=self.ctx, reagent=reagent) sqlobj, result = reagent.toSQL() - sqlobj.save(ctx=self.ctx) + sqlobj.save() # result = store_object(ctx=self.ctx, object=reagent.toSQL()[0]) self.result_reporter(result=result) return reagent @@ -369,12 +369,14 @@ class AddSubForm(QWidget): self.control_typer = QComboBox() # fetch types of controls # con_types = get_all_Control_Types_names(ctx=parent.ctx) - con_types = [item.name for item in lookup_control_types(ctx=parent.ctx)] + # con_types = [item.name for item in lookup_control_types(ctx=parent.ctx)] + con_types = [item.name for item in ControlType.query()] self.control_typer.addItems(con_types) # create custom widget to get types of analysis self.mode_typer = QComboBox() # mode_types = get_all_available_modes(ctx=parent.ctx) - mode_types = lookup_modes(ctx=parent.ctx) + # mode_types = lookup_modes(ctx=parent.ctx) + mode_types = Control.get_modes() self.mode_typer.addItems(mode_types) # create custom widget to get subtypes of analysis self.sub_typer = QComboBox() diff --git a/src/submissions/frontend/custom_widgets/misc.py b/src/submissions/frontend/custom_widgets/misc.py index 41ce586..091879d 100644 --- a/src/submissions/frontend/custom_widgets/misc.py +++ b/src/submissions/frontend/custom_widgets/misc.py @@ -13,10 +13,9 @@ from PyQt6.QtWidgets import ( ) from PyQt6.QtCore import Qt, QDate, QSize, pyqtSignal from tools import check_not_nan, jinja_template_loading, Settings -from backend.db.functions import \ - lookup_reagent_types, lookup_reagents, lookup_submission_type, lookup_reagenttype_kittype_association, \ - lookup_submissions, lookup_organizations, lookup_kit_types -from backend.db.models import SubmissionTypeKitTypeAssociation +from backend.db.functions import (lookup_reagent_types, lookup_reagents, lookup_submission_type, lookup_reagenttype_kittype_association, \ + lookup_submissions, lookup_organizations, lookup_kit_types) +from backend.db.models import * from sqlalchemy import FLOAT, INTEGER import logging import numpy as np @@ -26,6 +25,7 @@ from typing import Tuple, List from pprint import pformat import difflib + logger = logging.getLogger(f"submissions.{__name__}") env = jinja_template_loading() @@ -67,7 +67,8 @@ class AddReagentForm(QDialog): # widget to get reagent type info self.type_input = QComboBox() self.type_input.setObjectName('type') - self.type_input.addItems([item.name for item in lookup_reagent_types(ctx=ctx)]) + # self.type_input.addItems([item.name for item in lookup_reagent_types(ctx=ctx)]) + self.type_input.addItems([item.name for item in ReagentType.query()]) logger.debug(f"Trying to find index of {reagent_type}") # convert input to user friendly string? try: @@ -103,7 +104,8 @@ class AddReagentForm(QDialog): """ logger.debug(self.type_input.currentText()) self.name_input.clear() - lookup = lookup_reagents(ctx=self.ctx, reagent_type=self.type_input.currentText()) + # lookup = lookup_reagents(ctx=self.ctx, reagent_type=self.type_input.currentText()) + lookup = Reagent.query(reagent_type=self.type_input.currentText()) self.name_input.addItems(list(set([item.name for item in lookup]))) class ReportDatePicker(QDialog): @@ -165,7 +167,8 @@ class KitAdder(QWidget): used_for = QComboBox() used_for.setObjectName("used_for") # Insert all existing sample types - used_for.addItems([item.name for item in lookup_submission_type(ctx=parent_ctx)]) + # used_for.addItems([item.name for item in lookup_submission_type(ctx=parent_ctx)]) + used_for.addItems([item.name for item in SubmissionType.query()]) used_for.setEditable(True) self.grid.addWidget(used_for,3,1) # Get all fields in SubmissionTypeKitTypeAssociation @@ -188,28 +191,6 @@ class KitAdder(QWidget): add_widget = QLineEdit() add_widget.setObjectName(column.name) self.grid.addWidget(add_widget, idx,1) - # self.grid.addWidget(QLabel("Constant cost per full plate (plates, work hours, etc.):"),4,0) - # # widget to get constant cost - # const_cost = QDoubleSpinBox() #QSpinBox() - # const_cost.setObjectName("const_cost") - # const_cost.setMinimum(0) - # const_cost.setMaximum(9999) - # self.grid.addWidget(const_cost,4,1) - # self.grid.addWidget(QLabel("Cost per column (multidrop reagents, etc.):"),5,0) - # # widget to get mutable costs per column - # mut_cost_col = QDoubleSpinBox() #QSpinBox() - # mut_cost_col.setObjectName("mut_cost_col") - # mut_cost_col.setMinimum(0) - # mut_cost_col.setMaximum(9999) - # self.grid.addWidget(mut_cost_col,5,1) - # self.grid.addWidget(QLabel("Cost per sample (tips, reagents, etc.):"),6,0) - # # widget to get mutable costs per column - # mut_cost_samp = QDoubleSpinBox() #QSpinBox() - # mut_cost_samp.setObjectName("mut_cost_samp") - # mut_cost_samp.setMinimum(0) - # mut_cost_samp.setMaximum(9999) - # self.grid.addWidget(mut_cost_samp,6,1) - # button to add additional reagent types self.add_RT_btn = QPushButton("Add Reagent Type") self.grid.addWidget(self.add_RT_btn) self.add_RT_btn.clicked.connect(self.add_RT) @@ -259,7 +240,7 @@ class KitAdder(QWidget): logger.debug(f"Output pyd object: {kit.__dict__}") # result = construct_kit_from_yaml(ctx=self.ctx, kit_dict=info) sqlobj, result = kit.toSQL(self.ctx) - sqlobj.save(ctx=self.ctx) + sqlobj.save() msg = AlertPop(message=result['message'], status=result['status']) msg.exec() self.__init__(self.ctx) @@ -295,7 +276,8 @@ class ReagentTypeForm(QWidget): self.reagent_getter = QComboBox() self.reagent_getter.setObjectName("rtname") # lookup all reagent type names from db - lookup = lookup_reagent_types(ctx=ctx) + # lookup = lookup_reagent_types(ctx=ctx) + lookup = ReagentType.query() logger.debug(f"Looked up ReagentType names: {lookup}") self.reagent_getter.addItems([item.__str__() for item in lookup]) self.reagent_getter.setEditable(True) @@ -387,66 +369,6 @@ class ControlsDatePicker(QWidget): def sizeHint(self) -> QSize: return QSize(80,20) -class ImportReagent(QComboBox): - - """ - NOTE: Depreciated in favour of ReagentFormWidget - """ - - def __init__(self, ctx:Settings, reagent:dict|PydReagent, extraction_kit:str): - super().__init__() - self.setEditable(True) - if isinstance(reagent, dict): - reagent = PydReagent(ctx=ctx, **reagent) - # Ensure that all reagenttypes have a name that matches the items in the excel parser - query_var = reagent.type - logger.debug(f"Import Reagent is looking at: {reagent.lot} for {query_var}") - if isinstance(reagent.lot, np.float64): - logger.debug(f"{reagent.lot} is a numpy float!") - try: - reagent.lot = int(reagent.lot) - except ValueError: - pass - # query for reagents using type name from sheet and kit from sheet - logger.debug(f"Attempting lookup of reagents by type: {query_var}") - # below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work. - lookup = lookup_reagents(ctx=ctx, reagent_type=query_var) - relevant_reagents = [item.__str__() for item in lookup] - output_reg = [] - for rel_reagent in relevant_reagents: - # extract strings from any sets. - if isinstance(rel_reagent, set): - for thing in rel_reagent: - output_reg.append(thing) - elif isinstance(rel_reagent, str): - output_reg.append(rel_reagent) - relevant_reagents = output_reg - # if reagent in sheet is not found insert it into the front of relevant reagents so it shows - logger.debug(f"Relevant reagents for {reagent.lot}: {relevant_reagents}") - if str(reagent.lot) not in relevant_reagents: - if check_not_nan(reagent.lot): - relevant_reagents.insert(0, str(reagent.lot)) - else: - # TODO: look up the last used reagent of this type in the database - looked_up_rt = lookup_reagenttype_kittype_association(ctx=ctx, reagent_type=reagent.type, kit_type=extraction_kit) - looked_up_reg = lookup_reagents(ctx=ctx, lot_number=looked_up_rt.last_used) - logger.debug(f"Because there was no reagent listed for {reagent}, we will insert the last lot used: {looked_up_reg}") - if looked_up_reg != None: - relevant_reagents.remove(str(looked_up_reg.lot)) - relevant_reagents.insert(0, str(looked_up_reg.lot)) - else: - if len(relevant_reagents) > 1: - logger.debug(f"Found {reagent.lot} in relevant reagents: {relevant_reagents}. Moving to front of list.") - idx = relevant_reagents.index(str(reagent.lot)) - logger.debug(f"The index we got for {reagent.lot} in {relevant_reagents} was {idx}") - moved_reag = relevant_reagents.pop(idx) - relevant_reagents.insert(0, moved_reag) - else: - logger.debug(f"Found {reagent.lot} in relevant reagents: {relevant_reagents}. But no need to move due to short list.") - logger.debug(f"New relevant reagents: {relevant_reagents}") - self.setObjectName(f"lot_{reagent.type}") - self.addItems(relevant_reagents) - class FirstStrandSalvage(QDialog): def __init__(self, ctx:Settings, submitter_id:str, rsl_plate_num:str|None=None) -> None: @@ -492,7 +414,8 @@ class FirstStrandPlateList(QDialog): self.buttonBox = QDialogButtonBox(QBtn) self.buttonBox.accepted.connect(self.accept) self.buttonBox.rejected.connect(self.reject) - ww = [item.rsl_plate_num for item in lookup_submissions(ctx=ctx, submission_type="Wastewater")] + # ww = [item.rsl_plate_num for item in lookup_submissions(ctx=ctx, submission_type="Wastewater")] + ww = [item.rsl_plate_num for item in BasicSubmission.query(submission_type="Wastewater")] self.plate1 = QComboBox() self.plate2 = QComboBox() self.plate3 = QComboBox() @@ -532,7 +455,8 @@ class ReagentFormWidget(QWidget): def parse_form(self) -> Tuple[PydReagent, dict]: lot = self.lot.currentText() - wanted_reagent = lookup_reagents(ctx=self.ctx, lot_number=lot, reagent_type=self.reagent.type) + # wanted_reagent = lookup_reagents(ctx=self.ctx, lot_number=lot, reagent_type=self.reagent.type) + wanted_reagent = Reagent.query(lot_number=lot, reagent_type=self.reagent.type) # if reagent doesn't exist in database, off to add it (uses App.add_reagent) if wanted_reagent == None: dlg = QuestionAsker(title=f"Add {lot}?", message=f"Couldn't find reagent type {self.reagent.type}: {lot} in the database.\n\nWould you like to add it?") @@ -546,10 +470,12 @@ class ReagentFormWidget(QWidget): else: # Since this now gets passed in directly from the parser -> pyd -> form and the parser gets the name # from the db, it should no longer be necessary to query the db with reagent/kit, but with rt name directly. - rt = lookup_reagent_types(ctx=self.ctx, name=self.reagent.type) + # rt = lookup_reagent_types(ctx=self.ctx, name=self.reagent.type) # rt = lookup_reagent_types(ctx=self.ctx, kit_type=self.extraction_kit, reagent=wanted_reagent) + rt = ReagentType.query(name=self.reagent.type) if rt == None: - rt = lookup_reagent_types(ctx=self.ctx, kit_type=self.extraction_kit, reagent=wanted_reagent) + # rt = lookup_reagent_types(ctx=self.ctx, kit_type=self.extraction_kit, reagent=wanted_reagent) + rt = ReagentType.query(kit_type=self.extraction_kit, reagent=wanted_reagent) return PydReagent(ctx=self.ctx, name=wanted_reagent.name, lot=wanted_reagent.lot, type=rt.name, expiry=wanted_reagent.expiry, parsed=not self.missing), None def updated(self): @@ -584,7 +510,8 @@ class ReagentFormWidget(QWidget): # pass logger.debug(f"Attempting lookup of reagents by type: {reagent.type}") # below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work. - lookup = lookup_reagents(ctx=self.ctx, reagent_type=reagent.type) + # lookup = lookup_reagents(ctx=self.ctx, reagent_type=reagent.type) + lookup = Reagent.query(reagent_type=reagent.type) relevant_reagents = [item.__str__() for item in lookup] output_reg = [] for rel_reagent in relevant_reagents: @@ -602,9 +529,11 @@ class ReagentFormWidget(QWidget): relevant_reagents.insert(0, str(reagent.lot)) else: # TODO: look up the last used reagent of this type in the database - looked_up_rt = lookup_reagenttype_kittype_association(ctx=self.ctx, reagent_type=reagent.type, kit_type=extraction_kit) + # looked_up_rt = lookup_reagenttype_kittype_association(ctx=self.ctx, reagent_type=reagent.type, kit_type=extraction_kit) + looked_up_rt = KitTypeReagentTypeAssociation.query(reagent_type=reagent.type, kit_type=extraction_kit) try: - looked_up_reg = lookup_reagents(ctx=self.ctx, lot_number=looked_up_rt.last_used) + # looked_up_reg = lookup_reagents(ctx=self.ctx, lot_number=looked_up_rt.last_used) + looked_up_reg = Reagent.query(lot_number=looked_up_rt.last_used) except AttributeError: looked_up_reg = None logger.debug(f"Because there was no reagent listed for {reagent.lot}, we will insert the last lot used: {looked_up_reg}") @@ -743,7 +672,8 @@ class SubmissionFormWidget(QWidget): case 'submitting_lab': add_widget = QComboBox() # lookup organizations suitable for submitting_lab (ctx: self.InfoItem.SubmissionFormWidget.SubmissionFormContainer.AddSubForm ) - labs = [item.__str__() for item in lookup_organizations(ctx=obj.ctx)] + # labs = [item.__str__() for item in lookup_organizations(ctx=obj.ctx)] + labs = [item.__str__() for item in Organization.query()] # try to set closest match to top of list try: labs = difflib.get_close_matches(value, labs, len(labs), 0) @@ -760,7 +690,8 @@ class SubmissionFormWidget(QWidget): add_widget = QComboBox() # lookup existing kits by 'submission_type' decided on by sheetparser logger.debug(f"Looking up kits used for {submission_type}") - uses = [item.__str__() for item in lookup_kit_types(ctx=obj.ctx, used_for=submission_type)] + # uses = [item.__str__() for item in lookup_kit_types(ctx=obj.ctx, used_for=submission_type)] + uses = [item.__str__() for item in KitType.query(used_for=submission_type)] obj.uses = uses logger.debug(f"Kits received for {submission_type}: {uses}") if check_not_nan(value): @@ -786,7 +717,8 @@ class SubmissionFormWidget(QWidget): case 'submission_category': add_widget = QComboBox() cats = ['Diagnostic', "Surveillance", "Research"] - cats += [item.name for item in lookup_submission_type(ctx=obj.ctx)] + # cats += [item.name for item in lookup_submission_type(ctx=obj.ctx)] + cats += [item.name for item in SubmissionType.query()] try: cats.insert(0, cats.pop(cats.index(value))) except ValueError: diff --git a/src/submissions/frontend/custom_widgets/pop_ups.py b/src/submissions/frontend/custom_widgets/pop_ups.py index 43492e1..7ad763c 100644 --- a/src/submissions/frontend/custom_widgets/pop_ups.py +++ b/src/submissions/frontend/custom_widgets/pop_ups.py @@ -8,6 +8,7 @@ from PyQt6.QtWidgets import ( from tools import jinja_template_loading import logging from backend.db.functions import lookup_kit_types, lookup_submission_type +from backend.db.models import KitType, SubmissionType from typing import Literal logger = logging.getLogger(f"submissions.{__name__}") @@ -53,7 +54,8 @@ class KitSelector(QDialog): super().__init__() self.setWindowTitle(title) self.widget = QComboBox() - kits = [item.__str__() for item in lookup_kit_types(ctx=ctx)] + # kits = [item.__str__() for item in lookup_kit_types(ctx=ctx)] + kits = [item.__str__() for item in KitType.query()] self.widget.addItems(kits) self.widget.setEditable(False) # set yes/no buttons @@ -80,7 +82,8 @@ class SubmissionTypeSelector(QDialog): super().__init__() self.setWindowTitle(title) self.widget = QComboBox() - sub_type = [item.name for item in lookup_submission_type(ctx=ctx)] + # sub_type = [item.name for item in lookup_submission_type(ctx=ctx)] + sub_type = [item.name for item in SubmissionType.query()] self.widget.addItems(sub_type) self.widget.setEditable(False) # set yes/no buttons diff --git a/src/submissions/frontend/custom_widgets/sub_details.py b/src/submissions/frontend/custom_widgets/sub_details.py index 32332f1..eea8990 100644 --- a/src/submissions/frontend/custom_widgets/sub_details.py +++ b/src/submissions/frontend/custom_widgets/sub_details.py @@ -15,7 +15,8 @@ from PyQt6.QtWidgets import ( from PyQt6.QtWebEngineWidgets import QWebEngineView from PyQt6.QtCore import Qt, QAbstractTableModel, QSortFilterProxyModel from PyQt6.QtGui import QAction, QCursor, QPixmap, QPainter -from backend.db.functions import submissions_to_df, delete_submission, lookup_submissions +from backend.db.functions import submissions_to_df +from backend.db.models import BasicSubmission from backend.excel import make_hitpicks from tools import check_if_app, Settings from tools import jinja_template_loading @@ -98,7 +99,7 @@ class SubmissionsSheet(QTableView): """ sets data in model """ - self.data = submissions_to_df(ctx=self.ctx) + self.data = submissions_to_df() try: self.data['id'] = self.data['id'].apply(str) self.data['id'] = self.data['id'].str.zfill(3) @@ -180,7 +181,8 @@ class SubmissionsSheet(QTableView): logger.debug(index) msg = QuestionAsker(title="Delete?", message=f"Are you sure you want to delete {index.sibling(index.row(),1).data()}?\n") if msg.exec(): - delete_submission(ctx=self.ctx, id=value) + # delete_submission(id=value) + BasicSubmission.query(id=value).delete() else: return self.setData() @@ -199,7 +201,8 @@ class SubmissionsSheet(QTableView): logger.error(f"Error: Had to truncate number of plates to 4.") indices = indices[:4] # lookup ids in the database - subs = [lookup_submissions(ctx=self.ctx, id=id) for id in indices] + # subs = [lookup_submissions(ctx=self.ctx, id=id) for id in indices] + subs = [BasicSubmission.query(id=id) for id in indices] # full list of samples dicto = [] # list to contain plate images @@ -256,7 +259,8 @@ class SubmissionDetails(QDialog): interior = QScrollArea() interior.setParent(self) # get submision from db - sub = lookup_submissions(ctx=ctx, id=id) + # sub = lookup_submissions(ctx=ctx, id=id) + sub = BasicSubmission.query(id=id) logger.debug(f"Submission details data:\n{pprint.pformat(sub.to_dict())}") self.base_dict = sub.to_dict(full_data=True) # don't want id @@ -427,7 +431,8 @@ class SubmissionComment(QDialog): full_comment = {"name":commenter, "time": dt, "text": comment} logger.debug(f"Full comment: {full_comment}") # sub = lookup_submission_by_rsl_num(ctx = self.ctx, rsl_num=self.rsl) - sub = lookup_submissions(ctx = self.ctx, rsl_number=self.rsl) + # sub = lookup_submissions(ctx = self.ctx, rsl_number=self.rsl) + sub = BasicSubmission.query(rsl_number=self.rsl) try: sub.comment.append(full_comment) except AttributeError: diff --git a/src/submissions/frontend/main_window_functions.py b/src/submissions/frontend/main_window_functions.py index 4029946..741fba1 100644 --- a/src/submissions/frontend/main_window_functions.py +++ b/src/submissions/frontend/main_window_functions.py @@ -281,7 +281,7 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: obj.pyd: PydSubmission = obj.form.parse_form() logger.debug(f"Submission: {pprint.pformat(obj.pyd)}") logger.debug("Checking kit integrity...") - kit_integrity = check_kit_integrity(ctx=obj.ctx, sub=obj.pyd) + kit_integrity = check_kit_integrity(sub=obj.pyd) if kit_integrity != None: return obj, dict(message=kit_integrity['message'], status="critical") base_submission, result = obj.pyd.toSQL() @@ -307,11 +307,11 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: pass # add reagents to submission object for reagent in base_submission.reagents: - update_last_used(ctx=obj.ctx, reagent=reagent, kit=base_submission.extraction_kit) + update_last_used(reagent=reagent, kit=base_submission.extraction_kit) logger.debug(f"Here is the final submission: {pprint.pformat(base_submission.__dict__)}") logger.debug(f"Parsed reagents: {pprint.pformat(base_submission.reagents)}") logger.debug(f"Sending submission: {base_submission.rsl_plate_num} to database.") - base_submission.save(ctx=obj.ctx) + base_submission.save() # update summary sheet obj.table_widget.sub_wid.setData() # reset form @@ -319,12 +319,12 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: logger.debug(f"All attributes of obj: {pprint.pformat(obj.__dict__)}") wkb = obj.pyd.autofill_excel() if wkb != None: - fname = select_save_file(obj=obj, default_name=obj.pyd.rsl_plate_num['value'], extension="xlsx") + fname = select_save_file(obj=obj, default_name=obj.pyd.construct_filename(), extension="xlsx") wkb.save(filename=fname.__str__()) if hasattr(obj.pyd, 'csv'): dlg = QuestionAsker("Export CSV?", "Would you like to export the csv file?") if dlg.exec(): - fname = select_save_file(obj, f"{base_submission.rsl_plate_num}.csv", extension="csv") + fname = select_save_file(obj, f"{obj.pyd.rsl_plate_num['value']}.csv", extension="csv") try: obj.csv.to_csv(fname.__str__(), index=False) except PermissionError: @@ -348,7 +348,8 @@ def generate_report_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: info = dlg.parse_form() logger.debug(f"Report info: {info}") # find submissions based on date range - subs = lookup_submissions(ctx=obj.ctx, start_date=info['start_date'], end_date=info['end_date']) + # subs = lookup_submissions(ctx=obj.ctx, start_date=info['start_date'], end_date=info['end_date']) + subs = BasicSubmission.query(start_date=info['start_date'], end_date=info['end_date']) # convert each object to dict records = [item.report_dict() for item in subs] # make dataframe from record dictionaries @@ -468,7 +469,7 @@ def controls_getter_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: obj.mode = obj.table_widget.mode_typer.currentText() obj.table_widget.sub_typer.clear() # lookup subtypes - sub_types = get_control_subtypes(ctx=obj.ctx, type=obj.con_type, mode=obj.mode) + sub_types = get_control_subtypes(type=obj.con_type, mode=obj.mode) # sub_types = lookup_controls(ctx=obj.ctx, control_type=obj.con_type) if sub_types != []: # block signal that will rerun controls getter and update sub_typer @@ -502,7 +503,8 @@ def chart_maker_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: logger.debug(f"Subtype: {obj.subtype}") # query all controls using the type/start and end dates from the gui # controls = get_all_controls_by_type(ctx=obj.ctx, con_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date) - controls = lookup_controls(ctx=obj.ctx, control_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date) + # controls = lookup_controls(ctx=obj.ctx, control_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date) + controls = Control.query(control_type=obj.con_type, start_date=obj.start_date, end_date=obj.end_date) # if no data found from query set fig to none for reporting in webview if controls == None: fig = None @@ -544,10 +546,12 @@ def link_controls_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: """ result = None # all_bcs = lookup_all_submissions_by_type(obj.ctx, "Bacterial Culture") - all_bcs = lookup_submissions(ctx=obj.ctx, submission_type="Bacterial Culture") + # all_bcs = lookup_submissions(ctx=obj.ctx, submission_type="Bacterial Culture") + all_bcs = BasicSubmission.query(submission_type="Bacterial Culture") logger.debug(all_bcs) # all_controls = get_all_controls(obj.ctx) - all_controls = lookup_controls(ctx=obj.ctx) + # all_controls = lookup_controls(ctx=obj.ctx) + all_controls = Control.query() ac_list = [control.name for control in all_controls] count = 0 for bcs in all_bcs: @@ -615,7 +619,8 @@ def link_extractions_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: new_run[f"column{str(ii-5)}_vol"] = run[ii] # Lookup imported submissions # sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num']) - sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num']) + # sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num']) + sub = BasicSubmission.query(rsl_number=new_run['rsl_plate_num']) # If no such submission exists, move onto the next run if sub == None: continue @@ -680,7 +685,8 @@ def link_pcr_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: ) # lookup imported submission # sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num']) - sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num']) + # sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num']) + sub = BasicSubmission.query(rsl_number=new_run['rsl_plate_num']) # if imported submission doesn't exist move on to next run if sub == None: continue @@ -734,7 +740,7 @@ def import_pcr_results_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: parser = PCRParser(ctx=obj.ctx, filepath=fname) logger.debug(f"Attempting lookup for {parser.plate_num}") # sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num) - sub = lookup_submissions(ctx=obj.ctx, rsl_number=parser.plate_num) + sub = BasicSubmission.query(rsl_number=parser.plate_num) try: logger.debug(f"Found submission: {sub.rsl_plate_num}") except AttributeError: @@ -742,7 +748,8 @@ def import_pcr_results_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: logger.error(f"Submission of number {parser.plate_num} not found. Attempting rescue of plate repeat.") parser.plate_num = "-".join(parser.plate_num.split("-")[:-1]) # sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num) - sub = lookup_submissions(ctx=obj.ctx, rsl_number=parser.plate_num) + # sub = lookup_submissions(ctx=obj.ctx, rsl_number=parser.plate_num) + sub = BasicSubmission.query(rsl_number=parser.plate_num) try: logger.debug(f"Found submission: {sub.rsl_plate_num}") except AttributeError: @@ -779,7 +786,7 @@ def import_pcr_results_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: sample_dict = [item for item in parser.samples if item['sample']==sample.rsl_number][0] except IndexError: continue - update_subsampassoc_with_pcr(ctx=obj.ctx, submission=sub, sample=sample, input_dict=sample_dict) + update_subsampassoc_with_pcr(submission=sub, sample=sample, input_dict=sample_dict) result = dict(message=f"We added PCR info to {sub.rsl_plate_num}.", status='information') return obj, result @@ -877,8 +884,6 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re fname = select_save_file(obj=obj, default_name=info['rsl_plate_num'], extension="xlsx") workbook.save(filename=fname.__str__()) - - def construct_first_strand_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: """ Generates a csv file from client submitted xlsx file. @@ -891,13 +896,16 @@ def construct_first_strand_function(obj:QMainWindow) -> Tuple[QMainWindow, dict] """ def get_plates(input_sample_number:str, plates:list) -> Tuple[int, str]: logger.debug(f"Looking up {input_sample_number} in {plates}") - samp = lookup_samples(ctx=obj.ctx, ww_processing_num=input_sample_number) + # samp = lookup_samples(ctx=obj.ctx, ww_processing_num=input_sample_number) + samp = BasicSample.query(ww_processing_num=input_sample_number) if samp == None: - samp = lookup_samples(ctx=obj.ctx, submitter_id=input_sample_number) + # samp = lookup_samples(ctx=obj.ctx, submitter_id=input_sample_number) + samp = BasicSample.query(submitter_id=input_sample_number) if samp == None: return None, None logger.debug(f"Got sample: {samp}") - new_plates = [(iii+1, lookup_submission_sample_association(ctx=obj.ctx, sample=samp, submission=plate)) for iii, plate in enumerate(plates)] + # new_plates = [(iii+1, lookup_submission_sample_association(ctx=obj.ctx, sample=samp, submission=plate)) for iii, plate in enumerate(plates)] + new_plates = [(iii+1, SubmissionSampleAssociation.query(sample=samp, submission=plate)) for iii, plate in enumerate(plates)] logger.debug(f"Associations: {pprint.pformat(new_plates)}") try: plate_num, plate = next(assoc for assoc in new_plates if assoc[1]) diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py index 874049e..09760b8 100644 --- a/src/submissions/tools/__init__.py +++ b/src/submissions/tools/__init__.py @@ -12,7 +12,7 @@ import sys, os, stat, platform, getpass import logging from logging import handlers from pathlib import Path -from sqlalchemy.orm import Session +from sqlalchemy.orm import Session, declarative_base, DeclarativeMeta, Query from sqlalchemy import create_engine from pydantic import field_validator from pydantic_settings import BaseSettings, SettingsConfigDict @@ -37,6 +37,9 @@ LOGDIR = main_aux_dir.joinpath("logs") row_map = {1:"A", 2:"B", 3:"C", 4:"D", 5:"E", 6:"F", 7:"G", 8:"H"} +Base: DeclarativeMeta = declarative_base() +metadata = Base.metadata + def check_not_nan(cell_contents) -> bool: """ Check to ensure excel sheet cell contents are not blank. @@ -160,12 +163,21 @@ class Settings(BaseSettings): model_config = SettingsConfigDict(env_file_encoding='utf-8') + @field_validator('backup_path') + @classmethod + def set_backup_path(cls, value): + if isinstance(value, str): + value = Path(value) + metadata.backup_path = value + return value + @field_validator('directory_path', mode="before") @classmethod def ensure_directory_exists(cls, value): if isinstance(value, str): value = Path(value) if value.exists(): + metadata.directory_path = value return value else: raise FileNotFoundError(f"Couldn't find settings file {value}") @@ -210,6 +222,8 @@ class Settings(BaseSettings): logger.debug(f"Using {database_path} for database file.") engine = create_engine(f"sqlite:///{database_path}")#, echo=True, future=True) session = Session(engine) + metadata.session = session + return session @field_validator('package', mode="before") @@ -287,43 +301,6 @@ def get_config(settings_path: Path|str|None=None) -> Settings: settings = yaml.load(stream, Loader=yaml.Loader) return Settings(**settings) -def create_database_session(database_path: Path|str|None=None) -> Session: - """ - Creates a session to sqlite3 database from path or default database if database_path is blank. - DEPRECIATED: THIS IS NOW HANDLED BY THE PYDANTIC SETTINGS OBJECT. - - Args: - database_path (Path | str | None, optional): path to sqlite database. Defaults to None. - - Returns: - Session: database session - """ - # convert string to path object - if isinstance(database_path, str): - database_path = Path(database_path) - # check if database path defined by user - if database_path == None: - # check in user's .submissions directory for submissions.db - if Path.home().joinpath(".submissions", "submissions.db").exists(): - database_path = Path.home().joinpath(".submissions", "submissions.db") - # finally, look in the local dir - else: - database_path = package_dir.joinpath("submissions.db") - else: - # check if user defined path is directory - if database_path.is_dir(): - database_path = database_path.joinpath("submissions.db") - # check if user defined path is a file - elif database_path.is_file(): - database_path = database_path - else: - logger.error("No database file found. Exiting program.") - sys.exit() - logger.debug(f"Using {database_path} for database file.") - engine = create_engine(f"sqlite:///{database_path}") - session = Session(engine) - return session - def setup_logger(verbosity:int=3): """ Set logger levels using settings. @@ -454,3 +431,33 @@ def convert_well_to_row_column(input_str:str) -> Tuple[int, int]: except IndexError: return None, None return row, column + +def query_return(query:Query, limit:int=0): + """ + Execute sqlalchemy query. + + Args: + query (Query): Query object + limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. + + Returns: + _type_: Query result. + """ + with query.session.no_autoflush: + match limit: + case 0: + return query.all() + case 1: + return query.first() + case _: + return query.limit(limit).all() + +def setup_lookup(func): + def wrapper(*args, **kwargs): + for k, v in locals().items(): + if k == "kwargs": + continue + if isinstance(v, dict): + raise ValueError("Cannot use dictionary in query. Make sure you parse it first.") + return func(*args, **kwargs) + return wrapper \ No newline at end of file