diff --git a/CHANGELOG.md b/CHANGELOG.md index 66b152b..c31fbda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## 202307.04 +- Large scale refactor to clean up code. +- Settings now in the form of a pydantic object. - Individual plate details now in html format. ## 202307.03 diff --git a/TODO.md b/TODO.md index f79239d..935195f 100644 --- a/TODO.md +++ b/TODO.md @@ -1,9 +1,11 @@ -- [ ] Check robotics' use of Artic submission forms (i.e. will we be able to make our own forms?) +- [ ] Fix tests. +- [ ] Reorganize wastewater artic parser. - [ ] Streamline addition of new kits by moving as much into DB as possible. +- [X] Large scale refactor (2023-07-24). - [x] Make plate details from html, same as export. - [x] Put in SN controls I guess. - [x] Code clean-up and refactor (2023-07). -- [ ] Migrate context settings to pydantic-settings model. +- [X] Migrate context settings to pydantic-settings model. - [x] Insert column into reagent type to indicate if reagent is required for kit. - Needed to keep interchangeable bead plates from being forced into forms. - [x] Migrate the parser.sub dictionary to pydantic models. diff --git a/src/submissions/__main__.py b/src/submissions/__main__.py index 7ece35c..8c5dea5 100644 --- a/src/submissions/__main__.py +++ b/src/submissions/__main__.py @@ -1,27 +1,17 @@ import sys -from pathlib import Path import os # environment variable must be set to enable qtwebengine in network path if getattr(sys, 'frozen', False): os.environ['QTWEBENGINE_DISABLE_SANDBOX'] = "1" -else : - pass -from configure import get_config, create_database_session, setup_logger +from tools import get_config, setup_logger # setup custom logger logger = setup_logger(verbosity=3) -# import config +# create settings object ctx = get_config(None) from PyQt6.QtWidgets import QApplication from frontend import App -import __init__ as package - -# create database session for use with gui session -ctx["database_session"] = create_database_session(Path(ctx['database'])) -# set package information from __init__ -ctx['package'] = package if __name__ == '__main__': - # app = QApplication(['', '--no-sandbox']) ex = App(ctx=ctx) sys.exit(app.exec()) diff --git a/src/submissions/backend/db/__init__.py b/src/submissions/backend/db/__init__.py index 11f467d..5c4e6d5 100644 --- a/src/submissions/backend/db/__init__.py +++ b/src/submissions/backend/db/__init__.py @@ -1,670 +1,4 @@ ''' All database related operations. ''' -from .functions import * - - -# from . import models -# from .models.kits import reagenttypes_kittypes -# from .models.submissions import reagents_submissions -# import pandas as pd -# import sqlalchemy.exc -# import sqlite3 -# import logging -# from datetime import date, datetime, timedelta -# from sqlalchemy import and_ -# import uuid -# # import base64 -# from sqlalchemy import JSON, event -# from sqlalchemy.engine import Engine -# import json -# # from dateutil.relativedelta import relativedelta -# from getpass import getuser -# import numpy as np -# from tools import check_not_nan, check_is_power_user -# import yaml -# from pathlib import Path - -# logger = logging.getLogger(f"submissions.{__name__}") - -# # The below _should_ allow automatic creation of foreign keys in the database -# @event.listens_for(Engine, "connect") -# def set_sqlite_pragma(dbapi_connection, connection_record): -# cursor = dbapi_connection.cursor() -# cursor.execute("PRAGMA foreign_keys=ON") -# cursor.close() - -# def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|dict: -# """ -# Upserts submissions into database - -# Args: -# ctx (dict): settings passed down from gui -# base_submission (models.BasicSubmission): submission to be add to db - -# Returns: -# None|dict : object that indicates issue raised for reporting in gui -# """ -# logger.debug(f"Hello from store_submission") -# # Add all samples to sample table -# for sample in base_submission.samples: -# sample.rsl_plate = base_submission -# logger.debug(f"Attempting to add sample: {sample.to_string()}") -# try: -# ctx['database_session'].add(sample) -# except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e: -# logger.debug(f"Hit an integrity error : {e}") -# continue -# # Add submission to submission table -# ctx['database_session'].add(base_submission) -# logger.debug(f"Attempting to add submission: {base_submission.rsl_plate_num}") -# try: -# ctx['database_session'].commit() -# except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e: -# logger.debug(f"Hit an integrity error : {e}") -# ctx['database_session'].rollback() -# return {"message":"This plate number already exists, so we can't add it."} -# except (sqlite3.OperationalError, sqlalchemy.exc.IntegrityError) as e: -# logger.debug(f"Hit an operational error: {e}") -# ctx['database_session'].rollback() -# return {"message":"The database is locked for editing."} -# return None - - -# def store_reagent(ctx:dict, reagent:models.Reagent) -> None|dict: -# """ -# Inserts a reagent into the database. - -# Args: -# ctx (dict): settings passed down from gui -# reagent (models.Reagent): Reagent object to be added to db - -# Returns: -# None|dict: object indicating issue to be reported in the gui -# """ -# logger.debug(f"Reagent dictionary: {reagent.__dict__}") -# ctx['database_session'].add(reagent) -# try: -# ctx['database_session'].commit() -# except (sqlite3.OperationalError, sqlalchemy.exc.OperationalError): -# return {"message":"The database is locked for editing."} -# return None - - -# def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmission: -# """ -# Construct submission object from dictionary - -# Args: -# ctx (dict): settings passed down from gui -# info_dict (dict): dictionary to be transformed - -# Returns: -# models.BasicSubmission: Constructed submission object -# """ -# # convert submission type into model name -# query = info_dict['submission_type'].replace(" ", "") -# # Ensure an rsl plate number exists for the plate -# if info_dict["rsl_plate_num"] == 'nan' or info_dict["rsl_plate_num"] == None or not check_not_nan(info_dict["rsl_plate_num"]): -# instance = None -# msg = "A proper RSL plate number is required." -# return instance, {'code': 2, 'message': "A proper RSL plate number is required."} -# # check database for existing object -# instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first() -# # get model based on submission type converted above -# logger.debug(f"Looking at models for submission type: {query}") -# model = getattr(models, query) -# logger.debug(f"We've got the model: {type(model)}") -# info_dict['submission_type'] = info_dict['submission_type'].replace(" ", "_").lower() -# # if query return nothing, ie doesn't already exist in db -# if instance == None: -# instance = model() -# logger.debug(f"Submission doesn't exist yet, creating new instance: {instance}") -# msg = None -# code = 0 -# else: -# code = 1 -# msg = "This submission already exists.\nWould you like to overwrite?" -# for item in info_dict: -# logger.debug(f"Setting {item} to {info_dict[item]}") -# # set fields based on keys in dictionary -# match item: -# case "extraction_kit": -# q_str = info_dict[item] -# logger.debug(f"Looking up kit {q_str}") -# try: -# field_value = lookup_kittype_by_name(ctx=ctx, name=q_str) -# except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e: -# logger.error(f"Hit an integrity error: {e}") -# logger.debug(f"Got {field_value} for kit {q_str}") -# case "submitting_lab": -# q_str = info_dict[item].replace(" ", "_").lower() -# logger.debug(f"Looking up organization: {q_str}") -# field_value = lookup_org_by_name(ctx=ctx, name=q_str) -# logger.debug(f"Got {field_value} for organization {q_str}") -# case "submitter_plate_num": -# # Because of unique constraint, there will be problems with -# # multiple submissions named 'None', so... -# logger.debug(f"Submitter plate id: {info_dict[item]}") -# if info_dict[item] == None or info_dict[item] == "None": -# logger.debug(f"Got None as a submitter plate number, inserting random string to preserve database unique constraint.") -# info_dict[item] = uuid.uuid4().hex.upper() -# field_value = info_dict[item] -# case _: -# field_value = info_dict[item] -# # insert into field -# try: -# setattr(instance, item, field_value) -# except AttributeError: -# logger.debug(f"Could not set attribute: {item} to {info_dict[item]}") -# continue -# # calculate cost of the run: immutable cost + mutable times number of columns -# # This is now attached to submission upon creation to preserve at-run costs incase of cost increase in the future. -# try: -# instance.run_cost = instance.extraction_kit.immutable_cost + (instance.extraction_kit.mutable_cost * ((instance.sample_count / 8)/12)) -# except (TypeError, AttributeError): -# logger.debug(f"Looks like that kit doesn't have cost breakdown yet, using full plate cost.") -# instance.run_cost = instance.extraction_kit.cost_per_run -# # We need to make sure there's a proper rsl plate number -# try: -# logger.debug(f"Constructed instance: {instance.to_string()}") -# except AttributeError as e: -# logger.debug(f"Something went wrong constructing instance {info_dict['rsl_plate_num']}: {e}") -# logger.debug(f"Constructed submissions message: {msg}") -# return instance, {'code':code, 'message':msg} - - -# def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent: -# """ -# Construct reagent object from dictionary - -# Args: -# ctx (dict): settings passed down from gui -# info_dict (dict): dictionary to be converted - -# Returns: -# models.Reagent: Constructed reagent object -# """ -# reagent = models.Reagent() -# for item in info_dict: -# logger.debug(f"Reagent info item: {item}") -# # set fields based on keys in dictionary -# match item: -# case "lot": -# reagent.lot = info_dict[item].upper() -# case "expiry": -# reagent.expiry = info_dict[item] -# case "type": -# reagent.type = lookup_reagenttype_by_name(ctx=ctx, rt_name=info_dict[item].replace(" ", "_").lower()) -# # add end-of-life extension from reagent type to expiry date -# # NOTE: this will now be done only in the reporting phase to account for potential changes in end-of-life extensions -# # try: -# # reagent.expiry = reagent.expiry + reagent.type.eol_ext -# # except TypeError as e: -# # logger.debug(f"We got a type error: {e}.") -# # except AttributeError: -# # pass -# return reagent - - -# def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent: -# """ -# Query db for reagent based on lot number - -# Args: -# ctx (dict): settings passed down from gui -# reagent_lot (str): lot number to query - -# Returns: -# models.Reagent: looked up reagent -# """ -# lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() -# return lookedup - - -# def get_all_reagenttype_names(ctx:dict) -> list[str]: -# """ -# Lookup all reagent types and get names - -# Args: -# ctx (dict): settings passed from gui - -# Returns: -# list[str]: reagent type names -# """ -# lookedup = [item.__str__() for item in ctx['database_session'].query(models.ReagentType).all()] -# return lookedup - - -# def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType: -# """ -# Lookup a single reagent type by name - -# Args: -# ctx (dict): settings passed from gui -# rt_name (str): reagent type name to look up - -# Returns: -# models.ReagentType: looked up reagent type -# """ -# logger.debug(f"Looking up ReagentType by name: {rt_name}") -# lookedup = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==rt_name).first() -# logger.debug(f"Found ReagentType: {lookedup}") -# return lookedup - - -# def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]: -# """ -# Lookup kits by a sample type its used for - -# Args: -# ctx (dict): settings passed from gui -# used_by (str): sample type (should be string in D3 of excel sheet) - -# Returns: -# list[models.KitType]: list of kittypes that have that sample type in their uses -# """ -# return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all() - - -# def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType: -# """ -# Lookup a kit type by name - -# Args: -# ctx (dict): settings passed from bui -# name (str): name of kit to query - -# Returns: -# models.KitType: retrieved kittype -# """ -# logger.debug(f"Querying kittype: {name}") -# return ctx['database_session'].query(models.KitType).filter(models.KitType.name==name).first() - - -# def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.Reagent]: -# """ -# Lookup reagents by their type name - -# Args: -# ctx (dict): settings passed from gui -# type_name (str): reagent type name - -# Returns: -# list[models.Reagent]: list of retrieved reagents -# """ -# return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all() - - -# def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:str) -> list[models.Reagent]: -# """ -# Lookup reagents by their type name and kits they belong to (Broken... maybe cursed, I'm not sure.) - -# Args: -# ctx (dict): settings pass by gui -# type_name (str): reagent type name -# kit_name (str): kit name - -# Returns: -# list[models.Reagent]: list of retrieved reagents -# """ -# # What I want to do is get the reagent type by name -# # Hang on, this is going to be a long one. -# # by_type = ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name.endswith(type_name)).all() -# rt_types = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name.endswith(type_name)) -# # add filter for kit name... -# try: -# check = not np.isnan(kit_name) -# except TypeError: -# check = True -# if check: -# kit_type = lookup_kittype_by_name(ctx=ctx, name=kit_name) -# logger.debug(f"reagenttypes: {[item.name for item in rt_types.all()]}, kit: {kit_type.name}") -# # add in lookup for related kit_id -# rt_types = rt_types.join(reagenttypes_kittypes).filter(reagenttypes_kittypes.c.kits_id==kit_type.id).first() -# else: -# rt_types = rt_types.first() -# output = rt_types.instances -# return output - - -# def lookup_all_submissions_by_type(ctx:dict, sub_type:str|None=None) -> list[models.BasicSubmission]: -# """ -# Get all submissions, filtering by type if given - -# Args: -# ctx (dict): settings pass from gui -# type (str | None, optional): submission type (should be string in D3 of excel sheet). Defaults to None. - -# Returns: -# list[models.BasicSubmission]: list of retrieved submissions -# """ -# if sub_type == None: -# subs = ctx['database_session'].query(models.BasicSubmission).all() -# else: -# subs = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submission_type==sub_type.lower().replace(" ", "_")).all() -# return subs - -# def lookup_all_orgs(ctx:dict) -> list[models.Organization]: -# """ -# Lookup all organizations (labs) - -# Args: -# ctx (dict): settings passed from gui - -# Returns: -# list[models.Organization]: list of retrieved organizations -# """ -# return ctx['database_session'].query(models.Organization).all() - -# def lookup_org_by_name(ctx:dict, name:str|None) -> models.Organization: -# """ -# Lookup organization (lab) by (startswith) name. - -# Args: -# ctx (dict): settings passed from gui -# name (str | None): name of organization - -# Returns: -# models.Organization: retrieved organization -# """ -# logger.debug(f"Querying organization: {name}") -# return ctx['database_session'].query(models.Organization).filter(models.Organization.name.startswith(name)).first() - -# def submissions_to_df(ctx:dict, sub_type:str|None=None) -> pd.DataFrame: -# """ -# Convert submissions looked up by type to dataframe - -# Args: -# ctx (dict): settings passed by gui -# type (str | None, optional): submission type (should be string in D3 of excel sheet) Defaults to None. - -# Returns: -# pd.DataFrame: dataframe constructed from retrieved submissions -# """ -# logger.debug(f"Type: {sub_type}") -# # use lookup function to create list of dicts -# subs = [item.to_dict() for item in lookup_all_submissions_by_type(ctx=ctx, sub_type=sub_type)] -# # make df from dicts (records) in list -# df = pd.DataFrame.from_records(subs) -# # Exclude sub information -# try: -# df = df.drop("controls", axis=1) -# except: -# logger.warning(f"Couldn't drop 'controls' column from submissionsheet df.") -# try: -# df = df.drop("ext_info", axis=1) -# except: -# logger.warning(f"Couldn't drop 'controls' column from submissionsheet df.") -# return df - - -# def lookup_submission_by_id(ctx:dict, id:int) -> models.BasicSubmission: -# """ -# Lookup submission by id number - -# Args: -# ctx (dict): settings passed from gui -# id (int): submission id number - -# Returns: -# models.BasicSubmission: retrieved submission -# """ -# return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first() - - -# def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_date:datetime.date) -> list[models.BasicSubmission]: -# """ -# Lookup submissions greater than start_date and less than end_date - -# Args: -# ctx (dict): settings passed from gui -# start_date (datetime.date): date to start looking -# end_date (datetime.date): date to end looking - -# Returns: -# list[models.BasicSubmission]: list of retrieved submissions -# """ -# # return ctx['database_session'].query(models.BasicSubmission).filter(and_(models.BasicSubmission.submitted_date > start_date, models.BasicSubmission.submitted_date < end_date)).all() -# start_date = start_date.strftime("%Y-%m-%d") -# end_date = end_date.strftime("%Y-%m-%d") -# return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submitted_date.between(start_date, end_date)).all() - - -# def get_all_Control_Types_names(ctx:dict) -> list[str]: -# """ -# Grabs all control type names from db. - -# Args: -# settings (dict): settings passed down from gui. - -# Returns: -# list: list of controltype names -# """ -# conTypes = ctx['database_session'].query(models.ControlType).all() -# conTypes = [conType.name for conType in conTypes] -# logger.debug(f"Control Types: {conTypes}") -# return conTypes - - -# def create_kit_from_yaml(ctx:dict, exp:dict) -> dict: -# """ -# Create and store a new kit in the database based on a .yml file -# TODO: split into create and store functions - -# Args: -# ctx (dict): Context dictionary passed down from frontend -# exp (dict): Experiment dictionary created from yaml file - -# Returns: -# dict: a dictionary containing results of db addition -# """ -# # Don't want just anyone adding kits -# if not check_is_power_user(ctx=ctx): -# logger.debug(f"{getuser()} does not have permission to add kits.") -# return {'code':1, 'message':"This user does not have permission to add kits.", "status":"warning"} -# # iterate through keys in dict -# for type in exp: -# if type == "password": -# continue -# # A submission type may use multiple kits. -# for kt in exp[type]['kits']: -# kit = models.KitType(name=kt, used_for=[type.replace("_", " ").title()], constant_cost=exp[type]["kits"][kt]["constant_cost"], mutable_cost=exp[type]["kits"][kt]["mutable_cost"]) -# # A kit contains multiple reagent types. -# for r in exp[type]['kits'][kt]['reagenttypes']: -# # check if reagent type already exists. -# look_up = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==r).first() -# if look_up == None: -# rt = models.ReagentType(name=r.replace(" ", "_").lower(), eol_ext=timedelta(30*exp[type]['kits'][kt]['reagenttypes'][r]['eol_ext']), kits=[kit]) -# else: -# rt = look_up -# rt.kits.append(kit) -# # add this because I think it's necessary to get proper back population -# kit.reagent_types_id.append(rt.id) -# ctx['database_session'].add(rt) -# logger.debug(f"Kit construction reagent type: {rt.__dict__}") -# logger.debug(f"Kit construction kit: {kit.__dict__}") -# ctx['database_session'].add(kit) -# ctx['database_session'].commit() -# return {'code':0, 'message':'Kit has been added', 'status': 'information'} - - -# def create_org_from_yaml(ctx:dict, org:dict) -> dict: -# """ -# Create and store a new organization based on a .yml file - -# Args: -# ctx (dict): Context dictionary passed down from frontend -# org (dict): Dictionary containing organization info. - -# Returns: -# dict: dictionary containing results of db addition -# """ -# # Don't want just anyone adding in clients -# if not check_is_power_user(ctx=ctx): -# logger.debug(f"{getuser()} does not have permission to add kits.") -# return {'code':1, 'message':"This user does not have permission to add organizations."} -# # the yml can contain multiple clients -# for client in org: -# cli_org = models.Organization(name=client.replace(" ", "_").lower(), cost_centre=org[client]['cost centre']) -# # a client can contain multiple contacts -# for contact in org[client]['contacts']: -# cont_name = list(contact.keys())[0] -# # check if contact already exists -# look_up = ctx['database_session'].query(models.Contact).filter(models.Contact.name==cont_name).first() -# if look_up == None: -# cli_cont = models.Contact(name=cont_name, phone=contact[cont_name]['phone'], email=contact[cont_name]['email'], organization=[cli_org]) -# else: -# cli_cont = look_up -# cli_cont.organization.append(cli_org) -# ctx['database_session'].add(cli_cont) -# logger.debug(f"Client creation contact: {cli_cont.__dict__}") -# logger.debug(f"Client creation client: {cli_org.__dict__}") -# ctx['database_session'].add(cli_org) -# ctx["database_session"].commit() -# return {"code":0, "message":"Organization has been added."} - - -# def lookup_all_sample_types(ctx:dict) -> list[str]: -# """ -# Lookup all sample types and get names - -# Args: -# ctx (dict): settings pass from gui - -# Returns: -# list[str]: list of sample type names -# """ -# uses = [item.used_for for item in ctx['database_session'].query(models.KitType).all()] -# # flattened list of lists -# uses = list(set([item for sublist in uses for item in sublist])) -# return uses - - -# def get_all_available_modes(ctx:dict) -> list[str]: -# """ -# Get types of analysis for controls - -# Args: -# ctx (dict): settings passed from gui - -# Returns: -# list[str]: list of analysis types -# """ -# # Only one control is necessary since they all share the same control types. -# rel = ctx['database_session'].query(models.Control).first() -# try: -# cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)] -# except AttributeError as e: -# logger.debug(f"Failed to get available modes from db: {e}") -# cols = [] -# return cols - - -# def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None, end_date:date|None=None) -> list[models.Control]: -# """ -# Returns a list of control objects that are instances of the input controltype. -# Between dates if supplied. - -# Args: -# ctx (dict): Settings passed down from gui -# con_type (str): Name of control type. -# start_date (date | None, optional): Start date of query. Defaults to None. -# end_date (date | None, optional): End date of query. Defaults to None. - -# Returns: -# list[models.Control]: list of control samples. -# """ -# logger.debug(f"Using dates: {start_date} to {end_date}") -# if start_date != None and end_date != None: -# start_date = start_date.strftime("%Y-%m-%d") -# end_date = end_date.strftime("%Y-%m-%d") -# output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).filter(models.Control.submitted_date.between(start_date, end_date)).all() -# else: -# output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).all() -# logger.debug(f"Returned controls between dates: {output}") -# return output - - -# def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]: -# """ -# Get subtypes for a control analysis mode - -# Args: -# ctx (dict): settings passed from gui -# type (str): control type name -# mode (str): analysis mode name - -# Returns: -# list[str]: list of subtype names -# """ -# # Only the first control of type is necessary since they all share subtypes -# try: -# outs = get_all_controls_by_type(ctx=ctx, con_type=type)[0] -# except TypeError: -# return [] -# # Get analysis mode data as dict -# jsoner = json.loads(getattr(outs, mode)) -# logger.debug(f"JSON out: {jsoner}") -# try: -# genera = list(jsoner.keys())[0] -# except IndexError: -# return [] -# subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item] -# return subtypes - - -# def get_all_controls(ctx:dict) -> list[models.Control]: -# """ -# Retrieve a list of all controls from the database - -# Args: -# ctx (dict): settings passed down from the gui. - -# Returns: -# list[models.Control]: list of all control objects -# """ -# return ctx['database_session'].query(models.Control).all() - - -# def lookup_submission_by_rsl_num(ctx:dict, rsl_num:str) -> models.BasicSubmission: -# """ -# Retrieve a submission from the database based on rsl plate number - -# Args: -# ctx (dict): settings passed down from gui -# rsl_num (str): rsl plate number - -# Returns: -# models.BasicSubmission: Submissions object retrieved from database -# """ -# return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num.startswith(rsl_num)).first() - - -# def lookup_submissions_using_reagent(ctx:dict, reagent:models.Reagent) -> list[models.BasicSubmission]: -# return ctx['database_session'].query(models.BasicSubmission).join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id).all() - - -# def delete_submission_by_id(ctx:dict, id:int) -> None: -# """ -# Deletes a submission and its associated samples from the database. - -# Args: -# ctx (dict): settings passed down from gui -# id (int): id of submission to be deleted. -# """ -# # In order to properly do this Im' going to have to delete all of the secondary table stuff as well. -# # Retrieve submission -# sub = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first() -# # Convert to dict for storing backup as a yml -# backup = sub.to_dict() -# try: -# with open(Path(ctx['backup_path']).joinpath(f"{sub.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f: -# yaml.dump(backup, f) -# except KeyError: -# pass -# sub.reagents = [] -# for sample in sub.samples: -# ctx['database_session'].delete(sample) -# ctx["database_session"].delete(sub) -# ctx["database_session"].commit() \ No newline at end of file +from .functions import * \ No newline at end of file diff --git a/src/submissions/backend/db/functions.py b/src/submissions/backend/db/functions.py index 5976b38..2e9ca01 100644 --- a/src/submissions/backend/db/functions.py +++ b/src/submissions/backend/db/functions.py @@ -3,8 +3,8 @@ Convenience functions for interacting with the database. ''' from . import models -from .models.kits import reagenttypes_kittypes -from .models.submissions import reagents_submissions +from .models.kits import reagenttypes_kittypes, KitType +from .models.submissions import reagents_submissions, BasicSubmission import pandas as pd import sqlalchemy.exc import sqlite3 @@ -18,6 +18,7 @@ from getpass import getuser import numpy as np import yaml from pathlib import Path +from tools import Settings @@ -31,12 +32,12 @@ def set_sqlite_pragma(dbapi_connection, connection_record): cursor.close() -def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|dict: +def store_submission(ctx:Settings, base_submission:models.BasicSubmission) -> None|dict: """ Upserts submissions into database Args: - ctx (dict): settings passed down from gui + ctx (Settings): settings object passed down from gui base_submission (models.BasicSubmission): submission to be add to db Returns: @@ -57,50 +58,57 @@ def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|d sample.artic_rsl_plate = base_submission logger.debug(f"Attempting to add sample: {sample.to_string()}") try: - ctx['database_session'].add(sample) + # ctx['database_session'].add(sample) + ctx.database_session.add(sample) except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e: logger.debug(f"Hit an integrity error : {e}") continue # Add submission to submission table - ctx['database_session'].add(base_submission) + # ctx['database_session'].add(base_submission) + ctx.database_session.add(base_submission) logger.debug(f"Attempting to add submission: {base_submission.rsl_plate_num}") try: - ctx['database_session'].commit() + # ctx['database_session'].commit() + ctx.database_session.commit() except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e: logger.debug(f"Hit an integrity error : {e}") - ctx['database_session'].rollback() + # ctx['database_session'].rollback() + ctx.database_session.rollback() return {"message":"This plate number already exists, so we can't add it.", "status":"Critical"} except (sqlite3.OperationalError, sqlalchemy.exc.IntegrityError) as e: logger.debug(f"Hit an operational error: {e}") - ctx['database_session'].rollback() + # ctx['database_session'].rollback() + ctx.database_session.rollback() return {"message":"The database is locked for editing.", "status":"Critical"} return None -def store_reagent(ctx:dict, reagent:models.Reagent) -> None|dict: +def store_reagent(ctx:Settings, reagent:models.Reagent) -> None|dict: """ Inserts a reagent into the database. Args: - ctx (dict): settings passed down from gui + ctx (Settings): settings object passed down from gui reagent (models.Reagent): Reagent object to be added to db Returns: None|dict: object indicating issue to be reported in the gui """ logger.debug(f"Reagent dictionary: {reagent.__dict__}") - ctx['database_session'].add(reagent) + # ctx['database_session'].add(reagent) + ctx.database_session.add(reagent) try: - ctx['database_session'].commit() + # ctx['database_session'].commit() + ctx.database_session.commit() except (sqlite3.OperationalError, sqlalchemy.exc.OperationalError): return {"message":"The database is locked for editing."} return None -def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmission: +def construct_submission_info(ctx:Settings, info_dict:dict) -> models.BasicSubmission: """ Construct submission object from dictionary Args: - ctx (dict): settings passed down from gui + ctx (Settings): settings object passed down from gui info_dict (dict): dictionary to be transformed Returns: @@ -118,7 +126,8 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio # enforce conventions on the rsl plate number from the form info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"]).parsed_name # check database for existing object - instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first() + # instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first() + instance = ctx.database_session.query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first() # get model based on submission type converted above logger.debug(f"Looking at models for submission type: {query}") model = getattr(models, query) @@ -201,12 +210,12 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio logger.debug(f"Constructed submissions message: {msg}") return instance, {'code':code, 'message':msg} -def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent: +def construct_reagent(ctx:Settings, info_dict:dict) -> models.Reagent: """ Construct reagent object from dictionary Args: - ctx (dict): settings passed down from gui + ctx (Settings): settings object passed down from gui info_dict (dict): dictionary to be converted Returns: @@ -233,84 +242,90 @@ def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent: # pass return reagent -def get_all_reagenttype_names(ctx:dict) -> list[str]: +def get_all_reagenttype_names(ctx:Settings) -> list[str]: """ Lookup all reagent types and get names Args: - ctx (dict): settings passed from gui + ctx (Settings): settings object passed from gui Returns: list[str]: reagent type names """ - lookedup = [item.__str__() for item in ctx['database_session'].query(models.ReagentType).all()] + # lookedup = [item.__str__() for item in ctx['database_session'].query(models.ReagentType).all()] + lookedup = [item.__str__() for item in ctx.database_session.query(models.ReagentType).all()] return lookedup -def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType: +def lookup_reagenttype_by_name(ctx:Settings, rt_name:str) -> models.ReagentType: """ Lookup a single reagent type by name Args: - ctx (dict): settings passed from gui + ctx (Settings): settings object passed from gui rt_name (str): reagent type name to look up Returns: models.ReagentType: looked up reagent type """ logger.debug(f"Looking up ReagentType by name: {rt_name}") - lookedup = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==rt_name).first() + # lookedup = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==rt_name).first() + lookedup = ctx.database_session.query(models.ReagentType).filter(models.ReagentType.name==rt_name).first() logger.debug(f"Found ReagentType: {lookedup}") return lookedup -def lookup_kittype_by_use(ctx:dict, used_by:str|None=None) -> list[models.KitType]: +def lookup_kittype_by_use(ctx:Settings, used_by:str|None=None) -> list[models.KitType]: """ Lookup kits by a sample type its used for Args: - ctx (dict): settings passed from gui + ctx (Settings): settings object from gui used_by (str): sample type (should be string in D3 of excel sheet) Returns: list[models.KitType]: list of kittypes that have that sample type in their uses """ if used_by != None: - return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all() + # return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all() + return ctx.database_session.query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all() else: - return ctx['database_session'].query(models.KitType).all() + # return ctx['database_session'].query(models.KitType).all() + return ctx.database_session.query(models.KitType).all() -def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType: +def lookup_kittype_by_name(ctx:Settings, name:str) -> models.KitType: """ Lookup a kit type by name Args: - ctx (dict): settings passed from bui + ctx (Settings): settings object passed from bui name (str): name of kit to query Returns: models.KitType: retrieved kittype """ logger.debug(f"Querying kittype: {name}") - return ctx['database_session'].query(models.KitType).filter(models.KitType.name==name).first() + # return ctx['database_session'].query(models.KitType).filter(models.KitType.name==name).first() + return ctx.database_session.query(models.KitType).filter(models.KitType.name==name).first() -def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.Reagent]: +def lookup_regent_by_type_name(ctx:Settings, type_name:str) -> list[models.Reagent]: """ Lookup reagents by their type name Args: - ctx (dict): settings passed from gui + ctx (Settings): settings object passed from gui type_name (str): reagent type name Returns: list[models.Reagent]: list of retrieved reagents """ - return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all() + # return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all() + return ctx.database_session.query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all() -def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:str) -> list[models.Reagent]: +def lookup_regent_by_type_name_and_kit_name(ctx:Settings, type_name:str, kit_name:str) -> list[models.Reagent]: """ Lookup reagents by their type name and kits they belong to (Broken... maybe cursed, I'm not sure.) Args: - ctx (dict): settings pass by gui + ctx (Settings): settings object pass by gui type_name (str): reagent type name kit_name (str): kit name @@ -320,7 +335,8 @@ def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:st # What I want to do is get the reagent type by name # Hang on, this is going to be a long one. # by_type = ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name.endswith(type_name)).all() - rt_types = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name.endswith(type_name)) + # rt_types = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name.endswith(type_name)) + rt_types = ctx.database_session.query(models.ReagentType).filter(models.ReagentType.name.endswith(type_name)) # add filter for kit name... try: check = not np.isnan(kit_name) @@ -336,55 +352,59 @@ def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:st output = rt_types.instances return output -def lookup_all_submissions_by_type(ctx:dict, sub_type:str|None=None) -> list[models.BasicSubmission]: +def lookup_all_submissions_by_type(ctx:Settings, sub_type:str|None=None) -> list[models.BasicSubmission]: """ Get all submissions, filtering by type if given Args: - ctx (dict): settings pass from gui + ctx (Settings): settings object pass from gui type (str | None, optional): submission type (should be string in D3 of excel sheet). Defaults to None. Returns: list[models.BasicSubmission]: list of retrieved submissions """ if sub_type == None: - subs = ctx['database_session'].query(models.BasicSubmission).all() + # subs = ctx['database_session'].query(models.BasicSubmission).all() + subs = ctx.database_session.query(models.BasicSubmission).all() else: - subs = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submission_type==sub_type.lower().replace(" ", "_")).all() + # subs = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submission_type==sub_type.lower().replace(" ", "_")).all() + subs = ctx.database_session.query(models.BasicSubmission).filter(models.BasicSubmission.submission_type==sub_type.lower().replace(" ", "_")).all() return subs -def lookup_all_orgs(ctx:dict) -> list[models.Organization]: +def lookup_all_orgs(ctx:Settings) -> list[models.Organization]: """ Lookup all organizations (labs) Args: - ctx (dict): settings passed from gui + ctx (Settings): settings object passed from gui Returns: list[models.Organization]: list of retrieved organizations """ - return ctx['database_session'].query(models.Organization).all() + # return ctx['database_session'].query(models.Organization).all() + return ctx.database_session.query(models.Organization).all() -def lookup_org_by_name(ctx:dict, name:str|None) -> models.Organization: +def lookup_org_by_name(ctx:Settings, name:str|None) -> models.Organization: """ Lookup organization (lab) by (startswith) name. Args: - ctx (dict): settings passed from gui + ctx (Settings): settings passed from gui name (str | None): name of organization Returns: models.Organization: retrieved organization """ logger.debug(f"Querying organization: {name}") - return ctx['database_session'].query(models.Organization).filter(models.Organization.name.startswith(name)).first() + # return ctx['database_session'].query(models.Organization).filter(models.Organization.name.startswith(name)).first() + return ctx.database_session.query(models.Organization).filter(models.Organization.name.startswith(name)).first() -def submissions_to_df(ctx:dict, sub_type:str|None=None) -> pd.DataFrame: +def submissions_to_df(ctx:Settings, sub_type:str|None=None) -> pd.DataFrame: """ Convert submissions looked up by type to dataframe Args: - ctx (dict): settings passed by gui + ctx (Settings): settings object passed by gui type (str | None, optional): submission type (should be string in D3 of excel sheet) Defaults to None. Returns: @@ -410,25 +430,26 @@ def submissions_to_df(ctx:dict, sub_type:str|None=None) -> pd.DataFrame: logger.warning(f"Couldn't drop 'pcr_info' column from submissionsheet df.") return df -def lookup_submission_by_id(ctx:dict, id:int) -> models.BasicSubmission: +def lookup_submission_by_id(ctx:Settings, id:int) -> models.BasicSubmission: """ Lookup submission by id number Args: - ctx (dict): settings passed from gui + ctx (Settings): settings object passed from gui id (int): submission id number Returns: models.BasicSubmission: retrieved submission """ - return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first() + # return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first() + return ctx.database_session.query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first() -def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_date:datetime.date) -> list[models.BasicSubmission]: +def lookup_submissions_by_date_range(ctx:Settings, start_date:datetime.date, end_date:datetime.date) -> list[models.BasicSubmission]: """ Lookup submissions greater than start_date and less than end_date Args: - ctx (dict): settings passed from gui + ctx (Settings): settings object passed from gui start_date (datetime.date): date to start looking end_date (datetime.date): date to end looking @@ -438,30 +459,32 @@ def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_dat # return ctx['database_session'].query(models.BasicSubmission).filter(and_(models.BasicSubmission.submitted_date > start_date, models.BasicSubmission.submitted_date < end_date)).all() start_date = start_date.strftime("%Y-%m-%d") end_date = end_date.strftime("%Y-%m-%d") - return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submitted_date.between(start_date, end_date)).all() + # return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submitted_date.between(start_date, end_date)).all() + return ctx.database_session.query(models.BasicSubmission).filter(models.BasicSubmission.submitted_date.between(start_date, end_date)).all() -def get_all_Control_Types_names(ctx:dict) -> list[str]: +def get_all_Control_Types_names(ctx:Settings) -> list[str]: """ Grabs all control type names from db. Args: - settings (dict): settings passed down from gui. + settings (Settings): settings object passed down from gui. Returns: list: list of controltype names """ - conTypes = ctx['database_session'].query(models.ControlType).all() + # conTypes = ctx['database_session'].query(models.ControlType).all() + conTypes = ctx.database_session.query(models.ControlType).all() conTypes = [conType.name for conType in conTypes] logger.debug(f"Control Types: {conTypes}") return conTypes -def create_kit_from_yaml(ctx:dict, exp:dict) -> dict: +def create_kit_from_yaml(ctx:Settings, exp:dict) -> dict: """ Create and store a new kit in the database based on a .yml file TODO: split into create and store functions Args: - ctx (dict): Context dictionary passed down from frontend + ctx (Settings): Context object passed down from frontend exp (dict): Experiment dictionary created from yaml file Returns: @@ -474,8 +497,8 @@ def create_kit_from_yaml(ctx:dict, exp:dict) -> dict: return {'code':1, 'message':"This user does not have permission to add kits.", "status":"warning"} # iterate through keys in dict for type in exp: - if type == "password": - continue + # if type == "password": + # continue # A submission type may use multiple kits. for kt in exp[type]['kits']: kit = models.KitType(name=kt, @@ -488,7 +511,8 @@ def create_kit_from_yaml(ctx:dict, exp:dict) -> dict: for r in exp[type]['kits'][kt]['reagenttypes']: # check if reagent type already exists. r = massage_common_reagents(r) - look_up = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==r).first() + # look_up = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==r).first() + look_up = ctx.database_session.query(models.ReagentType).filter(models.ReagentType.name==r).first() if look_up == None: rt = models.ReagentType(name=r.replace(" ", "_").lower(), eol_ext=timedelta(30*exp[type]['kits'][kt]['reagenttypes'][r]['eol_ext']), kits=[kit], required=1) else: @@ -500,19 +524,22 @@ def create_kit_from_yaml(ctx:dict, exp:dict) -> dict: except AttributeError as e: logger.error(f"Error appending reagent id to kit.reagent_types_id: {e}, creating new.") # kit.reagent_types_id = [rt.id] - ctx['database_session'].add(rt) + # ctx['database_session'].add(rt) + ctx.database_session.add(rt) logger.debug(f"Kit construction reagent type: {rt.__dict__}") logger.debug(f"Kit construction kit: {kit.__dict__}") - ctx['database_session'].add(kit) - ctx['database_session'].commit() + # ctx['database_session'].add(kit) + ctx.database_session.add(kit) + # ctx['database_session'].commit() + ctx.database_session.commit() return {'code':0, 'message':'Kit has been added', 'status': 'information'} -def create_org_from_yaml(ctx:dict, org:dict) -> dict: +def create_org_from_yaml(ctx:Settings, org:dict) -> dict: """ Create and store a new organization based on a .yml file Args: - ctx (dict): Context dictionary passed down from frontend + ctx (Settings): Context object passed down from frontend org (dict): Dictionary containing organization info. Returns: @@ -530,46 +557,52 @@ def create_org_from_yaml(ctx:dict, org:dict) -> dict: for contact in org[client]['contacts']: cont_name = list(contact.keys())[0] # check if contact already exists - look_up = ctx['database_session'].query(models.Contact).filter(models.Contact.name==cont_name).first() + # look_up = ctx['database_session'].query(models.Contact).filter(models.Contact.name==cont_name).first() + look_up = ctx.database_session.query(models.Contact).filter(models.Contact.name==cont_name).first() if look_up == None: cli_cont = models.Contact(name=cont_name, phone=contact[cont_name]['phone'], email=contact[cont_name]['email'], organization=[cli_org]) else: cli_cont = look_up cli_cont.organization.append(cli_org) - ctx['database_session'].add(cli_cont) + # ctx['database_session'].add(cli_cont) + ctx.database_session.add(cli_cont) logger.debug(f"Client creation contact: {cli_cont.__dict__}") logger.debug(f"Client creation client: {cli_org.__dict__}") - ctx['database_session'].add(cli_org) - ctx["database_session"].commit() + # ctx['database_session'].add(cli_org) + ctx.database_session.add(cli_org) + # ctx["database_session"].commit() + ctx.database_session.commit() return {"code":0, "message":"Organization has been added."} -def lookup_all_sample_types(ctx:dict) -> list[str]: +def lookup_all_sample_types(ctx:Settings) -> list[str]: """ Lookup all sample types and get names Args: - ctx (dict): settings pass from gui + ctx (Settings): settings object pass from gui Returns: list[str]: list of sample type names """ - uses = [item.used_for for item in ctx['database_session'].query(models.KitType).all()] + # uses = [item.used_for for item in ctx['database_session'].query(models.KitType).all()] + uses = [item.used_for for item in ctx.database_session.query(models.KitType).all()] # flattened list of lists uses = list(set([item for sublist in uses for item in sublist])) return uses -def get_all_available_modes(ctx:dict) -> list[str]: +def get_all_available_modes(ctx:Settings) -> list[str]: """ Get types of analysis for controls Args: - ctx (dict): settings passed from gui + ctx (Settings): settings object passed from gui Returns: list[str]: list of analysis types """ # Only one control is necessary since they all share the same control types. - rel = ctx['database_session'].query(models.Control).first() + # rel = ctx['database_session'].query(models.Control).first() + rel = ctx.database_session.query(models.Control).first() try: cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)] except AttributeError as e: @@ -577,13 +610,13 @@ def get_all_available_modes(ctx:dict) -> list[str]: cols = [] return cols -def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None, end_date:date|None=None) -> list[models.Control]: +def get_all_controls_by_type(ctx:Settings, con_type:str, start_date:date|None=None, end_date:date|None=None) -> list[models.Control]: """ Returns a list of control objects that are instances of the input controltype. Between dates if supplied. Args: - ctx (dict): Settings passed down from gui + ctx (Settings): Settings object passed down from gui con_type (str): Name of control type. start_date (date | None, optional): Start date of query. Defaults to None. end_date (date | None, optional): End date of query. Defaults to None. @@ -595,18 +628,19 @@ def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None, if start_date != None and end_date != None: start_date = start_date.strftime("%Y-%m-%d") end_date = end_date.strftime("%Y-%m-%d") - output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).filter(models.Control.submitted_date.between(start_date, end_date)).all() + # output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).filter(models.Control.submitted_date.between(start_date, end_date)).all() + output = ctx.database_session.query(models.Control).join(models.ControlType).filter_by(name=con_type).filter(models.Control.submitted_date.between(start_date, end_date)).all() else: - output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).all() + output = ctx.database_session.query(models.Control).join(models.ControlType).filter_by(name=con_type).all() logger.debug(f"Returned controls between dates: {[item.submitted_date for item in output]}") return output -def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]: +def get_control_subtypes(ctx:Settings, type:str, mode:str) -> list[str]: """ Get subtypes for a control analysis mode Args: - ctx (dict): settings passed from gui + ctx (Settings): settings object passed from gui type (str): control type name mode (str): analysis mode name @@ -628,7 +662,7 @@ def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]: subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item] return subtypes -def get_all_controls(ctx:dict) -> list[models.Control]: +def get_all_controls(ctx:Settings) -> list[models.Control]: """ Retrieve a list of all controls from the database @@ -638,109 +672,118 @@ def get_all_controls(ctx:dict) -> list[models.Control]: Returns: list[models.Control]: list of all control objects """ - return ctx['database_session'].query(models.Control).all() + return ctx.database_session.query(models.Control).all() -def lookup_submission_by_rsl_num(ctx:dict, rsl_num:str) -> models.BasicSubmission: +def lookup_submission_by_rsl_num(ctx:Settings, rsl_num:str) -> models.BasicSubmission: """ Retrieve a submission from the database based on rsl plate number Args: - ctx (dict): settings passed down from gui + ctx (Settings): settings object passed down from gui rsl_num (str): rsl plate number Returns: models.BasicSubmission: Submissions object retrieved from database """ - return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num.startswith(rsl_num)).first() + # return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num.startswith(rsl_num)).first() + return ctx.database_session.query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num.startswith(rsl_num)).first() -def lookup_submissions_using_reagent(ctx:dict, reagent:models.Reagent) -> list[models.BasicSubmission]: +def lookup_submissions_using_reagent(ctx:Settings, reagent:models.Reagent) -> list[models.BasicSubmission]: """ Retrieves each submission using a specified reagent. Args: - ctx (dict): settings passed down from gui + ctx (Settings): settings passed down from gui reagent (models.Reagent): reagent object in question Returns: list[models.BasicSubmission]: list of all submissions using specified reagent. """ - return ctx['database_session'].query(models.BasicSubmission).join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id).all() + # return ctx['database_session'].query(models.BasicSubmission).join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id).all() + return ctx.database_session.query(models.BasicSubmission).join(reagents_submissions).filter(reagents_submissions.c.reagent_id==reagent.id).all() -def delete_submission_by_id(ctx:dict, id:int) -> None: +def delete_submission_by_id(ctx:Settings, id:int) -> None: """ Deletes a submission and its associated samples from the database. Args: - ctx (dict): settings passed down from gui + ctx (Settings): settings object passed down from gui id (int): id of submission to be deleted. """ # In order to properly do this Im' going to have to delete all of the secondary table stuff as well. # Retrieve submission - sub = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first() + # sub = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first() + sub = ctx.database_session.query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first() # Convert to dict for storing backup as a yml backup = sub.to_dict() try: - with open(Path(ctx['backup_path']).joinpath(f"{sub.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f: + # with open(Path(ctx['backup_path']).joinpath(f"{sub.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f: + with open(Path(ctx.backup_path).joinpath(f"{sub.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f: yaml.dump(backup, f) except KeyError: pass sub.reagents = [] for sample in sub.samples: if sample.rsl_plate == sub: - ctx['database_session'].delete(sample) + # ctx['database_session'].delete(sample) + ctx.database_session.delete(sample) else: logger.warning(f"Not deleting sample {sample.ww_sample_full_id} because it belongs to another plate.") - ctx["database_session"].delete(sub) - ctx["database_session"].commit() + # ctx["database_session"].delete(sub) + # ctx["database_session"].commit() + ctx.database_session.delete(sub) + ctx.database_session.commit() -def lookup_ww_sample_by_rsl_sample_number(ctx:dict, rsl_number:str) -> models.WWSample: +def lookup_ww_sample_by_rsl_sample_number(ctx:Settings, rsl_number:str) -> models.WWSample: """ Retrieves wastewater sample from database by rsl sample number Args: - ctx (dict): settings passed down from gui + ctx (Settings): settings object passed down from gui rsl_number (str): sample number assigned by robotics lab Returns: models.WWSample: instance of wastewater sample """ - return ctx['database_session'].query(models.WWSample).filter(models.WWSample.rsl_number==rsl_number).first() + # return ctx['database_session'].query(models.WWSample).filter(models.WWSample.rsl_number==rsl_number).first() + return ctx.database_session.query(models.WWSample).filter(models.WWSample.rsl_number==rsl_number).first() -def lookup_ww_sample_by_ww_sample_num(ctx:dict, sample_number:str) -> models.WWSample: +def lookup_ww_sample_by_ww_sample_num(ctx:Settings, sample_number:str) -> models.WWSample: """ Retrieves wastewater sample from database by ww sample number Args: - ctx (dict): settings passed down from gui + ctx (Settings): settings object passed down from gui sample_number (str): sample number assigned by wastewater Returns: models.WWSample: instance of wastewater sample """ - return ctx['database_session'].query(models.WWSample).filter(models.WWSample.ww_sample_full_id==sample_number).first() + return ctx.database_session.query(models.WWSample).filter(models.WWSample.ww_sample_full_id==sample_number).first() -def lookup_ww_sample_by_sub_sample_rsl(ctx:dict, sample_rsl:str, plate_rsl:str) -> models.WWSample: +def lookup_ww_sample_by_sub_sample_rsl(ctx:Settings, sample_rsl:str, plate_rsl:str) -> models.WWSample: """ Retrieves a wastewater sample from the database by its rsl sample number and parent rsl plate number. This will likely replace simply looking up by the sample rsl above cine I need to control for repeats. Args: - ctx (dict): settings passed down from the gui + ctx (Settings): settings passed down from the gui sample_rsl (str): rsl number of the relevant sample plate_rsl (str): rsl number of the parent plate Returns: models.WWSample: Relevant wastewater object """ - return ctx['database_session'].query(models.WWSample).join(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==plate_rsl).filter(models.WWSample.rsl_number==sample_rsl).first() + # return ctx['database_session'].query(models.WWSample).join(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==plate_rsl).filter(models.WWSample.rsl_number==sample_rsl).first() + return ctx.database_session.query(models.WWSample).join(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==plate_rsl).filter(models.WWSample.rsl_number==sample_rsl).first() -def lookup_ww_sample_by_sub_sample_well(ctx:dict, sample_rsl:str, well_num:str, plate_rsl:str) -> models.WWSample: +def lookup_ww_sample_by_sub_sample_well(ctx:Settings, sample_rsl:str, well_num:str, plate_rsl:str) -> models.WWSample: """ Retrieves a wastewater sample from the database by its rsl sample number and parent rsl plate number. This will likely replace simply looking up by the sample rsl above cine I need to control for repeats. Args: - ctx (dict): settings passed down from the gui + ctx (Settings): settings object passed down from the gui sample_rsl (str): rsl number of the relevant sample well_num (str): well number of the relevant sample plate_rsl (str): rsl number of the parent plate @@ -748,17 +791,21 @@ def lookup_ww_sample_by_sub_sample_well(ctx:dict, sample_rsl:str, well_num:str, Returns: models.WWSample: Relevant wastewater object """ - return ctx['database_session'].query(models.WWSample).join(models.BasicSubmission) \ + # return ctx['database_session'].query(models.WWSample).join(models.BasicSubmission) \ + # .filter(models.BasicSubmission.rsl_plate_num==plate_rsl) \ + # .filter(models.WWSample.rsl_number==sample_rsl) \ + # .filter(models.WWSample.well_number==well_num).first() + return ctx.database_session.query(models.WWSample).join(models.BasicSubmission) \ .filter(models.BasicSubmission.rsl_plate_num==plate_rsl) \ .filter(models.WWSample.rsl_number==sample_rsl) \ .filter(models.WWSample.well_number==well_num).first() -def update_ww_sample(ctx:dict, sample_obj:dict): +def update_ww_sample(ctx:Settings, sample_obj:dict): """ Retrieves wastewater sample by rsl number (sample_obj['sample']) and updates values from constructed dictionary Args: - ctx (dict): settings passed down from gui + ctx (Settings): settings object passed down from gui sample_obj (dict): dictionary representing new values for database object """ # ww_samp = lookup_ww_sample_by_rsl_sample_number(ctx=ctx, rsl_number=sample_obj['sample']) @@ -779,11 +826,28 @@ def update_ww_sample(ctx:dict, sample_obj:dict): else: logger.error(f"Unable to find sample {sample_obj['sample']}") return - ctx['database_session'].add(ww_samp) - ctx["database_session"].commit() + # ctx['database_session'].add(ww_samp) + # ctx["database_session"].commit() + ctx.database_session.add(ww_samp) + ctx.database_session.commit() -def lookup_discounts_by_org_and_kit(ctx:dict, kit_id:int, lab_id:int): - return ctx['database_session'].query(models.Discount).join(models.KitType).join(models.Organization).filter(and_( +def lookup_discounts_by_org_and_kit(ctx:Settings, kit_id:int, lab_id:int) -> list: + """ + Find discounts for kit for specified client + + Args: + ctx (Settings): settings object passed down from gui + kit_id (int): Id number of desired kit + lab_id (int): Id number of desired client + + Returns: + list: list of Discount objects + """ + # return ctx['database_session'].query(models.Discount).join(models.KitType).join(models.Organization).filter(and_( + # models.KitType.id==kit_id, + # models.Organization.id==lab_id + # )).all() + return ctx.database_session.query(models.Discount).join(models.KitType).join(models.Organization).filter(and_( models.KitType.id==kit_id, models.Organization.id==lab_id )).all() @@ -861,12 +925,12 @@ def platemap_plate(submission:models.BasicSubmission) -> list: # image = make_plate_map(plate_dicto) return plate_dicto -def lookup_reagent(ctx:dict, reagent_lot:str, type_name:str|None=None) -> models.Reagent: +def lookup_reagent(ctx:Settings, reagent_lot:str, type_name:str|None=None) -> models.Reagent: """ Query db for reagent based on lot number, with optional reagent type to enforce Args: - ctx (dict): settings passed down from gui + ctx (Settings): settings passed down from gui reagent_lot (str): lot number to query type_name (str | None, optional): name of reagent type. Defaults to None. @@ -874,21 +938,67 @@ def lookup_reagent(ctx:dict, reagent_lot:str, type_name:str|None=None) -> models models.Reagent: looked up reagent """ if reagent_lot != None and type_name != None: - return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).filter(models.Reagent.lot==reagent_lot).first() + # return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).filter(models.Reagent.lot==reagent_lot).first() + return ctx.database_session.query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).filter(models.Reagent.lot==reagent_lot).first() elif type_name == None: - return ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() + # return ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() + return ctx.database_session.query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() -def lookup_last_used_reagenttype_lot(ctx:dict, type_name:str) -> models.Reagent: +def lookup_last_used_reagenttype_lot(ctx:Settings, type_name:str) -> models.Reagent: """ Look up the last used reagent of the reagent type Args: - ctx (dict): Settings passed down from gui + ctx (Settings): Settings object passed down from gui type_name (str): Name of reagent type Returns: models.Reagent: Reagent object with last used lot. """ - rt = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==type_name).first() + # rt = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==type_name).first() + rt = ctx.database_session.query(models.ReagentType).filter(models.ReagentType.name==type_name).first() logger.debug(f"Reagent type looked up for {type_name}: {rt.__str__()}") - return lookup_reagent(ctx=ctx, reagent_lot=rt.last_used, type_name=type_name) \ No newline at end of file + try: + return lookup_reagent(ctx=ctx, reagent_lot=rt.last_used, type_name=type_name) + except AttributeError: + return None + + +def check_kit_integrity(sub:BasicSubmission|KitType, reagenttypes:list|None=None) -> dict|None: + """ + Ensures all reagents expected in kit are listed in Submission + + Args: + sub (BasicSubmission | KitType): Object containing complete list of reagent types. + reagenttypes (list | None, optional): List to check against complete list. Defaults to None. + + Returns: + dict|None: Result object containing a message and any missing components. + """ + logger.debug(type(sub)) + # What type is sub? + match sub: + case BasicSubmission(): + # Get all required reagent types for this kit. + ext_kit_rtypes = [reagenttype.name for reagenttype in sub.extraction_kit.reagent_types if reagenttype.required == 1] + # Overwrite function parameter reagenttypes + try: + reagenttypes = [reagent.type.name for reagent in sub.reagents] + except AttributeError as e: + logger.error(f"Problem parsing reagents: {[f'{reagent.lot}, {reagent.type}' for reagent in sub.reagents]}") + case KitType(): + ext_kit_rtypes = [reagenttype.name for reagenttype in sub.reagent_types if reagenttype.required == 1] + logger.debug(f"Kit reagents: {ext_kit_rtypes}") + logger.debug(f"Submission reagents: {reagenttypes}") + # check if lists are equal + check = set(ext_kit_rtypes) == set(reagenttypes) + logger.debug(f"Checking if reagents match kit contents: {check}") + # what reagent types are in both lists? + missing = list(set(ext_kit_rtypes).difference(reagenttypes)) + logger.debug(f"Missing reagents types: {missing}") + # if lists are equal return no problem + if len(missing)==0: + result = None + else: + result = {'message' : f"The submission you are importing is missing some reagents expected by the kit.\n\nIt looks like you are missing: {[item.upper() for item in missing]}\n\nAlternatively, you may have set the wrong extraction kit.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", 'missing': missing} + return result \ No newline at end of file diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index 9591322..c5e4572 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -15,7 +15,7 @@ import re import numpy as np from datetime import date, datetime import uuid -from tools import check_not_nan, RSLNamer, massage_common_reagents, convert_nans_to_nones +from tools import check_not_nan, RSLNamer, massage_common_reagents, convert_nans_to_nones, Settings logger = logging.getLogger(f"submissions.{__name__}") @@ -23,10 +23,10 @@ class SheetParser(object): """ object to pull and contain data from excel file """ - def __init__(self, ctx:dict, filepath:Path|None = None): + def __init__(self, ctx:Settings, filepath:Path|None = None): """ Args: - ctx (dict): Settings passed down from gui + ctx (Settings): Settings object passed down from gui filepath (Path | None, optional): file path to excel sheet. Defaults to None. """ self.ctx = ctx @@ -59,15 +59,17 @@ class SheetParser(object): """ # Check metadata for category, return first category if self.xl.book.properties.category != None: + logger.debug("Using file properties to find type...") categories = [item.strip().title() for item in self.xl.book.properties.category.split(";")] return categories[0].replace(" ", "_") else: # This code is going to be depreciated once there is full adoption of the client sheets # with updated metadata... but how will it work for Artic? + logger.debug("Using excel map to find type...") try: - for type in self.ctx['submission_types']: + for type in self.ctx.submission_types: # This gets the *first* submission type that matches the sheet names in the workbook - if self.xl.sheet_names == self.ctx['submission_types'][type]['excel_map']: + if self.xl.sheet_names == self.ctx.submission_types[type]['excel_map']: return type.title() return "Unknown" except Exception as e: @@ -299,6 +301,8 @@ class SheetParser(object): return_list = [] for _, ii in df.iloc[1:,1:].iterrows(): for c in df.columns.to_list(): + if not check_not_nan(c): + continue logger.debug(f"Checking {ii.name}{c}") if check_not_nan(df.loc[ii.name, int(c)]) and df.loc[ii.name, int(c)] != "EMPTY": try: @@ -310,21 +314,23 @@ class SheetParser(object): continue logger.debug(f"massaged sample list for {self.sub['rsl_plate_num']}: {pprint.pprint(return_list)}") return return_list - submission_info = self.xl.parse("First Strand", dtype=object) - biomek_info = self.xl.parse("ArticV4 Biomek", dtype=object) - sub_reagent_range = submission_info.iloc[56:, 1:4].dropna(how='all') - biomek_reagent_range = biomek_info.iloc[60:, 0:3].dropna(how='all') + submission_info = self.xl.parse("cDNA", dtype=object) + biomek_info = self.xl.parse("ArticV4_1 Biomek", dtype=object) + # Reminder that the iloc uses row, column ordering + # sub_reagent_range = submission_info.iloc[56:, 1:4].dropna(how='all') + sub_reagent_range = submission_info.iloc[7:15, 5:9].dropna(how='all') + biomek_reagent_range = biomek_info.iloc[62:, 0:3].dropna(how='all') self.sub['submitter_plate_num'] = "" self.sub['rsl_plate_num'] = RSLNamer(ctx=self.ctx, instr=self.filepath.__str__()).parsed_name self.sub['submitted_date'] = biomek_info.iloc[1][1] self.sub['submitting_lab'] = "Enterics Wastewater Genomics" - self.sub['sample_count'] = submission_info.iloc[4][6] + self.sub['sample_count'] = submission_info.iloc[34][6] self.sub['extraction_kit'] = "ArticV4.1" self.sub['technician'] = f"MM: {biomek_info.iloc[2][1]}, Bio: {biomek_info.iloc[3][1]}" self.sub['reagents'] = [] parse_reagents(sub_reagent_range) parse_reagents(biomek_reagent_range) - samples = massage_samples(biomek_info.iloc[22:31, 0:]) + samples = massage_samples(biomek_info.iloc[25:33, 0:]) sample_parser = SampleParser(self.ctx, pd.DataFrame.from_records(samples)) sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type']['value'].lower()}_samples") self.sample_result, self.sub['samples'] = sample_parse() @@ -347,12 +353,12 @@ class SampleParser(object): object to pull data for samples in excel sheet and construct individual sample objects """ - def __init__(self, ctx:dict, df:pd.DataFrame, elution_map:pd.DataFrame|None=None) -> None: + def __init__(self, ctx:Settings, df:pd.DataFrame, elution_map:pd.DataFrame|None=None) -> None: """ convert sample sub-dataframe to dictionary of records Args: - ctx (dict): setting passed down from gui + ctx (Settings): settings object passed down from gui df (pd.DataFrame): input sample dataframe elution_map (pd.DataFrame | None, optional): optional map of elution plate. Defaults to None. """ @@ -460,7 +466,7 @@ class SampleParser(object): new_list = [] missed_samples = [] for sample in self.samples: - with self.ctx['database_session'].no_autoflush: + with self.ctx.database_session.no_autoflush: instance = lookup_ww_sample_by_ww_sample_num(ctx=self.ctx, sample_number=sample['sample_name']) logger.debug(f"Checking: {sample['sample_name']}") if instance == None: diff --git a/src/submissions/backend/excel/reports.py b/src/submissions/backend/excel/reports.py index af2ca4b..6a921e9 100644 --- a/src/submissions/backend/excel/reports.py +++ b/src/submissions/backend/excel/reports.py @@ -6,7 +6,7 @@ import logging from datetime import date, timedelta import re from typing import Tuple -from configure import jinja_template_loading +from tools import jinja_template_loading logger = logging.getLogger(f"submissions.{__name__}") diff --git a/src/submissions/backend/pydant/__init__.py b/src/submissions/backend/pydant/__init__.py index b91f0fe..6ec986b 100644 --- a/src/submissions/backend/pydant/__init__.py +++ b/src/submissions/backend/pydant/__init__.py @@ -6,7 +6,7 @@ from tools import RSLNamer from pathlib import Path import re import logging -from tools import check_not_nan, convert_nans_to_nones +from tools import check_not_nan, convert_nans_to_nones, Settings import numpy as np @@ -45,7 +45,7 @@ class PydReagent(BaseModel): class PydSubmission(BaseModel, extra=Extra.allow): - ctx: dict + ctx: Settings filepath: Path submission_type: str|dict|None submitter_plate_num: str|None @@ -62,6 +62,8 @@ class PydSubmission(BaseModel, extra=Extra.allow): @field_validator("submitted_date", mode="before") @classmethod def strip_datetime_string(cls, value): + if not check_not_nan(value): + value = date.today() if isinstance(value, datetime): return value if isinstance(value, date): diff --git a/src/submissions/configure/__init__.py b/src/submissions/configure/__init__.py deleted file mode 100644 index 6ca5051..0000000 --- a/src/submissions/configure/__init__.py +++ /dev/null @@ -1,260 +0,0 @@ -''' -Contains functions for setting up program from config.yml and database. -''' -from jinja2 import Environment, FileSystemLoader -import yaml -import sys, os, stat, platform, getpass -import logging -from logging import handlers -from pathlib import Path -from sqlalchemy.orm import Session -from sqlalchemy import create_engine -from tools import check_if_app - - -logger = logging.getLogger(f"submissions.{__name__}") - -package_dir = Path(__file__).parents[2].resolve() -logger.debug(f"Package dir: {package_dir}") - -if platform.system() == "Windows": - os_config_dir = "AppData/local" - print(f"Got platform Windows, config_dir: {os_config_dir}") -else: - os_config_dir = ".config" - print(f"Got platform other, config_dir: {os_config_dir}") - - -main_aux_dir = Path.home().joinpath(f"{os_config_dir}/submissions") - -CONFIGDIR = main_aux_dir.joinpath("config") -LOGDIR = main_aux_dir.joinpath("logs") - -class GroupWriteRotatingFileHandler(handlers.RotatingFileHandler): - - def doRollover(self): - """ - Override base class method to make the new log file group writable. - """ - # Rotate the file first. - handlers.RotatingFileHandler.doRollover(self) - # Add group write to the current permissions. - currMode = os.stat(self.baseFilename).st_mode - os.chmod(self.baseFilename, currMode | stat.S_IWGRP) - - def _open(self): - prevumask=os.umask(0o002) - rtv=handlers.RotatingFileHandler._open(self) - os.umask(prevumask) - return rtv - - -class StreamToLogger(object): - """ - Fake file-like stream object that redirects writes to a logger instance. - """ - - def __init__(self, logger, log_level=logging.INFO): - self.logger = logger - self.log_level = log_level - self.linebuf = '' - - def write(self, buf): - for line in buf.rstrip().splitlines(): - self.logger.log(self.log_level, line.rstrip()) - - -def get_config(settings_path: Path|str|None=None) -> dict: - """ - Get configuration settings from path or default if blank. - - Args: - settings_path (Path | str | None, optional): Path to config.yml Defaults to None. - - Returns: - dict: Dictionary of settings. - """ - if isinstance(settings_path, str): - settings_path = Path(settings_path) - # custom pyyaml constructor to join fields - def join(loader, node): - seq = loader.construct_sequence(node) - return ''.join([str(i) for i in seq]) - # register the tag handler - yaml.add_constructor('!join', join) - logger.debug(f"Making directory: {CONFIGDIR.__str__()}") - # make directories - try: - CONFIGDIR.mkdir(parents=True) - except FileExistsError: - pass - logger.debug(f"Making directory: {LOGDIR.__str__()}") - try: - LOGDIR.mkdir(parents=True) - except FileExistsError: - pass - # if user hasn't defined config path in cli args - copy_settings_trigger = False - if settings_path == None: - # Check user .config/submissions directory - if CONFIGDIR.joinpath("config.yml").exists(): - settings_path = CONFIGDIR.joinpath("config.yml") - # Check user .submissions directory - elif Path.home().joinpath(".submissions", "config.yml").exists(): - settings_path = Path.home().joinpath(".submissions", "config.yml") - # finally look in the local config - else: - # if getattr(sys, 'frozen', False): - if check_if_app(): - settings_path = Path(sys._MEIPASS).joinpath("files", "config.yml") - else: - settings_path = package_dir.joinpath('config.yml') - # Tell program we need to copy the config.yml to the user directory - copy_settings_trigger = True - else: - # check if user defined path is directory - if settings_path.is_dir(): - settings_path = settings_path.joinpath("config.yml") - # check if user defined path is file - elif settings_path.is_file(): - settings_path = settings_path - else: - logger.error("No config.yml file found. Using empty dictionary.") - return {} - logger.debug(f"Using {settings_path} for config file.") - with open(settings_path, "r") as stream: - try: - settings = yaml.load(stream, Loader=yaml.Loader) - except yaml.YAMLError as exc: - logger.error(f'Error reading yaml file {settings_path}: {exc}') - return {} - # copy settings to config directory - if copy_settings_trigger: - settings = copy_settings(settings_path=CONFIGDIR.joinpath("config.yml"), settings=settings) - return settings - - -def create_database_session(database_path: Path|str|None=None) -> Session: - """ - Creates a session to sqlite3 database from path or default database if database_path is blank. - - Args: - database_path (Path | str | None, optional): path to sqlite database. Defaults to None. - - Returns: - Session: database session - """ - # convert string to path object - if isinstance(database_path, str): - database_path = Path(database_path) - # check if database path defined by user - if database_path == None: - # check in user's .submissions directory for submissions.db - if Path.home().joinpath(".submissions", "submissions.db").exists(): - database_path = Path.home().joinpath(".submissions", "submissions.db") - # finally, look in the local dir - else: - database_path = package_dir.joinpath("submissions.db") - else: - # check if user defined path is directory - if database_path.is_dir(): - database_path = database_path.joinpath("submissions.db") - # check if user defined path is a file - elif database_path.is_file(): - database_path = database_path - else: - logger.error("No database file found. Exiting program.") - sys.exit() - logger.debug(f"Using {database_path} for database file.") - engine = create_engine(f"sqlite:///{database_path}") - session = Session(engine) - return session - - -def setup_logger(verbosity:int=3): - """ - Set logger levels using settings. - - Args: - verbosit (int, optional): Level of verbosity desired 3 is highest. Defaults to 3. - - Returns: - logger: logger object - """ - logger = logging.getLogger("submissions") - logger.setLevel(logging.DEBUG) - # create file handler which logs even debug messages - try: - Path(LOGDIR).mkdir(parents=True) - except FileExistsError: - pass - fh = GroupWriteRotatingFileHandler(LOGDIR.joinpath('submissions.log'), mode='a', maxBytes=100000, backupCount=3, encoding=None, delay=False) - # file logging will always be debug - fh.setLevel(logging.DEBUG) - fh.name = "File" - # create console handler with a higher log level - # create custom logger with STERR -> log - ch = logging.StreamHandler(stream=sys.stdout) - # set looging level based on verbosity - match verbosity: - case 3: - ch.setLevel(logging.DEBUG) - case 2: - ch.setLevel(logging.INFO) - case 1: - ch.setLevel(logging.WARNING) - ch.name = "Stream" - # create formatter and add it to the handlers - formatter = logging.Formatter('%(asctime)s - %(levelname)s - {%(pathname)s:%(lineno)d} - %(message)s') - fh.setFormatter(formatter) - ch.setFormatter(formatter) - # add the handlers to the logger - logger.addHandler(fh) - logger.addHandler(ch) - # Output exception and traceback to logger - def handle_exception(exc_type, exc_value, exc_traceback): - if issubclass(exc_type, KeyboardInterrupt): - sys.__excepthook__(exc_type, exc_value, exc_traceback) - return - logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback)) - sys.excepthook = handle_exception - return logger - -def copy_settings(settings_path:Path, settings:dict) -> dict: - """ - copies relevant settings dictionary from the default config.yml to a new directory - - Args: - settings_path (Path): path to write the file to - settings (dict): settings dictionary obtained from default config.yml - - Returns: - dict: output dictionary for use in first run - """ - # if the current user is not a superuser remove the superusers entry - if not getpass.getuser() in settings['super_users']: - del settings['super_users'] - if not getpass.getuser() in settings['power_users']: - del settings['power_users'] - with open(settings_path, 'w') as f: - yaml.dump(settings, f) - return settings - - -def jinja_template_loading(): - """ - Returns jinja2 template environment. - - Returns: - _type_: _description_ - """ - # determine if pyinstaller launcher is being used - if getattr(sys, 'frozen', False): - loader_path = Path(sys._MEIPASS).joinpath("files", "templates") - else: - loader_path = Path(__file__).parents[1].joinpath('templates').absolute().__str__() - - # jinja template loading - loader = FileSystemLoader(loader_path) - env = Environment(loader=loader) - return env \ No newline at end of file diff --git a/src/submissions/frontend/__init__.py b/src/submissions/frontend/__init__.py index d7ba45f..e42372b 100644 --- a/src/submissions/frontend/__init__.py +++ b/src/submissions/frontend/__init__.py @@ -31,7 +31,8 @@ class App(QMainWindow): self.ctx = ctx # indicate version and connected database in title bar try: - self.title = f"Submissions App (v{ctx['package'].__version__}) - {ctx['database']}" + # self.title = f"Submissions App (v{ctx['package'].__version__}) - {ctx['database']}" + self.title = f"Submissions App (v{ctx.package.__version__}) - {ctx.database_path}" except (AttributeError, KeyError): self.title = f"Submissions App" # set initial app position and size diff --git a/src/submissions/frontend/all_window_functions.py b/src/submissions/frontend/all_window_functions.py index 59a9149..17fc1aa 100644 --- a/src/submissions/frontend/all_window_functions.py +++ b/src/submissions/frontend/all_window_functions.py @@ -1,3 +1,6 @@ +''' +functions used by all windows in the application's frontend +''' from pathlib import Path import logging from PyQt6.QtWidgets import ( @@ -19,7 +22,8 @@ def select_open_file(obj:QMainWindow, file_extension:str) -> Path: Returns: Path: Path of file to be opened """ - home_dir = str(Path(obj.ctx["directory_path"])) + # home_dir = str(Path(obj.ctx["directory_path"])) + home_dir = str(Path(obj.ctx.directory_path)) fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = f"{file_extension}(*.{file_extension})")[0]) return fname @@ -36,7 +40,8 @@ def select_save_file(obj:QMainWindow, default_name:str, extension:str) -> Path: Path: Path of file to be opened """ try: - home_dir = Path(obj.ctx["directory_path"]).joinpath(default_name).resolve().__str__() + # home_dir = Path(obj.ctx["directory_path"]).joinpath(default_name).resolve().__str__() + home_dir = Path(obj.ctx.directory_path).joinpath(default_name).resolve().__str__() except FileNotFoundError: home_dir = Path.home().resolve().__str__() fname = Path(QFileDialog.getSaveFileName(obj, "Save File", home_dir, filter = f"{extension}(*.{extension})")[0]) diff --git a/src/submissions/frontend/custom_widgets/misc.py b/src/submissions/frontend/custom_widgets/misc.py index cea6275..ca5cf8f 100644 --- a/src/submissions/frontend/custom_widgets/misc.py +++ b/src/submissions/frontend/custom_widgets/misc.py @@ -14,7 +14,7 @@ from tools import check_not_nan from ..all_window_functions import extract_form_info from backend.db import get_all_reagenttype_names, lookup_all_sample_types, create_kit_from_yaml, \ lookup_regent_by_type_name, lookup_last_used_reagenttype_lot -from configure import jinja_template_loading +from tools import jinja_template_loading import logging import numpy as np from .pop_ups import AlertPop @@ -300,55 +300,3 @@ class ImportReagent(QComboBox): self.setObjectName(f"lot_{reagent.type}") self.addItems(relevant_reagents) - -# class ImportReagent(QComboBox): - -# def __init__(self, ctx:dict, reagent:dict): -# super().__init__() -# self.setEditable(True) -# # Ensure that all reagenttypes have a name that matches the items in the excel parser -# query_var = reagent['type'] -# logger.debug(f"Import Reagent is looking at: {reagent['lot']} for {reagent['type']}") -# if isinstance(reagent['lot'], np.float64): -# logger.debug(f"{reagent['lot']} is a numpy float!") -# try: -# reagent['lot'] = int(reagent['lot']) -# except ValueError: -# pass -# # query for reagents using type name from sheet and kit from sheet -# logger.debug(f"Attempting lookup of reagents by type: {query_var}") -# # below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work. -# relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)] -# output_reg = [] -# for rel_reagent in relevant_reagents: -# # extract strings from any sets. -# if isinstance(rel_reagent, set): -# for thing in rel_reagent: -# output_reg.append(thing) -# elif isinstance(rel_reagent, str): -# output_reg.append(rel_reagent) -# relevant_reagents = output_reg -# # if reagent in sheet is not found insert it into the front of relevant reagents so it shows -# logger.debug(f"Relevant reagents for {reagent['lot']}: {relevant_reagents}") -# if str(reagent['lot']) not in relevant_reagents: -# if check_not_nan(reagent['lot']): -# relevant_reagents.insert(0, str(reagent['lot'])) -# else: -# # TODO: look up the last used reagent of this type in the database -# looked_up_reg = lookup_last_used_reagenttype_lot(ctx=ctx, type_name=reagent['type']) -# logger.debug(f"Because there was no reagent listed for {reagent}, we will insert the last lot used: {looked_up_reg}") -# if looked_up_reg != None: -# relevant_reagents.remove(str(looked_up_reg.lot)) -# relevant_reagents.insert(0, str(looked_up_reg.lot)) -# else: -# if len(relevant_reagents) > 1: -# logger.debug(f"Found {reagent['lot']} in relevant reagents: {relevant_reagents}. Moving to front of list.") -# idx = relevant_reagents.index(str(reagent['lot'])) -# logger.debug(f"The index we got for {reagent['lot']} in {relevant_reagents} was {idx}") -# moved_reag = relevant_reagents.pop(idx) -# relevant_reagents.insert(0, moved_reag) -# else: -# logger.debug(f"Found {reagent['lot']} in relevant reagents: {relevant_reagents}. But no need to move due to short list.") -# logger.debug(f"New relevant reagents: {relevant_reagents}") -# self.setObjectName(f"lot_{reagent['type']}") -# self.addItems(relevant_reagents) \ No newline at end of file diff --git a/src/submissions/frontend/custom_widgets/pop_ups.py b/src/submissions/frontend/custom_widgets/pop_ups.py index 74d6a1f..001e4df 100644 --- a/src/submissions/frontend/custom_widgets/pop_ups.py +++ b/src/submissions/frontend/custom_widgets/pop_ups.py @@ -5,7 +5,7 @@ from PyQt6.QtWidgets import ( QLabel, QVBoxLayout, QDialog, QDialogButtonBox, QMessageBox, QComboBox ) -from configure import jinja_template_loading +from tools import jinja_template_loading import logging from backend.db.functions import lookup_kittype_by_use diff --git a/src/submissions/frontend/custom_widgets/sub_details.py b/src/submissions/frontend/custom_widgets/sub_details.py index 1a4d337..a56e107 100644 --- a/src/submissions/frontend/custom_widgets/sub_details.py +++ b/src/submissions/frontend/custom_widgets/sub_details.py @@ -16,7 +16,7 @@ from PyQt6.QtCore import Qt, QAbstractTableModel, QSortFilterProxyModel from PyQt6.QtGui import QAction, QCursor, QPixmap, QPainter from backend.db import submissions_to_df, lookup_submission_by_id, delete_submission_by_id, lookup_submission_by_rsl_num, hitpick_plate from backend.excel import make_hitpicks -from configure import jinja_template_loading +from tools import jinja_template_loading from xhtml2pdf import pisa from pathlib import Path import logging diff --git a/src/submissions/frontend/main_window_functions.py b/src/submissions/frontend/main_window_functions.py index 3f41216..312e0c4 100644 --- a/src/submissions/frontend/main_window_functions.py +++ b/src/submissions/frontend/main_window_functions.py @@ -5,7 +5,6 @@ from datetime import date import difflib from getpass import getuser import inspect -from pathlib import Path import pprint import yaml import json @@ -16,7 +15,7 @@ import pandas as pd from backend.db.models import * import logging from PyQt6.QtWidgets import ( - QMainWindow, QLabel, QWidget, QPushButton, QFileDialog, + QMainWindow, QLabel, QWidget, QPushButton, QLineEdit, QComboBox, QDateEdit ) from .all_window_functions import extract_form_info, select_open_file, select_save_file @@ -25,12 +24,13 @@ from backend.db.functions import ( lookup_all_orgs, lookup_kittype_by_use, lookup_kittype_by_name, construct_submission_info, lookup_reagent, store_submission, lookup_submissions_by_date_range, create_kit_from_yaml, create_org_from_yaml, get_control_subtypes, get_all_controls_by_type, - lookup_all_submissions_by_type, get_all_controls, lookup_submission_by_rsl_num, update_ww_sample + lookup_all_submissions_by_type, get_all_controls, lookup_submission_by_rsl_num, update_ww_sample, + check_kit_integrity ) from backend.excel.parser import SheetParser, PCRParser from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df from backend.pydant import PydReagent -from tools import check_not_nan, check_kit_integrity +from tools import check_not_nan from .custom_widgets.pop_ups import AlertPop, QuestionAsker from .custom_widgets import ReportDatePicker from .custom_widgets.misc import ImportReagent @@ -393,9 +393,8 @@ def generate_report_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: # make dataframe from record dictionaries detailed_df, summary_df = make_report_xlsx(records=records) html = make_report_html(df=summary_df, start_date=info['start_date'], end_date=info['end_date']) - # setup filedialog to handle save location of report - home_dir = Path(obj.ctx["directory_path"]).joinpath(f"Submissions_Report_{info['start_date']}-{info['end_date']}.pdf").resolve().__str__() - fname = Path(QFileDialog.getSaveFileName(obj, "Save File", home_dir, filter=".pdf")[0]) + # get save location of report + fname = select_save_file(obj=obj, default_name=f"Submissions_Report_{info['start_date']}-{info['end_date']}.pdf", extension="pdf") # logger.debug(f"report output name: {fname}") with open(fname, "w+b") as f: pisa.CreatePDF(html, dest=f) @@ -613,13 +612,16 @@ def link_controls_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: # bcs.control_id.append(control.id) control.submission = bcs control.submission_id = bcs.id - obj.ctx["database_session"].add(control) + # obj.ctx["database_session"].add(control) + obj.ctx.database_session.add(control) count += 1 - obj.ctx["database_session"].add(bcs) + # obj.ctx["database_session"].add(bcs) + obj.ctx.database_session.add(bcs) logger.debug(f"Here is the new control: {[control.name for control in bcs.controls]}") result = dict(message=f"We added {count} controls to bacterial cultures.", status="information") logger.debug(result) - obj.ctx['database_session'].commit() + # obj.ctx['database_session'].commit() + obj.ctx.database_session.commit() # msg = QMessageBox() # msg.setText("Controls added") # msg.setInformativeText(result) @@ -687,8 +689,10 @@ def link_extractions_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.extraction_info}") else: sub.extraction_info = json.dumps([new_run]) - obj.ctx['database_session'].add(sub) - obj.ctx["database_session"].commit() + # obj.ctx['database_session'].add(sub) + # obj.ctx["database_session"].commit() + obj.ctx.database_session.add(sub) + obj.ctx.database_session.commit() result = dict(message=f"We added {count} logs to the database.", status='information') return obj, result @@ -750,8 +754,10 @@ def link_pcr_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.pcr_info}") else: sub.pcr_info = json.dumps([new_run]) - obj.ctx['database_session'].add(sub) - obj.ctx["database_session"].commit() + # obj.ctx['database_session'].add(sub) + # obj.ctx["database_session"].commit() + obj.ctx.database_session.add(sub) + obj.ctx.database_session.commit() result = dict(message=f"We added {count} logs to the database.", status='information') return obj, result @@ -801,10 +807,12 @@ def import_pcr_results_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: logger.debug(f"Final pcr info for {sub.rsl_plate_num}: {sub.pcr_info}") else: sub.pcr_info = json.dumps([parser.pcr]) - obj.ctx['database_session'].add(sub) + # obj.ctx['database_session'].add(sub) + obj.ctx.database_session.add(sub) logger.debug(f"Existing {type(sub.pcr_info)}: {sub.pcr_info}") logger.debug(f"Inserting {type(json.dumps(parser.pcr))}: {json.dumps(parser.pcr)}") - obj.ctx["database_session"].commit() + # obj.ctx["database_session"].commit() + obj.ctx.database_session.commit() logger.debug(f"Got {len(parser.samples)} samples to update!") logger.debug(f"Parser samples: {parser.samples}") for sample in parser.samples: diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py index 6ee8b4b..666c071 100644 --- a/src/submissions/tools/__init__.py +++ b/src/submissions/tools/__init__.py @@ -3,17 +3,41 @@ Contains miscellaenous functions used by both frontend and backend. ''' from pathlib import Path import re -import sys import numpy as np import logging -import getpass -from backend.db.models import BasicSubmission, KitType import pandas as pd -from typing import Tuple from datetime import datetime +from jinja2 import Environment, FileSystemLoader +import yaml +import sys, os, stat, platform, getpass +import logging +from logging import handlers +from pathlib import Path +from sqlalchemy.orm import Session +from sqlalchemy import create_engine +from pydantic import field_validator +from pydantic_settings import BaseSettings, SettingsConfigDict +from typing import Any, Tuple +import __init__ as package + logger = logging.getLogger(f"submissions.{__name__}") +package_dir = Path(__file__).parents[2].resolve() +logger.debug(f"Package dir: {package_dir}") + +if platform.system() == "Windows": + os_config_dir = "AppData/local" + print(f"Got platform Windows, config_dir: {os_config_dir}") +else: + os_config_dir = ".config" + print(f"Got platform other, config_dir: {os_config_dir}") + +main_aux_dir = Path.home().joinpath(f"{os_config_dir}/submissions") + +CONFIGDIR = main_aux_dir.joinpath("config") +LOGDIR = main_aux_dir.joinpath("logs") + def check_not_nan(cell_contents) -> bool: """ Check to ensure excel sheet cell contents are not blank. @@ -49,7 +73,6 @@ def check_not_nan(cell_contents) -> bool: logger.debug(f"Check encountered unknown error: {type(e).__name__} - {e}") return False - def convert_nans_to_nones(input_str) -> str|None: """ Get rid of various "nan", "NAN", "NaN", etc/ @@ -64,7 +87,6 @@ def convert_nans_to_nones(input_str) -> str|None: return None return input_str - def check_is_power_user(ctx:dict) -> bool: """ Check to ensure current user is in power users list. @@ -76,7 +98,7 @@ def check_is_power_user(ctx:dict) -> bool: bool: True if user is in power users, else false. """ try: - check = getpass.getuser() in ctx['power_users'] + check = getpass.getuser() in ctx.power_users except KeyError as e: check = False except Exception as e: @@ -84,7 +106,6 @@ def check_is_power_user(ctx:dict) -> bool: check = False return check - def create_reagent_list(in_dict:dict) -> list[str]: """ Makes list of reagent types without "lot\_" prefix for each key in a dictionary @@ -97,47 +118,6 @@ def create_reagent_list(in_dict:dict) -> list[str]: """ return [item.strip("lot_") for item in in_dict.keys()] - -def check_kit_integrity(sub:BasicSubmission|KitType, reagenttypes:list|None=None) -> dict|None: - """ - Ensures all reagents expected in kit are listed in Submission - - Args: - sub (BasicSubmission | KitType): Object containing complete list of reagent types. - reagenttypes (list | None, optional): List to check against complete list. Defaults to None. - - Returns: - dict|None: Result object containing a message and any missing components. - """ - logger.debug(type(sub)) - # What type is sub? - match sub: - case BasicSubmission(): - # Get all required reagent types for this kit. - ext_kit_rtypes = [reagenttype.name for reagenttype in sub.extraction_kit.reagent_types if reagenttype.required == 1] - # Overwrite function parameter reagenttypes - try: - reagenttypes = [reagent.type.name for reagent in sub.reagents] - except AttributeError as e: - logger.error(f"Problem parsing reagents: {[f'{reagent.lot}, {reagent.type}' for reagent in sub.reagents]}") - case KitType(): - ext_kit_rtypes = [reagenttype.name for reagenttype in sub.reagent_types if reagenttype.required == 1] - logger.debug(f"Kit reagents: {ext_kit_rtypes}") - logger.debug(f"Submission reagents: {reagenttypes}") - # check if lists are equal - check = set(ext_kit_rtypes) == set(reagenttypes) - logger.debug(f"Checking if reagents match kit contents: {check}") - # what reagent types are in both lists? - missing = list(set(ext_kit_rtypes).difference(reagenttypes)) - logger.debug(f"Missing reagents types: {missing}") - # if lists are equal return no problem - if len(missing)==0: - result = None - else: - result = {'message' : f"The submission you are importing is missing some reagents expected by the kit.\n\nIt looks like you are missing: {[item.upper() for item in missing]}\n\nAlternatively, you may have set the wrong extraction kit.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", 'missing': missing} - return result - - def check_if_app(ctx:dict=None) -> bool: """ Checks if the program is running from pyinstaller compiled @@ -153,7 +133,6 @@ def check_if_app(ctx:dict=None) -> bool: else: return False - def retrieve_rsl_number(in_str:str) -> Tuple[str, str]: """ Uses regex to retrieve the plate number and submission type from an input string @@ -173,35 +152,24 @@ def retrieve_rsl_number(in_str:str) -> Tuple[str, str]: parsed = m.group().replace("_", "-") return (parsed, m.lastgroup) - -def format_rsl_number(instr:str) -> str: - """ - Enforces proper formatting on a plate number - Depreciated, replaced by RSLNamer class - - Args: - instr (str): input plate number - - Returns: - str: _description_ - """ - output = instr.upper() - output = output.replace("_", "-") - return output - - def check_regex_match(pattern:str, check:str) -> bool: try: return bool(re.match(fr"{pattern}", check)) except TypeError: return False - +def massage_common_reagents(reagent_name:str): + logger.debug(f"Attempting to massage {reagent_name}") + if reagent_name.endswith("water") or "H2O" in reagent_name.upper(): + reagent_name = "molecular_grade_water" + reagent_name = reagent_name.replace("µ", "u") + return reagent_name + class RSLNamer(object): """ Object that will enforce proper formatting on RSL plate names. """ - def __init__(self, ctx:dict, instr:str): + def __init__(self, ctx, instr:str): self.ctx = ctx self.retrieve_rsl_number(in_str=instr) if self.submission_type != None: @@ -209,29 +177,16 @@ class RSLNamer(object): parser() self.parsed_name = self.parsed_name.replace("_", "-") - def retrieve_rsl_number(self, in_str:str|Path): """ Uses regex to retrieve the plate number and submission type from an input string Args: in_str (str): string to be parsed - - Returns: - Tuple[str, str]: tuple of (output rsl number, submission_type) """ if not isinstance(in_str, Path): in_str = Path(in_str) out_str = in_str.stem - # else: - # in_str = Path(in_str) - # logger.debug(f"Attempting split of {in_str}") - # try: - # out_str = in_str.split("\\")[-1] - # except AttributeError: - # self.parsed_name = None - # self.submission_type = None - # return logger.debug(f"Attempting match of {out_str}") logger.debug(f"The initial plate name is: {out_str}") regex = re.compile(r""" @@ -260,17 +215,18 @@ class RSLNamer(object): raise AttributeError(f"File {in_str.__str__()} has no categories.") else: raise FileNotFoundError() - - def enforce_wastewater(self): """ Uses regex to enforce proper formatting of wastewater samples - """ + """ + def construct(): + today = datetime.now() + return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" try: self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name) except AttributeError as e: - self.parsed_name = self.construct_wastewater() + self.parsed_name = construct() self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW") self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE) self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name) @@ -291,73 +247,366 @@ class RSLNamer(object): repeat = "" self.parsed_name = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", self.parsed_name).replace(" ", "") - - def construct_wastewater(self): - today = datetime.now() - return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" - + def enforce_bacterial_culture(self): """ Uses regex to enforce proper formatting of bacterial culture samples """ + def construct(ctx) -> str: + """ + DEPRECIATED due to slowness. Search for the largest rsl number and increment by 1 + + Returns: + str: new RSL number + """ + logger.debug(f"Attempting to construct RSL number from scratch...") + # directory = Path(self.ctx['directory_path']).joinpath("Bacteria") + directory = Path(ctx.directory_path).joinpath("Bacteria") + year = str(datetime.now().year)[-2:] + if directory.exists(): + logger.debug(f"Year: {year}") + relevant_rsls = [] + all_xlsx = [item.stem for item in directory.rglob("*.xlsx") if bool(re.search(r"RSL-\d{2}-\d{4}", item.stem)) and year in item.stem[4:6]] + logger.debug(f"All rsls: {all_xlsx}") + for item in all_xlsx: + try: + relevant_rsls.append(re.match(r"RSL-\d{2}-\d{4}", item).group(0)) + except Exception as e: + logger.error(f"Regex error: {e}") + continue + logger.debug(f"Initial xlsx: {relevant_rsls}") + max_number = max([int(item[-4:]) for item in relevant_rsls]) + logger.debug(f"The largest sample number is: {max_number}") + return f"RSL-{year}-{str(max_number+1).zfill(4)}" + else: + # raise FileNotFoundError(f"Unable to locate the directory: {directory.__str__()}") + return f"RSL-{year}-0000" try: self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE) except AttributeError as e: - self.parsed_name = self.construct_bacterial_culture_rsl() + self.parsed_name = construct(ctx=self.ctx) # year = datetime.now().year # self.parsed_name = f"RSL-{str(year)[-2:]}-0000" self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE) - - def construct_bacterial_culture_rsl(self) -> str: - """ - DEPRECIATED due to slowness. Search for the largest rsl number and increment by 1 - - Returns: - str: new RSL number - """ - logger.debug(f"Attempting to construct RSL number from scratch...") - directory = Path(self.ctx['directory_path']).joinpath("Bacteria") - year = str(datetime.now().year)[-2:] - if directory.exists(): - logger.debug(f"Year: {year}") - relevant_rsls = [] - all_xlsx = [item.stem for item in directory.rglob("*.xlsx") if bool(re.search(r"RSL-\d{2}-\d{4}", item.stem)) and year in item.stem[4:6]] - logger.debug(f"All rsls: {all_xlsx}") - for item in all_xlsx: - try: - relevant_rsls.append(re.match(r"RSL-\d{2}-\d{4}", item).group(0)) - except Exception as e: - logger.error(f"Regex error: {e}") - continue - logger.debug(f"Initial xlsx: {relevant_rsls}") - max_number = max([int(item[-4:]) for item in relevant_rsls]) - logger.debug(f"The largest sample number is: {max_number}") - return f"RSL-{year}-{str(max_number+1).zfill(4)}" - else: - # raise FileNotFoundError(f"Unable to locate the directory: {directory.__str__()}") - return f"RSL-{year}-0000" - - - + def enforce_wastewater_artic(self): """ Uses regex to enforce proper formatting of wastewater samples - """ - self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"RSL-AR-\1\2\3", self.parsed_name, flags=re.IGNORECASE) + """ + def construct(): + today = datetime.now() + return f"RSL-AR-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" + try: + self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"RSL-AR-\1\2\3", self.parsed_name, flags=re.IGNORECASE) + except AttributeError: + self.parsed_name = construct() try: plate_number = int(re.search(r"_\d?_", self.parsed_name).group().strip("_")) except AttributeError as e: plate_number = 1 self.parsed_name = re.sub(r"(_\d)?_ARTIC", f"-{plate_number}", self.parsed_name) +class GroupWriteRotatingFileHandler(handlers.RotatingFileHandler): -def massage_common_reagents(reagent_name:str): - logger.debug(f"Attempting to massage {reagent_name}") - if reagent_name.endswith("water") or "H2O" in reagent_name.upper(): - reagent_name = "molecular_grade_water" - reagent_name = reagent_name.replace("µ", "u") - return reagent_name + def doRollover(self): + """ + Override base class method to make the new log file group writable. + """ + # Rotate the file first. + handlers.RotatingFileHandler.doRollover(self) + # Add group write to the current permissions. + currMode = os.stat(self.baseFilename).st_mode + os.chmod(self.baseFilename, currMode | stat.S_IWGRP) + + def _open(self): + prevumask=os.umask(0o002) + rtv=handlers.RotatingFileHandler._open(self) + os.umask(prevumask) + return rtv + +class StreamToLogger(object): + """ + Fake file-like stream object that redirects writes to a logger instance. + """ + + def __init__(self, logger, log_level=logging.INFO): + self.logger = logger + self.log_level = log_level + self.linebuf = '' + + def write(self, buf): + for line in buf.rstrip().splitlines(): + self.logger.log(self.log_level, line.rstrip()) + +class Settings(BaseSettings): + """ + Pydantic model to hold settings + + Raises: + FileNotFoundError: _description_ + + """ + directory_path: Path + database_path: Path|None = None + backup_path: Path + super_users: list + power_users: list + rerun_regex: str + submission_types: dict + database_session: Session|None = None + package: Any|None = None + + model_config = SettingsConfigDict(env_file_encoding='utf-8') + + @field_validator('directory_path', mode="before") + @classmethod + def ensure_directory_exists(cls, value): + if isinstance(value, str): + value = Path(value) + if value.exists(): + return value + else: + raise FileNotFoundError(f"Couldn't find settings file {value}") + @field_validator('database_path', mode="before") + @classmethod + def ensure_database_exists(cls, value): + if isinstance(value, str): + value = Path(value) + if value.exists(): + return value + else: + raise FileNotFoundError(f"Couldn't find database at {value}") + + @field_validator('database_session', mode="before") + @classmethod + def create_database_session(cls, value, values): + if value != None: + return value + else: + database_path = values.data['database_path'] + if database_path == None: + # check in user's .submissions directory for submissions.db + if Path.home().joinpath(".submissions", "submissions.db").exists(): + database_path = Path.home().joinpath(".submissions", "submissions.db") + # finally, look in the local dir + else: + database_path = package_dir.joinpath("submissions.db") + else: + # check if user defined path is directory + if database_path.is_dir(): + database_path = database_path.joinpath("submissions.db") + # check if user defined path is a file + elif database_path.is_file(): + database_path = database_path + else: + raise FileNotFoundError("No database file found. Exiting program.") + # sys.exit() + logger.debug(f"Using {database_path} for database file.") + engine = create_engine(f"sqlite:///{database_path}") + session = Session(engine) + return session + @field_validator('package', mode="before") + @classmethod + def import_package(cls, value): + if value == None: + return package + +def get_config(settings_path: Path|str|None=None) -> dict: + """ + Get configuration settings from path or default if blank. + + Args: + settings_path (Path | str | None, optional): Path to config.yml Defaults to None. + + Returns: + Settings: Pydantic settings object + """ + if isinstance(settings_path, str): + settings_path = Path(settings_path) + # custom pyyaml constructor to join fields + def join(loader, node): + seq = loader.construct_sequence(node) + return ''.join([str(i) for i in seq]) + # register the tag handler + yaml.add_constructor('!join', join) + logger.debug(f"Making directory: {CONFIGDIR.__str__()}") + # make directories + try: + CONFIGDIR.mkdir(parents=True) + except FileExistsError: + pass + logger.debug(f"Making directory: {LOGDIR.__str__()}") + try: + LOGDIR.mkdir(parents=True) + except FileExistsError: + pass + # if user hasn't defined config path in cli args + copy_settings_trigger = False + if settings_path == None: + # Check user .config/submissions directory + if CONFIGDIR.joinpath("config.yml").exists(): + settings_path = CONFIGDIR.joinpath("config.yml") + # Check user .submissions directory + elif Path.home().joinpath(".submissions", "config.yml").exists(): + settings_path = Path.home().joinpath(".submissions", "config.yml") + # finally look in the local config + else: + # if getattr(sys, 'frozen', False): + if check_if_app(): + settings_path = Path(sys._MEIPASS).joinpath("files", "config.yml") + else: + settings_path = package_dir.joinpath('config.yml') + # Tell program we need to copy the config.yml to the user directory + # copy_settings_trigger = True + # copy settings to config directory + return Settings(**copy_settings(settings_path=CONFIGDIR.joinpath("config.yml"), settings=settings)) + else: + # check if user defined path is directory + if settings_path.is_dir(): + settings_path = settings_path.joinpath("config.yml") + # check if user defined path is file + elif settings_path.is_file(): + settings_path = settings_path + else: + logger.error("No config.yml file found. Cannot continue.") + raise FileNotFoundError("No config.yml file found. Cannot continue.") + return {} + logger.debug(f"Using {settings_path} for config file.") + with open(settings_path, "r") as stream: + # try: + settings = yaml.load(stream, Loader=yaml.Loader) + # except yaml.YAMLError as exc: + # logger.error(f'Error reading yaml file {settings_path}: {exc}' + # return {} + # copy settings to config directory + # if copy_settings_trigger: + # settings = copy_settings(settings_path=CONFIGDIR.joinpath("config.yml"), settings=settings) + return Settings(**settings) + +def create_database_session(database_path: Path|str|None=None) -> Session: + """ + Creates a session to sqlite3 database from path or default database if database_path is blank. + DEPRECIATED: THIS IS NOW HANDLED BY THE PYDANTIC SETTINGS OBJECT. + + Args: + database_path (Path | str | None, optional): path to sqlite database. Defaults to None. + + Returns: + Session: database session + """ + # convert string to path object + if isinstance(database_path, str): + database_path = Path(database_path) + # check if database path defined by user + if database_path == None: + # check in user's .submissions directory for submissions.db + if Path.home().joinpath(".submissions", "submissions.db").exists(): + database_path = Path.home().joinpath(".submissions", "submissions.db") + # finally, look in the local dir + else: + database_path = package_dir.joinpath("submissions.db") + else: + # check if user defined path is directory + if database_path.is_dir(): + database_path = database_path.joinpath("submissions.db") + # check if user defined path is a file + elif database_path.is_file(): + database_path = database_path + else: + logger.error("No database file found. Exiting program.") + sys.exit() + logger.debug(f"Using {database_path} for database file.") + engine = create_engine(f"sqlite:///{database_path}") + session = Session(engine) + return session + +def setup_logger(verbosity:int=3): + """ + Set logger levels using settings. + + Args: + verbosit (int, optional): Level of verbosity desired 3 is highest. Defaults to 3. + + Returns: + logger: logger object + """ + logger = logging.getLogger("submissions") + logger.setLevel(logging.DEBUG) + # create file handler which logs even debug messages + try: + Path(LOGDIR).mkdir(parents=True) + except FileExistsError: + pass + fh = GroupWriteRotatingFileHandler(LOGDIR.joinpath('submissions.log'), mode='a', maxBytes=100000, backupCount=3, encoding=None, delay=False) + # file logging will always be debug + fh.setLevel(logging.DEBUG) + fh.name = "File" + # create console handler with a higher log level + # create custom logger with STERR -> log + ch = logging.StreamHandler(stream=sys.stdout) + # set looging level based on verbosity + match verbosity: + case 3: + ch.setLevel(logging.DEBUG) + case 2: + ch.setLevel(logging.INFO) + case 1: + ch.setLevel(logging.WARNING) + ch.name = "Stream" + # create formatter and add it to the handlers + formatter = logging.Formatter('%(asctime)s - %(levelname)s - {%(pathname)s:%(lineno)d} - %(message)s') + fh.setFormatter(formatter) + ch.setFormatter(formatter) + # add the handlers to the logger + logger.addHandler(fh) + logger.addHandler(ch) + # Output exception and traceback to logger + def handle_exception(exc_type, exc_value, exc_traceback): + if issubclass(exc_type, KeyboardInterrupt): + sys.__excepthook__(exc_type, exc_value, exc_traceback) + return + logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback)) + sys.excepthook = handle_exception + return logger + +def copy_settings(settings_path:Path, settings:dict) -> dict: + """ + copies relevant settings dictionary from the default config.yml to a new directory + + Args: + settings_path (Path): path to write the file to + settings (dict): settings dictionary obtained from default config.yml + + Returns: + dict: output dictionary for use in first run + """ + # if the current user is not a superuser remove the superusers entry + if not getpass.getuser() in settings['super_users']: + del settings['super_users'] + if not getpass.getuser() in settings['power_users']: + del settings['power_users'] + with open(settings_path, 'w') as f: + yaml.dump(settings, f) + return settings + +def jinja_template_loading(): + """ + Returns jinja2 template environment. + + Returns: + _type_: _description_ + """ + # determine if pyinstaller launcher is being used + if check_if_app(): + loader_path = Path(sys._MEIPASS).joinpath("files", "templates") + else: + loader_path = Path(__file__).parents[1].joinpath('templates').absolute().__str__() + + # jinja template loading + loader = FileSystemLoader(loader_path) + env = Environment(loader=loader) + return env \ No newline at end of file