Pre-removal of constructors module.

This commit is contained in:
Landon Wark
2023-10-23 09:36:57 -05:00
parent 39b94405e5
commit 4b1f88f1d0
16 changed files with 751 additions and 457 deletions

View File

@@ -1,4 +1,8 @@
- [ ] Validate form data using pydantic. - [x] Create custom store methods for submission, reagent and sample.
- [x] Make pydantic models for other things that use constructors.
- [x] Move backend.db.functions.constructor functions into Pydantic models.
- This will allow for better data validation.
- Parser -> Pydantic(validation) -> Form(user input) -> Pydantic(validation) -> SQL
- [x] Rebuild RSLNamer and fix circular imports - [x] Rebuild RSLNamer and fix circular imports
- Should be used when coming in to parser and when leaving form. NO OTHER PLACES. - Should be used when coming in to parser and when leaving form. NO OTHER PLACES.
- [x] Change 'check_is_power_user' to decorator. - [x] Change 'check_is_power_user' to decorator.

View File

@@ -4,7 +4,7 @@ from pathlib import Path
# Version of the realpython-reader package # Version of the realpython-reader package
__project__ = "submissions" __project__ = "submissions"
__version__ = "202310.2b" __version__ = "202310.4b"
__author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"} __author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"}
__copyright__ = "2022-2023, Government of Canada" __copyright__ = "2022-2023, Government of Canada"

View File

@@ -87,5 +87,5 @@ def store_object(ctx:Settings, object) -> dict|None:
return None return None
from .lookups import * from .lookups import *
from .constructions import * # from .constructions import *
from .misc import * from .misc import *

View File

@@ -11,266 +11,270 @@ from dateutil.parser import parse
from typing import Tuple from typing import Tuple
from sqlalchemy.exc import IntegrityError, SAWarning from sqlalchemy.exc import IntegrityError, SAWarning
from . import store_object from . import store_object
from backend.validators import RSLNamer
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
def construct_reagent(ctx:Settings, info_dict:dict) -> models.Reagent: # def construct_reagent(ctx:Settings, info_dict:dict) -> models.Reagent:
""" # """
Construct reagent object from dictionary # Construct reagent object from dictionary
# NOTE: Depreciated in favour of Pydantic model .toSQL method
Args: # Args:
ctx (Settings): settings object passed down from gui # ctx (Settings): settings object passed down from gui
info_dict (dict): dictionary to be converted # info_dict (dict): dictionary to be converted
Returns: # Returns:
models.Reagent: Constructed reagent object # models.Reagent: Constructed reagent object
""" # """
reagent = models.Reagent() # reagent = models.Reagent()
for item in info_dict: # for item in info_dict:
logger.debug(f"Reagent info item for {item}: {info_dict[item]}") # logger.debug(f"Reagent info item for {item}: {info_dict[item]}")
# set fields based on keys in dictionary # # set fields based on keys in dictionary
match item: # match item:
case "lot": # case "lot":
reagent.lot = info_dict[item].upper() # reagent.lot = info_dict[item].upper()
case "expiry": # case "expiry":
if isinstance(info_dict[item], date): # if isinstance(info_dict[item], date):
reagent.expiry = info_dict[item] # reagent.expiry = info_dict[item]
else: # else:
reagent.expiry = parse(info_dict[item]).date() # reagent.expiry = parse(info_dict[item]).date()
case "type": # case "type":
reagent_type = lookup_reagent_types(ctx=ctx, name=info_dict[item]) # reagent_type = lookup_reagent_types(ctx=ctx, name=info_dict[item])
if reagent_type != None: # if reagent_type != None:
reagent.type.append(reagent_type) # reagent.type.append(reagent_type)
case "name": # case "name":
if item == None: # if item == None:
reagent.name = reagent.type.name # reagent.name = reagent.type.name
else: # else:
reagent.name = info_dict[item] # reagent.name = info_dict[item]
# add end-of-life extension from reagent type to expiry date # # add end-of-life extension from reagent type to expiry date
# NOTE: this will now be done only in the reporting phase to account for potential changes in end-of-life extensions # # NOTE: this will now be done only in the reporting phase to account for potential changes in end-of-life extensions
return reagent # return reagent
def construct_submission_info(ctx:Settings, info_dict:dict) -> Tuple[models.BasicSubmission, dict]: # def construct_submission_info(ctx:Settings, info_dict:dict) -> Tuple[models.BasicSubmission, dict]:
""" # """
Construct submission object from dictionary pulled from gui form # Construct submission object from dictionary pulled from gui form
# NOTE: Depreciated in favour of Pydantic model .toSQL method
Args: # Args:
ctx (Settings): settings object passed down from gui # ctx (Settings): settings object passed down from gui
info_dict (dict): dictionary to be transformed # info_dict (dict): dictionary to be transformed
Returns: # Returns:
models.BasicSubmission: Constructed submission object # models.BasicSubmission: Constructed submission object
""" # """
# convert submission type into model name # # convert submission type into model name
# model = get_polymorphic_subclass(polymorphic_identity=info_dict['submission_type']) # # model = get_polymorphic_subclass(polymorphic_identity=info_dict['submission_type'])
model = models.BasicSubmission.find_polymorphic_subclass(polymorphic_identity=info_dict['submission_type']) # model = models.BasicSubmission.find_polymorphic_subclass(polymorphic_identity=info_dict['submission_type'])
logger.debug(f"We've got the model: {type(model)}") # logger.debug(f"We've got the model: {type(model)}")
# Ensure an rsl plate number exists for the plate # # Ensure an rsl plate number exists for the plate
if not check_regex_match("^RSL", info_dict["rsl_plate_num"]): # if not check_regex_match("^RSL", info_dict["rsl_plate_num"]):
instance = None # instance = None
msg = "A proper RSL plate number is required." # msg = "A proper RSL plate number is required."
return instance, {'code': 2, 'message': "A proper RSL plate number is required."} # return instance, {'code': 2, 'message': "A proper RSL plate number is required."}
# else: # else:
# # enforce conventions on the rsl plate number from the form # # # enforce conventions on the rsl plate number from the form
# # info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"]).parsed_name # # # info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"]).parsed_name
# info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"], sub_type=info_dict['submission_type']).parsed_name # info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"], sub_type=info_dict['submission_type']).parsed_name
# check database for existing object # # check database for existing object
instance = lookup_submissions(ctx=ctx, rsl_number=info_dict['rsl_plate_num']) # instance = lookup_submissions(ctx=ctx, rsl_number=info_dict['rsl_plate_num'])
# get model based on submission type converted above # # get model based on submission type converted above
# logger.debug(f"Looking at models for submission type: {query}") # # logger.debug(f"Looking at models for submission type: {query}")
# if query return nothing, ie doesn't already exist in db # # if query return nothing, ie doesn't already exist in db
if instance == None: # if instance == None:
instance = model() # instance = model()
logger.debug(f"Submission doesn't exist yet, creating new instance: {instance}") # logger.debug(f"Submission doesn't exist yet, creating new instance: {instance}")
msg = None # msg = None
code = 0 # code = 0
else: # else:
code = 1 # code = 1
msg = "This submission already exists.\nWould you like to overwrite?" # msg = "This submission already exists.\nWould you like to overwrite?"
for item in info_dict: # for item in info_dict:
value = info_dict[item] # value = info_dict[item]
logger.debug(f"Setting {item} to {value}") # logger.debug(f"Setting {item} to {value}")
# set fields based on keys in dictionary # # set fields based on keys in dictionary
match item: # match item:
case "extraction_kit": # case "extraction_kit":
logger.debug(f"Looking up kit {value}") # logger.debug(f"Looking up kit {value}")
field_value = lookup_kit_types(ctx=ctx, name=value) # field_value = lookup_kit_types(ctx=ctx, name=value)
logger.debug(f"Got {field_value} for kit {value}") # logger.debug(f"Got {field_value} for kit {value}")
case "submitting_lab": # case "submitting_lab":
logger.debug(f"Looking up organization: {value}") # logger.debug(f"Looking up organization: {value}")
field_value = lookup_organizations(ctx=ctx, name=value) # field_value = lookup_organizations(ctx=ctx, name=value)
logger.debug(f"Got {field_value} for organization {value}") # logger.debug(f"Got {field_value} for organization {value}")
case "submitter_plate_num": # case "submitter_plate_num":
logger.debug(f"Submitter plate id: {value}") # logger.debug(f"Submitter plate id: {value}")
field_value = value # field_value = value
case "samples": # case "samples":
instance = construct_samples(ctx=ctx, instance=instance, samples=value) # instance = construct_samples(ctx=ctx, instance=instance, samples=value)
continue # continue
case "submission_type": # case "submission_type":
field_value = lookup_submission_type(ctx=ctx, name=value) # field_value = lookup_submission_type(ctx=ctx, name=value)
case _: # case _:
field_value = value # field_value = value
# insert into field # # insert into field
try: # try:
setattr(instance, item, field_value) # setattr(instance, item, field_value)
except AttributeError: # except AttributeError:
logger.debug(f"Could not set attribute: {item} to {info_dict[item]}") # logger.debug(f"Could not set attribute: {item} to {info_dict[item]}")
continue # continue
except KeyError: # except KeyError:
continue # continue
# calculate cost of the run: immutable cost + mutable times number of columns # # calculate cost of the run: immutable cost + mutable times number of columns
# This is now attached to submission upon creation to preserve at-run costs incase of cost increase in the future. # # This is now attached to submission upon creation to preserve at-run costs incase of cost increase in the future.
try: # try:
logger.debug(f"Calculating costs for procedure...") # logger.debug(f"Calculating costs for procedure...")
instance.calculate_base_cost() # instance.calculate_base_cost()
except (TypeError, AttributeError) as e: # except (TypeError, AttributeError) as e:
logger.debug(f"Looks like that kit doesn't have cost breakdown yet due to: {e}, using full plate cost.") # logger.debug(f"Looks like that kit doesn't have cost breakdown yet due to: {e}, using full plate cost.")
instance.run_cost = instance.extraction_kit.cost_per_run # instance.run_cost = instance.extraction_kit.cost_per_run
logger.debug(f"Calculated base run cost of: {instance.run_cost}") # logger.debug(f"Calculated base run cost of: {instance.run_cost}")
# Apply any discounts that are applicable for client and kit. # # Apply any discounts that are applicable for client and kit.
try: # try:
logger.debug("Checking and applying discounts...") # logger.debug("Checking and applying discounts...")
discounts = [item.amount for item in lookup_discounts(ctx=ctx, kit_type=instance.extraction_kit, organization=instance.submitting_lab)] # discounts = [item.amount for item in lookup_discounts(ctx=ctx, kit_type=instance.extraction_kit, organization=instance.submitting_lab)]
logger.debug(f"We got discounts: {discounts}") # logger.debug(f"We got discounts: {discounts}")
if len(discounts) > 0: # if len(discounts) > 0:
discounts = sum(discounts) # discounts = sum(discounts)
instance.run_cost = instance.run_cost - discounts # instance.run_cost = instance.run_cost - discounts
except Exception as e: # except Exception as e:
logger.error(f"An unknown exception occurred when calculating discounts: {e}") # logger.error(f"An unknown exception occurred when calculating discounts: {e}")
# We need to make sure there's a proper rsl plate number # # We need to make sure there's a proper rsl plate number
logger.debug(f"We've got a total cost of {instance.run_cost}") # logger.debug(f"We've got a total cost of {instance.run_cost}")
try: # try:
logger.debug(f"Constructed instance: {instance.to_string()}") # logger.debug(f"Constructed instance: {instance.to_string()}")
except AttributeError as e: # except AttributeError as e:
logger.debug(f"Something went wrong constructing instance {info_dict['rsl_plate_num']}: {e}") # logger.debug(f"Something went wrong constructing instance {info_dict['rsl_plate_num']}: {e}")
logger.debug(f"Constructed submissions message: {msg}") # logger.debug(f"Constructed submissions message: {msg}")
return instance, {'code':code, 'message':msg} # return instance, {'code':code, 'message':msg}
def construct_samples(ctx:Settings, instance:models.BasicSubmission, samples:List[dict]) -> models.BasicSubmission: # def construct_samples(ctx:Settings, instance:models.BasicSubmission, samples:List[dict]) -> models.BasicSubmission:
""" # """
constructs sample objects and adds to submission # constructs sample objects and adds to submission
# NOTE: Depreciated in favour of Pydantic model .toSQL method
Args: # Args:
ctx (Settings): settings passed down from gui # ctx (Settings): settings passed down from gui
instance (models.BasicSubmission): Submission samples scraped from. # instance (models.BasicSubmission): Submission samples scraped from.
samples (List[dict]): List of parsed samples # samples (List[dict]): List of parsed samples
Returns: # Returns:
models.BasicSubmission: Updated submission object. # models.BasicSubmission: Updated submission object.
""" # """
for sample in samples: # for sample in samples:
sample_instance = lookup_samples(ctx=ctx, submitter_id=str(sample['sample'].submitter_id)) # sample_instance = lookup_samples(ctx=ctx, submitter_id=str(sample['sample'].submitter_id))
if sample_instance == None: # if sample_instance == None:
sample_instance = sample['sample'] # sample_instance = sample['sample']
else: # else:
logger.warning(f"Sample {sample} already exists, creating association.") # logger.warning(f"Sample {sample} already exists, creating association.")
logger.debug(f"Adding {sample_instance.__dict__}") # logger.debug(f"Adding {sample_instance.__dict__}")
if sample_instance in instance.samples: # if sample_instance in instance.samples:
logger.error(f"Looks like there's a duplicate sample on this plate: {sample_instance.submitter_id}!") # logger.error(f"Looks like there's a duplicate sample on this plate: {sample_instance.submitter_id}!")
continue # continue
try: # try:
with ctx.database_session.no_autoflush: # with ctx.database_session.no_autoflush:
try: # try:
sample_query = sample_instance.sample_type.replace('Sample', '').strip() # sample_query = sample_instance.sample_type.replace('Sample', '').strip()
logger.debug(f"Here is the sample instance type: {sample_instance}") # logger.debug(f"Here is the sample instance type: {sample_instance}")
try: # try:
assoc = getattr(models, f"{sample_query}Association") # assoc = getattr(models, f"{sample_query}Association")
except AttributeError as e: # except AttributeError as e:
logger.error(f"Couldn't get type specific association using {sample_instance.sample_type.replace('Sample', '').strip()}. Getting generic.") # logger.error(f"Couldn't get type specific association using {sample_instance.sample_type.replace('Sample', '').strip()}. Getting generic.")
assoc = models.SubmissionSampleAssociation # assoc = models.SubmissionSampleAssociation
assoc = assoc(submission=instance, sample=sample_instance, row=sample['row'], column=sample['column']) # assoc = assoc(submission=instance, sample=sample_instance, row=sample['row'], column=sample['column'])
instance.submission_sample_associations.append(assoc) # instance.submission_sample_associations.append(assoc)
except IntegrityError: # except IntegrityError:
logger.error(f"Hit integrity error for: {sample}") # logger.error(f"Hit integrity error for: {sample}")
continue # continue
except SAWarning: # except SAWarning:
logger.error(f"Looks like the association already exists for submission: {instance} and sample: {sample_instance}") # logger.error(f"Looks like the association already exists for submission: {instance} and sample: {sample_instance}")
continue # continue
except IntegrityError as e: # except IntegrityError as e:
logger.critical(e) # logger.critical(e)
continue # continue
return instance # return instance
@check_authorization # @check_authorization
def construct_kit_from_yaml(ctx:Settings, kit_dict:dict) -> dict: # def construct_kit_from_yaml(ctx:Settings, kit_dict:dict) -> dict:
""" # """
Create and store a new kit in the database based on a .yml file # Create and store a new kit in the database based on a .yml file
TODO: split into create and store functions # TODO: split into create and store functions
Args: # Args:
ctx (Settings): Context object passed down from frontend # ctx (Settings): Context object passed down from frontend
kit_dict (dict): Experiment dictionary created from yaml file # kit_dict (dict): Experiment dictionary created from yaml file
Returns: # Returns:
dict: a dictionary containing results of db addition # dict: a dictionary containing results of db addition
""" # """
# from tools import check_is_power_user, massage_common_reagents # # from tools import check_is_power_user, massage_common_reagents
# Don't want just anyone adding kits # # Don't want just anyone adding kits
# if not check_is_power_user(ctx=ctx): # # if not check_is_power_user(ctx=ctx):
# logger.debug(f"{getuser()} does not have permission to add kits.") # # logger.debug(f"{getuser()} does not have permission to add kits.")
# return {'code':1, 'message':"This user does not have permission to add kits.", "status":"warning"} # # return {'code':1, 'message':"This user does not have permission to add kits.", "status":"warning"}
submission_type = lookup_submission_type(ctx=ctx, name=kit_dict['used_for']) # submission_type = lookup_submission_type(ctx=ctx, name=kit_dict['used_for'])
logger.debug(f"Looked up submission type: {kit_dict['used_for']} and got {submission_type}") # logger.debug(f"Looked up submission type: {kit_dict['used_for']} and got {submission_type}")
kit = models.KitType(name=kit_dict["kit_name"]) # kit = models.KitType(name=kit_dict["kit_name"])
kt_st_assoc = models.SubmissionTypeKitTypeAssociation(kit_type=kit, submission_type=submission_type) # kt_st_assoc = models.SubmissionTypeKitTypeAssociation(kit_type=kit, submission_type=submission_type)
for k,v in kit_dict.items(): # for k,v in kit_dict.items():
if k not in ["reagent_types", "kit_name", "used_for"]: # if k not in ["reagent_types", "kit_name", "used_for"]:
kt_st_assoc.set_attrib(k, v) # kt_st_assoc.set_attrib(k, v)
kit.kit_submissiontype_associations.append(kt_st_assoc) # kit.kit_submissiontype_associations.append(kt_st_assoc)
# A kit contains multiple reagent types. # # A kit contains multiple reagent types.
for r in kit_dict['reagent_types']: # for r in kit_dict['reagent_types']:
logger.debug(f"Constructing reagent type: {r}") # logger.debug(f"Constructing reagent type: {r}")
rtname = massage_common_reagents(r['rtname']) # rtname = massage_common_reagents(r['rtname'])
look_up = lookup_reagent_types(name=rtname) # look_up = lookup_reagent_types(name=rtname)
if look_up == None: # if look_up == None:
rt = models.ReagentType(name=rtname.strip(), eol_ext=timedelta(30*r['eol'])) # rt = models.ReagentType(name=rtname.strip(), eol_ext=timedelta(30*r['eol']))
else: # else:
rt = look_up # rt = look_up
uses = {kit_dict['used_for']:{k:v for k,v in r.items() if k not in ['eol']}} # uses = {kit_dict['used_for']:{k:v for k,v in r.items() if k not in ['eol']}}
assoc = models.KitTypeReagentTypeAssociation(kit_type=kit, reagent_type=rt, uses=uses) # assoc = models.KitTypeReagentTypeAssociation(kit_type=kit, reagent_type=rt, uses=uses)
# ctx.database_session.add(rt) # # ctx.database_session.add(rt)
store_object(ctx=ctx, object=rt) # store_object(ctx=ctx, object=rt)
kit.kit_reagenttype_associations.append(assoc) # kit.kit_reagenttype_associations.append(assoc)
logger.debug(f"Kit construction reagent type: {rt.__dict__}") # logger.debug(f"Kit construction reagent type: {rt.__dict__}")
logger.debug(f"Kit construction kit: {kit.__dict__}") # logger.debug(f"Kit construction kit: {kit.__dict__}")
store_object(ctx=ctx, object=kit) # store_object(ctx=ctx, object=kit)
return {'code':0, 'message':'Kit has been added', 'status': 'information'} # return {'code':0, 'message':'Kit has been added', 'status': 'information'}
@check_authorization # @check_authorization
def construct_org_from_yaml(ctx:Settings, org:dict) -> dict: # def construct_org_from_yaml(ctx:Settings, org:dict) -> dict:
""" # """
Create and store a new organization based on a .yml file # Create and store a new organization based on a .yml file
Args: # Args:
ctx (Settings): Context object passed down from frontend # ctx (Settings): Context object passed down from frontend
org (dict): Dictionary containing organization info. # org (dict): Dictionary containing organization info.
Returns: # Returns:
dict: dictionary containing results of db addition # dict: dictionary containing results of db addition
""" # """
# from tools import check_is_power_user # # from tools import check_is_power_user
# # Don't want just anyone adding in clients # # # Don't want just anyone adding in clients
# if not check_is_power_user(ctx=ctx): # # if not check_is_power_user(ctx=ctx):
# logger.debug(f"{getuser()} does not have permission to add kits.") # # logger.debug(f"{getuser()} does not have permission to add kits.")
# return {'code':1, 'message':"This user does not have permission to add organizations."} # # return {'code':1, 'message':"This user does not have permission to add organizations."}
# the yml can contain multiple clients # # the yml can contain multiple clients
for client in org: # for client in org:
cli_org = models.Organization(name=client.replace(" ", "_").lower(), cost_centre=org[client]['cost centre']) # cli_org = models.Organization(name=client.replace(" ", "_").lower(), cost_centre=org[client]['cost centre'])
# a client can contain multiple contacts # # a client can contain multiple contacts
for contact in org[client]['contacts']: # for contact in org[client]['contacts']:
cont_name = list(contact.keys())[0] # cont_name = list(contact.keys())[0]
# check if contact already exists # # check if contact already exists
look_up = ctx.database_session.query(models.Contact).filter(models.Contact.name==cont_name).first() # look_up = ctx.database_session.query(models.Contact).filter(models.Contact.name==cont_name).first()
if look_up == None: # if look_up == None:
cli_cont = models.Contact(name=cont_name, phone=contact[cont_name]['phone'], email=contact[cont_name]['email'], organization=[cli_org]) # cli_cont = models.Contact(name=cont_name, phone=contact[cont_name]['phone'], email=contact[cont_name]['email'], organization=[cli_org])
else: # else:
cli_cont = look_up # cli_cont = look_up
cli_cont.organization.append(cli_org) # cli_cont.organization.append(cli_org)
ctx.database_session.add(cli_cont) # ctx.database_session.add(cli_cont)
logger.debug(f"Client creation contact: {cli_cont.__dict__}") # logger.debug(f"Client creation contact: {cli_cont.__dict__}")
logger.debug(f"Client creation client: {cli_org.__dict__}") # logger.debug(f"Client creation client: {cli_org.__dict__}")
ctx.database_session.add(cli_org) # ctx.database_session.add(cli_org)
ctx.database_session.commit() # ctx.database_session.commit()
return {"code":0, "message":"Organization has been added."} # return {"code":0, "message":"Organization has been added."}

View File

@@ -135,7 +135,13 @@ def lookup_reagent_types(ctx:Settings,
reagent = lookup_reagents(ctx=ctx, lot_number=reagent) reagent = lookup_reagents(ctx=ctx, lot_number=reagent)
case _: case _:
pass pass
return list(set(kit_type.reagent_types).intersection(reagent.type))[0] assert reagent.type != []
logger.debug(f"Looking up reagent type for {type(kit_type)} {kit_type} and {type(reagent)} {reagent}")
logger.debug(f"Kit reagent types: {kit_type.reagent_types}")
logger.debug(f"Reagent reagent types: {reagent._sa_instance_state}")
result = list(set(kit_type.reagent_types).intersection(reagent.type))
logger.debug(f"Result: {result}")
return result[0]
match name: match name:
case str(): case str():
logger.debug(f"Looking up reagent type by name: {name}") logger.debug(f"Looking up reagent type by name: {name}")
@@ -420,6 +426,8 @@ def lookup_reagenttype_kittype_association(ctx:Settings,
def lookup_submission_sample_association(ctx:Settings, def lookup_submission_sample_association(ctx:Settings,
submission:models.BasicSubmission|str|None=None, submission:models.BasicSubmission|str|None=None,
sample:models.BasicSample|str|None=None, sample:models.BasicSample|str|None=None,
row:int=0,
column:int=0,
limit:int=0, limit:int=0,
chronologic:bool=False chronologic:bool=False
) -> models.SubmissionSampleAssociation|List[models.SubmissionSampleAssociation]: ) -> models.SubmissionSampleAssociation|List[models.SubmissionSampleAssociation]:
@@ -438,10 +446,14 @@ def lookup_submission_sample_association(ctx:Settings,
query = query.join(models.BasicSample).filter(models.BasicSample.submitter_id==sample) query = query.join(models.BasicSample).filter(models.BasicSample.submitter_id==sample)
case _: case _:
pass pass
if row > 0:
query = query.filter(models.SubmissionSampleAssociation.row==row)
if column > 0:
query = query.filter(models.SubmissionSampleAssociation.column==column)
logger.debug(f"Query count: {query.count()}") logger.debug(f"Query count: {query.count()}")
if chronologic: if chronologic:
query.join(models.BasicSubmission).order_by(models.BasicSubmission.submitted_date) query.join(models.BasicSubmission).order_by(models.BasicSubmission.submitted_date)
if query.count() == 1: if query.count() <= 1:
limit = 1 limit = 1
return query_return(query=query, limit=limit) return query_return(query=query, limit=limit)

View File

@@ -100,6 +100,7 @@ def update_last_used(ctx:Settings, reagent:models.Reagent, kit:models.KitType):
kit (models.KitType): kit to be used for lookup kit (models.KitType): kit to be used for lookup
""" """
# rt = list(set(reagent.type).intersection(kit.reagent_types))[0] # rt = list(set(reagent.type).intersection(kit.reagent_types))[0]
logger.debug(f"Attempting update of reagent type at intersection of ({reagent}), ({kit})")
rt = lookup_reagent_types(ctx=ctx, kit_type=kit, reagent=reagent) rt = lookup_reagent_types(ctx=ctx, kit_type=kit, reagent=reagent)
if rt != None: if rt != None:
assoc = lookup_reagenttype_kittype_association(ctx=ctx, kit_type=kit, reagent_type=rt) assoc = lookup_reagenttype_kittype_association(ctx=ctx, kit_type=kit, reagent_type=rt)

View File

@@ -1,43 +1,42 @@
''' '''
Contains all models for sqlalchemy Contains all models for sqlalchemy
''' '''
from typing import Any
from sqlalchemy.orm import declarative_base, DeclarativeMeta from sqlalchemy.orm import declarative_base, DeclarativeMeta
import logging import logging
from pprint import pformat
Base: DeclarativeMeta = declarative_base() Base: DeclarativeMeta = declarative_base()
metadata = Base.metadata metadata = Base.metadata
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
def find_subclasses(parent:Any, attrs:dict|None=None, rsl_number:str|None=None) -> Any: # def find_subclasses(parent:Any, attrs:dict|None=None, rsl_number:str|None=None) -> Any:
""" # """
Finds subclasses of a parent that does contain all # Finds subclasses of a parent that does contain all
attributes if the parent does not. # attributes if the parent does not.
# NOTE: Depreciated, moved to classmethods in individual base models.
Args: # Args:
parent (_type_): Parent class. # parent (_type_): Parent class.
attrs (dict): Key:Value dictionary of attributes # attrs (dict): Key:Value dictionary of attributes
Raises: # Raises:
AttributeError: Raised if no subclass is found. # AttributeError: Raised if no subclass is found.
Returns: # Returns:
_type_: Parent or subclass. # _type_: Parent or subclass.
""" # """
if len(attrs) == 0 or attrs == None: # if len(attrs) == 0 or attrs == None:
return parent # return parent
if any([not hasattr(parent, attr) for attr in attrs]): # if any([not hasattr(parent, attr) for attr in attrs]):
# looks for first model that has all included kwargs # # looks for first model that has all included kwargs
try: # try:
model = [subclass for subclass in parent.__subclasses__() if all([hasattr(subclass, attr) for attr in attrs])][0] # model = [subclass for subclass in parent.__subclasses__() if all([hasattr(subclass, attr) for attr in attrs])][0]
except IndexError as e: # except IndexError as e:
raise AttributeError(f"Couldn't find existing class/subclass of {parent} with all attributes:\n{pformat(attrs)}") # raise AttributeError(f"Couldn't find existing class/subclass of {parent} with all attributes:\n{pformat(attrs)}")
else: # else:
model = parent # model = parent
logger.debug(f"Using model: {model}") # logger.debug(f"Using model: {model}")
return model # return model
from .controls import Control, ControlType from .controls import Control, ControlType
from .kits import KitType, ReagentType, Reagent, Discount, KitTypeReagentTypeAssociation, SubmissionType, SubmissionTypeKitTypeAssociation from .kits import KitType, ReagentType, Reagent, Discount, KitTypeReagentTypeAssociation, SubmissionType, SubmissionTypeKitTypeAssociation

View File

@@ -31,7 +31,8 @@ class KitType(Base):
# association proxy of "user_keyword_associations" collection # association proxy of "user_keyword_associations" collection
# to "keyword" attribute # to "keyword" attribute
reagent_types = association_proxy("kit_reagenttype_associations", "reagent_type") # creator function: https://stackoverflow.com/questions/11091491/keyerror-when-adding-objects-to-sqlalchemy-association-object/11116291#11116291
reagent_types = association_proxy("kit_reagenttype_associations", "reagent_type", creator=lambda RT: KitTypeReagentTypeAssociation(reagent_type=RT))
kit_submissiontype_associations = relationship( kit_submissiontype_associations = relationship(
"SubmissionTypeKitTypeAssociation", "SubmissionTypeKitTypeAssociation",
@@ -118,7 +119,8 @@ class ReagentType(Base):
# association proxy of "user_keyword_associations" collection # association proxy of "user_keyword_associations" collection
# to "keyword" attribute # to "keyword" attribute
kit_types = association_proxy("reagenttype_kit_associations", "kit_type") # creator function: https://stackoverflow.com/questions/11091491/keyerror-when-adding-objects-to-sqlalchemy-association-object/11116291#11116291
kit_types = association_proxy("reagenttype_kit_associations", "kit_type", creator=lambda kit: KitTypeReagentTypeAssociation(kit_type=kit))
def __str__(self) -> str: def __str__(self) -> str:
""" """
@@ -150,6 +152,7 @@ class KitTypeReagentTypeAssociation(Base):
reagent_type = relationship(ReagentType, back_populates="reagenttype_kit_associations") reagent_type = relationship(ReagentType, back_populates="reagenttype_kit_associations")
def __init__(self, kit_type=None, reagent_type=None, uses=None, required=1): def __init__(self, kit_type=None, reagent_type=None, uses=None, required=1):
logger.debug(f"Parameters: Kit={kit_type}, RT={reagent_type}, Uses={uses}, Required={required}")
self.kit_type = kit_type self.kit_type = kit_type
self.reagent_type = reagent_type self.reagent_type = reagent_type
self.uses = uses self.uses = uses
@@ -186,9 +189,9 @@ class Reagent(Base):
def __repr__(self): def __repr__(self):
if self.name != None: if self.name != None:
return f"Reagent({self.name}-{self.lot})" return f"<Reagent({self.name}-{self.lot})>"
else: else:
return f"Reagent({self.type.name}-{self.lot})" return f"<Reagent({self.type.name}-{self.lot})>"
def __str__(self) -> str: def __str__(self) -> str:

View File

@@ -33,6 +33,13 @@ class Organization(Base):
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<Organization({self.name})>" return f"<Organization({self.name})>"
def save(self, ctx):
ctx.database_session.add(self)
ctx.database_session.commit()
def set_attribute(self, name:str, value):
setattr(self, name, value)
class Contact(Base): class Contact(Base):
""" """

View File

@@ -13,7 +13,6 @@ from json.decoder import JSONDecodeError
from math import ceil from math import ceil
from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.associationproxy import association_proxy
import uuid import uuid
from pandas import Timestamp
from dateutil.parser import parse from dateutil.parser import parse
import re import re
import pandas as pd import pandas as pd
@@ -301,6 +300,7 @@ class BasicSubmission(Base):
@classmethod @classmethod
def enforce_name(cls, ctx:Settings, instr:str) -> str: def enforce_name(cls, ctx:Settings, instr:str) -> str:
logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} Enforcer!") logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} Enforcer!")
logger.debug(f"Attempting enforcement on {instr}")
return instr return instr
@classmethod @classmethod
@@ -344,6 +344,11 @@ class BasicSubmission(Base):
logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} PCR parser!") logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} PCR parser!")
return [] return []
def save(self, ctx:Settings):
self.uploaded_by = getuser()
ctx.database_session.add(self)
ctx.database_session.commit()
# Below are the custom submission types # Below are the custom submission types
class BacterialCulture(BasicSubmission): class BacterialCulture(BasicSubmission):
@@ -536,6 +541,8 @@ class Wastewater(BasicSubmission):
def construct(): def construct():
today = datetime.now() today = datetime.now()
return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}"
if outstr == None:
outstr = construct()
try: try:
outstr = re.sub(r"PCR(-|_)", "", outstr) outstr = re.sub(r"PCR(-|_)", "", outstr)
except AttributeError as e: except AttributeError as e:
@@ -743,6 +750,11 @@ class BasicSample(Base):
logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}") logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}")
return cls return cls
@classmethod
def parse_sample(cls, input_dict:dict) -> dict:
logger.debug(f"Called {cls.__name__} sample parser")
return input_dict
class WastewaterSample(BasicSample): class WastewaterSample(BasicSample):
""" """
Derivative wastewater sample Derivative wastewater sample
@@ -757,51 +769,51 @@ class WastewaterSample(BasicSample):
__mapper_args__ = {"polymorphic_identity": "Wastewater Sample", "polymorphic_load": "inline"} __mapper_args__ = {"polymorphic_identity": "Wastewater Sample", "polymorphic_load": "inline"}
@validates("collected-date") # @validates("collected-date")
def convert_cdate_time(self, key, value): # def convert_cdate_time(self, key, value):
logger.debug(f"Validating {key}: {value}") # logger.debug(f"Validating {key}: {value}")
if isinstance(value, Timestamp): # if isinstance(value, Timestamp):
return value.date() # return value.date()
if isinstance(value, str): # if isinstance(value, str):
return parse(value) # return parse(value)
return value # return value
@validates("rsl_number") # @validates("rsl_number")
def use_submitter_id(self, key, value): # def use_submitter_id(self, key, value):
logger.debug(f"Validating {key}: {value}") # logger.debug(f"Validating {key}: {value}")
return value or self.submitter_id # return value or self.submitter_id
def set_attribute(self, name:str, value): # def set_attribute(self, name:str, value):
""" # """
Set an attribute of this object. Extends parent. # Set an attribute of this object. Extends parent.
Args: # Args:
name (str): name of the attribute # name (str): name of the attribute
value (_type_): value to be set # value (_type_): value to be set
""" # """
# Due to the plate map being populated with RSL numbers, we have to do some shuffling. # # Due to the plate map being populated with RSL numbers, we have to do some shuffling.
match name: # match name:
case "submitter_id": # case "submitter_id":
# If submitter_id already has a value, stop # # If submitter_id already has a value, stop
if self.submitter_id != None: # if self.submitter_id != None:
return # return
# otherwise also set rsl_number to the same value # # otherwise also set rsl_number to the same value
else: # else:
super().set_attribute("rsl_number", value) # super().set_attribute("rsl_number", value)
case "ww_full_sample_id": # case "ww_full_sample_id":
# If value present, set ww_full_sample_id and make this the submitter_id # # If value present, set ww_full_sample_id and make this the submitter_id
if value != None: # if value != None:
super().set_attribute(name, value) # super().set_attribute(name, value)
name = "submitter_id" # name = "submitter_id"
case 'collection_date': # case 'collection_date':
# If this is a string use dateutils to parse into date() # # If this is a string use dateutils to parse into date()
if isinstance(value, str): # if isinstance(value, str):
logger.debug(f"collection_date {value} is a string. Attempting parse...") # logger.debug(f"collection_date {value} is a string. Attempting parse...")
value = parse(value) # value = parse(value)
case "rsl_number": # case "rsl_number":
if value == None: # if value == None:
value = self.submitter_id # value = self.submitter_id
super().set_attribute(name, value) # super().set_attribute(name, value)
def to_hitpick(self, submission_rsl:str) -> dict|None: def to_hitpick(self, submission_rsl:str) -> dict|None:
""" """
@@ -832,6 +844,16 @@ class WastewaterSample(BasicSample):
except IndexError: except IndexError:
return None return None
@classmethod
def parse_sample(cls, input_dict: dict) -> dict:
output_dict = super().parse_sample(input_dict)
if output_dict['rsl_number'] == None:
output_dict['rsl_number'] = output_dict['submitter_id']
if output_dict['ww_full_sample_id'] != None:
output_dict["submitter_id"] = output_dict['ww_full_sample_id']
return output_dict
class BacterialCultureSample(BasicSample): class BacterialCultureSample(BasicSample):
""" """
base of bacterial culture sample base of bacterial culture sample
@@ -873,7 +895,7 @@ class SubmissionSampleAssociation(Base):
# Refers to the type of parent. # Refers to the type of parent.
# Hooooooo boy, polymorphic association type, now we're getting into the weeds! # Hooooooo boy, polymorphic association type, now we're getting into the weeds!
__mapper_args__ = { __mapper_args__ = {
"polymorphic_identity": "basic_association", "polymorphic_identity": "Basic Association",
"polymorphic_on": base_sub_type, "polymorphic_on": base_sub_type,
"with_polymorphic": "*", "with_polymorphic": "*",
} }
@@ -887,6 +909,19 @@ class SubmissionSampleAssociation(Base):
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<SubmissionSampleAssociation({self.submission.rsl_plate_num} & {self.sample.submitter_id})" return f"<SubmissionSampleAssociation({self.submission.rsl_plate_num} & {self.sample.submitter_id})"
@classmethod
def find_polymorphic_subclass(cls, polymorphic_identity:str|None=None):
if isinstance(polymorphic_identity, dict):
polymorphic_identity = polymorphic_identity['value']
if polymorphic_identity == None:
return cls
else:
try:
return [item for item in cls.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0]
except Exception as e:
logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}")
return cls
class WastewaterAssociation(SubmissionSampleAssociation): class WastewaterAssociation(SubmissionSampleAssociation):
""" """
Derivative custom Wastewater/Submission Association... fancy. Derivative custom Wastewater/Submission Association... fancy.
@@ -897,5 +932,5 @@ class WastewaterAssociation(SubmissionSampleAssociation):
n2_status = Column(String(32)) #: positive or negative for N2 n2_status = Column(String(32)) #: positive or negative for N2
pcr_results = Column(JSON) #: imported PCR status from QuantStudio pcr_results = Column(JSON) #: imported PCR status from QuantStudio
__mapper_args__ = {"polymorphic_identity": "wastewater", "polymorphic_load": "inline"} __mapper_args__ = {"polymorphic_identity": "Wastewater Association", "polymorphic_load": "inline"}

View File

@@ -5,9 +5,10 @@ from getpass import getuser
import pprint import pprint
from typing import List from typing import List
import pandas as pd import pandas as pd
import numpy as np
from pathlib import Path from pathlib import Path
from backend.db import models, lookup_kit_types, lookup_submission_type, lookup_samples from backend.db import models, lookup_kit_types, lookup_submission_type, lookup_samples
from backend.validators import PydSheetSubmission, PydSheetReagent, RSLNamer from backend.validators import PydSubmission, PydReagent, RSLNamer, PydSample
import logging import logging
from collections import OrderedDict from collections import OrderedDict
import re import re
@@ -113,7 +114,7 @@ class SheetParser(object):
logger.debug(f"List of reagents for comparison with allowed_reagents: {pprint.pformat(self.sub['reagents'])}") logger.debug(f"List of reagents for comparison with allowed_reagents: {pprint.pformat(self.sub['reagents'])}")
self.sub['reagents'] = [reagent for reagent in self.sub['reagents'] if reagent['value'].type in allowed_reagents] self.sub['reagents'] = [reagent for reagent in self.sub['reagents'] if reagent['value'].type in allowed_reagents]
def to_pydantic(self) -> PydSheetSubmission: def to_pydantic(self) -> PydSubmission:
""" """
Generates a pydantic model of scraped data for validation Generates a pydantic model of scraped data for validation
@@ -121,8 +122,8 @@ class SheetParser(object):
PydSubmission: output pydantic model PydSubmission: output pydantic model
""" """
logger.debug(f"Submission dictionary coming into 'to_pydantic':\n{pprint.pformat(self.sub)}") logger.debug(f"Submission dictionary coming into 'to_pydantic':\n{pprint.pformat(self.sub)}")
psm = PydSheetSubmission(ctx=self.ctx, filepath=self.filepath, **self.sub) psm = PydSubmission(ctx=self.ctx, filepath=self.filepath, **self.sub)
delattr(psm, "filepath") # delattr(psm, "filepath")
return psm return psm
class InfoParser(object): class InfoParser(object):
@@ -218,6 +219,7 @@ class ReagentParser(object):
listo = [] listo = []
for sheet in self.xl.sheet_names: for sheet in self.xl.sheet_names:
df = self.xl.parse(sheet, header=None, dtype=object) df = self.xl.parse(sheet, header=None, dtype=object)
df.replace({np.nan: None}, inplace = True)
relevant = {k.strip():v for k,v in self.map.items() if sheet in self.map[k]['sheet']} relevant = {k.strip():v for k,v in self.map.items() if sheet in self.map[k]['sheet']}
logger.debug(f"relevant map for {sheet}: {pprint.pformat(relevant)}") logger.debug(f"relevant map for {sheet}: {pprint.pformat(relevant)}")
if relevant == {}: if relevant == {}:
@@ -229,15 +231,16 @@ class ReagentParser(object):
lot = df.iat[relevant[item]['lot']['row']-1, relevant[item]['lot']['column']-1] lot = df.iat[relevant[item]['lot']['row']-1, relevant[item]['lot']['column']-1]
expiry = df.iat[relevant[item]['expiry']['row']-1, relevant[item]['expiry']['column']-1] expiry = df.iat[relevant[item]['expiry']['row']-1, relevant[item]['expiry']['column']-1]
except (KeyError, IndexError): except (KeyError, IndexError):
listo.append(dict(value=PydSheetReagent(type=item.strip(), lot=None, exp=None, name=None), parsed=False)) listo.append(dict(value=PydReagent(ctx=self.ctx, type=item.strip(), lot=None, exp=None, name=None), parsed=False))
continue continue
if check_not_nan(lot): if check_not_nan(lot):
parsed = True parsed = True
else: else:
parsed = False parsed = False
logger.debug(f"Got lot for {item}-{name}: {lot} as {type(lot)}") # logger.debug(f"Got lot for {item}-{name}: {lot} as {type(lot)}")
lot = str(lot) lot = str(lot)
listo.append(dict(value=PydSheetReagent(type=item.strip(), lot=lot, exp=expiry, name=name), parsed=parsed)) logger.debug(f"Going into pydantic: name: {name}, lot: {lot}, expiry: {expiry}, type: {item.strip()}")
listo.append(dict(value=PydReagent(ctx=self.ctx, type=item.strip(), lot=lot, exp=expiry, name=name), parsed=parsed))
logger.debug(f"Returning listo: {listo}") logger.debug(f"Returning listo: {listo}")
return listo return listo
@@ -284,7 +287,8 @@ class SampleParser(object):
logger.debug(f"info_map: {pprint.pformat(submission_type.info_map)}") logger.debug(f"info_map: {pprint.pformat(submission_type.info_map)}")
sample_info_map = submission_type.info_map['samples'] sample_info_map = submission_type.info_map['samples']
# self.custom_parser = get_polymorphic_subclass(models.BasicSubmission, submission_type.name).parse_samples # self.custom_parser = get_polymorphic_subclass(models.BasicSubmission, submission_type.name).parse_samples
self.custom_parser = models.BasicSubmission.find_polymorphic_subclass(polymorphic_identity=submission_type.name).parse_samples self.custom_sub_parser = models.BasicSubmission.find_polymorphic_subclass(polymorphic_identity=submission_type.name).parse_samples
self.custom_sample_parser = models.BasicSample.find_polymorphic_subclass(polymorphic_identity=f"{submission_type.name} Sample").parse_sample
return sample_info_map return sample_info_map
def construct_plate_map(self, plate_map_location:dict) -> pd.DataFrame: def construct_plate_map(self, plate_map_location:dict) -> pd.DataFrame:
@@ -361,9 +365,13 @@ class SampleParser(object):
else: else:
return input_str return input_str
for sample in self.samples: for sample in self.samples:
addition = self.lookup_table[self.lookup_table.isin([sample['submitter_id']]).any(axis=1)].squeeze().to_dict() # addition = self.lookup_table[self.lookup_table.isin([sample['submitter_id']]).any(axis=1)].squeeze().to_dict()
logger.debug(f"Lookuptable info: {addition}") addition = self.lookup_table[self.lookup_table.isin([sample['submitter_id']]).any(axis=1)].squeeze()
for k,v in addition.items(): logger.debug(addition)
if isinstance(addition, pd.DataFrame) and not addition.empty:
addition = addition.iloc[0]
logger.debug(f"Lookuptable info: {addition.to_dict()}")
for k,v in addition.to_dict().items():
# logger.debug(f"Checking {k} in lookup table.") # logger.debug(f"Checking {k} in lookup table.")
if check_not_nan(k) and isinstance(k, str): if check_not_nan(k) and isinstance(k, str):
if k.lower() not in sample: if k.lower() not in sample:
@@ -376,7 +384,13 @@ class SampleParser(object):
sample[k] = determine_if_date(v) sample[k] = determine_if_date(v)
case _: case _:
sample[k] = v sample[k] = v
# Set row in lookup table to blank values to prevent multipe lookups.
try:
self.lookup_table.loc[self.lookup_table['Sample #']==addition['Sample #']] = np.nan
except ValueError:
pass
logger.debug(f"Output sample dict: {sample}") logger.debug(f"Output sample dict: {sample}")
logger.debug(f"Final lookup_table: \n\n {self.lookup_table}")
def parse_samples(self, generate:bool=True) -> List[dict]|List[models.BasicSample]: def parse_samples(self, generate:bool=True) -> List[dict]|List[models.BasicSample]:
""" """
@@ -391,11 +405,11 @@ class SampleParser(object):
result = None result = None
new_samples = [] new_samples = []
for ii, sample in enumerate(self.samples): for ii, sample in enumerate(self.samples):
try: # try:
if sample['submitter_id'] in [check_sample['sample'].submitter_id for check_sample in new_samples]: # if sample['submitter_id'] in [check_sample['sample'].submitter_id for check_sample in new_samples]:
sample['submitter_id'] = f"{sample['submitter_id']}-{ii}" # sample['submitter_id'] = f"{sample['submitter_id']}-{ii}"
except KeyError as e: # except KeyError as e:
logger.error(f"Sample obj: {sample}, error: {e}") # logger.error(f"Sample obj: {sample}, error: {e}")
translated_dict = {} translated_dict = {}
for k, v in sample.items(): for k, v in sample.items():
match v: match v:
@@ -410,11 +424,14 @@ class SampleParser(object):
except KeyError: except KeyError:
translated_dict[k] = convert_nans_to_nones(v) translated_dict[k] = convert_nans_to_nones(v)
translated_dict['sample_type'] = f"{self.submission_type} Sample" translated_dict['sample_type'] = f"{self.submission_type} Sample"
translated_dict = self.custom_parser(translated_dict) translated_dict = self.custom_sub_parser(translated_dict)
if generate: translated_dict = self.custom_sample_parser(translated_dict)
new_samples.append(self.generate_sample_object(translated_dict)) logger.debug(f"Here is the output of the custom parser: \n\n{translated_dict}\n\n")
else: # if generate:
new_samples.append(translated_dict) # new_samples.append(self.generate_sample_object(translated_dict))
# else:
# new_samples.append(translated_dict)
new_samples.append(PydSample(**translated_dict))
return result, new_samples return result, new_samples
def generate_sample_object(self, input_dict) -> models.BasicSample: def generate_sample_object(self, input_dict) -> models.BasicSample:

View File

@@ -3,22 +3,28 @@ Contains pydantic models and accompanying validators
''' '''
import uuid import uuid
from pydantic import BaseModel, field_validator, Field from pydantic import BaseModel, field_validator, Field
from datetime import date, datetime from datetime import date, datetime, timedelta
from dateutil.parser import parse from dateutil.parser import parse
from dateutil.parser._parser import ParserError from dateutil.parser._parser import ParserError
from typing import List, Any from typing import List, Any, Tuple
from . import RSLNamer from . import RSLNamer
from pathlib import Path from pathlib import Path
import re import re
import logging import logging
from tools import check_not_nan, convert_nans_to_nones, Settings from tools import check_not_nan, convert_nans_to_nones, Settings
from backend.db.functions import lookup_submissions from backend.db.functions import (lookup_submissions, lookup_reagent_types, lookup_reagents, lookup_kit_types,
lookup_organizations, lookup_submission_type, lookup_discounts, lookup_samples, lookup_submission_sample_association,
lookup_reagenttype_kittype_association
)
from backend.db.models import *
from sqlalchemy.exc import InvalidRequestError, StatementError
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
class PydSheetReagent(BaseModel): class PydReagent(BaseModel):
type: str|None ctx: Settings
lot: str|None lot: str|None
type: str|None
exp: date|None exp: date|None
name: str|None name: str|None
@@ -31,6 +37,16 @@ class PydSheetReagent(BaseModel):
case _: case _:
return value return value
@field_validator("type")
@classmethod
def rescue_type_with_lookup(cls, value, values):
if value == None and values.data['lot'] != None:
try:
return lookup_reagents(ctx=values.data['ctx'], lot_number=values.data['lot']).name
except AttributeError:
return value
return value
@field_validator("lot", mode='before') @field_validator("lot", mode='before')
@classmethod @classmethod
def rescue_lot_string(cls, value): def rescue_lot_string(cls, value):
@@ -70,7 +86,81 @@ class PydSheetReagent(BaseModel):
else: else:
return values.data['type'] return values.data['type']
class PydSheetSubmission(BaseModel, extra='allow'): def toSQL(self):# -> Tuple[Reagent, dict]:
result = None
logger.debug(f"Reagent SQL constructor is looking up type: {self.type}, lot: {self.lot}")
reagent = lookup_reagents(ctx=self.ctx, lot_number=self.lot)
logger.debug(f"Result: {reagent}")
if reagent == None:
reagent = Reagent()
for key, value in self.__dict__.items():
if isinstance(value, dict):
value = value['value']
logger.debug(f"Reagent info item for {key}: {value}")
# set fields based on keys in dictionary
match key:
case "lot":
reagent.lot = value.upper()
case "expiry":
reagent.expiry = value
case "type":
reagent_type = lookup_reagent_types(ctx=self.ctx, name=value)
if reagent_type != None:
reagent.type.append(reagent_type)
case "name":
reagent.name = value
# add end-of-life extension from reagent type to expiry date
# NOTE: this will now be done only in the reporting phase to account for potential changes in end-of-life extensions
return reagent, result
class PydSample(BaseModel, extra='allow'):
submitter_id: str
sample_type: str
row: int|List[int]|None
column: int|List[int]|None
@field_validator("row", "column")
@classmethod
def row_int_to_list(cls, value):
if isinstance(value, int):
return [value]
return value
# @field_validator(column)
# @classmethod
# def column_int_to_list(cls, value):
# if isinstance(value, int):
# return [value]
# return value
def toSQL(self, ctx:Settings, submission):
result = None
self.__dict__.update(self.model_extra)
logger.debug(f"Here is the incoming sample dict: \n{self.__dict__}")
instance = lookup_samples(ctx=ctx, submitter_id=self.submitter_id)
if instance == None:
logger.debug(f"Sample {self.submitter_id} doesn't exist yet. Looking up sample object with polymorphic identity: {self.sample_type}")
instance = BasicSample.find_polymorphic_subclass(polymorphic_identity=self.sample_type)()
for key, value in self.__dict__.items():
# logger.debug(f"Setting sample field {key} to {value}")
match key:
case "row" | "column":
continue
case _:
instance.set_attribute(name=key, value=value)
for row, column in zip(self.row, self.column):
logger.debug(f"Looking up association with identity: ({submission.submission_type_name} Association)")
association = lookup_submission_sample_association(ctx=ctx, submission=submission, row=row, column=column)
logger.debug(f"Returned association: {association}")
if association == None or association == []:
logger.debug(f"Looked up association at row {row}, column {column} didn't exist, creating new association.")
association = SubmissionSampleAssociation.find_polymorphic_subclass(polymorphic_identity=f"{submission.submission_type_name} Association")
association = association(submission=submission, sample=instance, row=row, column=column)
instance.sample_submission_associations.append(association)
return instance, result
class PydSubmission(BaseModel, extra='allow'):
ctx: Settings ctx: Settings
filepath: Path filepath: Path
submission_type: dict|None submission_type: dict|None
@@ -83,7 +173,7 @@ class PydSheetSubmission(BaseModel, extra='allow'):
extraction_kit: dict|None extraction_kit: dict|None
technician: dict|None technician: dict|None
submission_category: dict|None = Field(default=dict(value=None, parsed=False), validate_default=True) submission_category: dict|None = Field(default=dict(value=None, parsed=False), validate_default=True)
reagents: List[dict] = [] reagents: List[dict]|List[PydReagent] = []
samples: List[Any] samples: List[Any]
@field_validator("submitter_plate_num") @field_validator("submitter_plate_num")
@@ -211,3 +301,165 @@ class PydSheetSubmission(BaseModel, extra='allow'):
if value['value'] not in ["Research", "Diagnostic", "Surveillance"]: if value['value'] not in ["Research", "Diagnostic", "Surveillance"]:
value['value'] = values.data['submission_type']['value'] value['value'] = values.data['submission_type']['value']
return value return value
def toSQL(self):
code = 0
msg = None
self.__dict__.update(self.model_extra)
instance = lookup_submissions(ctx=self.ctx, rsl_number=self.rsl_plate_num['value'])
if instance == None:
instance = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type)()
else:
code = 1
msg = "This submission already exists.\nWould you like to overwrite?"
self.handle_duplicate_samples()
logger.debug(f"Here's our list of duplicate removed samples: {self.samples}")
for key, value in self.__dict__.items():
if isinstance(value, dict):
value = value['value']
logger.debug(f"Setting {key} to {value}")
# set fields based on keys in dictionary
match key:
case "extraction_kit":
logger.debug(f"Looking up kit {value}")
field_value = lookup_kit_types(ctx=self.ctx, name=value)
logger.debug(f"Got {field_value} for kit {value}")
case "submitting_lab":
logger.debug(f"Looking up organization: {value}")
field_value = lookup_organizations(ctx=self.ctx, name=value)
logger.debug(f"Got {field_value} for organization {value}")
case "submitter_plate_num":
logger.debug(f"Submitter plate id: {value}")
field_value = value
case "samples":
# instance = construct_samples(ctx=ctx, instance=instance, samples=value)
for sample in value:
# logger.debug(f"Parsing {sample} to sql.")
sample, _ = sample.toSQL(ctx=self.ctx, submission=instance)
# instance.samples.append(sample)
continue
case "reagents":
field_value = [reagent['value'].toSQL()[0] if isinstance(reagent, dict) else reagent.toSQL()[0] for reagent in value]
case "submission_type":
field_value = lookup_submission_type(ctx=self.ctx, name=value)
case "ctx" | "csv" | "filepath":
continue
case _:
field_value = value
# insert into field
try:
setattr(instance, key, field_value)
except AttributeError as e:
logger.debug(f"Could not set attribute: {key} to {value} due to: \n\n {e}")
continue
except KeyError:
continue
try:
logger.debug(f"Calculating costs for procedure...")
instance.calculate_base_cost()
except (TypeError, AttributeError) as e:
logger.debug(f"Looks like that kit doesn't have cost breakdown yet due to: {e}, using full plate cost.")
instance.run_cost = instance.extraction_kit.cost_per_run
logger.debug(f"Calculated base run cost of: {instance.run_cost}")
# Apply any discounts that are applicable for client and kit.
try:
logger.debug("Checking and applying discounts...")
discounts = [item.amount for item in lookup_discounts(ctx=self.ctx, kit_type=instance.extraction_kit, organization=instance.submitting_lab)]
logger.debug(f"We got discounts: {discounts}")
if len(discounts) > 0:
discounts = sum(discounts)
instance.run_cost = instance.run_cost - discounts
except Exception as e:
logger.error(f"An unknown exception occurred when calculating discounts: {e}")
# We need to make sure there's a proper rsl plate number
logger.debug(f"We've got a total cost of {instance.run_cost}")
try:
logger.debug(f"Constructed instance: {instance.to_string()}")
except AttributeError as e:
logger.debug(f"Something went wrong constructing instance {self.rsl_plate_num}: {e}")
logger.debug(f"Constructed submissions message: {msg}")
return instance, {'code':code, 'message':msg}
def handle_duplicate_samples(self):
submitter_ids = list(set([sample.submitter_id for sample in self.samples]))
output = []
for id in submitter_ids:
relevants = [item for item in self.samples if item.submitter_id==id]
if len(relevants) <= 1:
output += relevants
else:
rows = [item.row[0] for item in relevants]
columns = [item.column[0] for item in relevants]
dummy = relevants[0]
dummy.row = rows
dummy.column = columns
output.append(dummy)
self.samples = output
class PydContact(BaseModel):
name: str
phone: str|None
email: str|None
def toSQL(self, ctx):
return Contact(name=self.name, phone=self.phone, email=self.email)
class PydOrganization(BaseModel):
name: str
cost_centre: str
contacts: List[PydContact]|None
def toSQL(self, ctx):
instance = Organization()
for field in self.model_fields:
match field:
case "contacts":
value = [item.toSQL(ctx) for item in getattr(self, field)]
case _:
value = getattr(self, field)
instance.set_attribute(name=field, value=value)
return instance
class PydReagentType(BaseModel):
name: str
eol_ext: timedelta|int|None
uses: dict|None
required: int|None = Field(default=1)
@field_validator("eol_ext")
@classmethod
def int_to_timedelta(cls, value):
if isinstance(value, int):
return timedelta(days=value)
return value
def toSQL(self, ctx:Settings, kit:KitType):
instance: ReagentType = lookup_reagent_types(ctx=ctx, name=self.name)
if instance == None:
instance = ReagentType(name=self.name, eol_ext=self.eol_ext)
logger.debug(f"This is the reagent type instance: {instance.__dict__}")
try:
assoc = lookup_reagenttype_kittype_association(ctx=ctx, reagent_type=instance, kit_type=kit)
except StatementError:
assoc = None
if assoc == None:
assoc = KitTypeReagentTypeAssociation(kit_type=kit, reagent_type=instance, uses=self.uses, required=self.required)
kit.kit_reagenttype_associations.append(assoc)
return instance
class PydKit(BaseModel):
name: str
reagent_types: List[PydReagentType]|None
def toSQL(self, ctx):
instance = lookup_kit_types(ctx=ctx, name=self.name)
if instance == None:
instance = KitType(name=self.name)
instance.reagent_types = [item.toSQL(ctx, instance) for item in self.reagent_types]
return instance

View File

@@ -15,8 +15,9 @@ from PyQt6.QtGui import QAction
from PyQt6.QtWebEngineWidgets import QWebEngineView from PyQt6.QtWebEngineWidgets import QWebEngineView
from pathlib import Path from pathlib import Path
from backend.db import ( from backend.db import (
construct_reagent, store_object, lookup_control_types, lookup_modes store_object, lookup_control_types, lookup_modes, #construct_reagent
) )
from backend.validators import PydSubmission, PydReagent
from tools import check_if_app, Settings from tools import check_if_app, Settings
from frontend.custom_widgets import SubmissionsSheet, AlertPop, AddReagentForm, KitAdder, ControlsDatePicker, ImportReagent from frontend.custom_widgets import SubmissionsSheet, AlertPop, AddReagentForm, KitAdder, ControlsDatePicker, ImportReagent
import logging import logging
@@ -220,10 +221,11 @@ class App(QMainWindow):
info = dlg.parse_form() info = dlg.parse_form()
logger.debug(f"Reagent info: {info}") logger.debug(f"Reagent info: {info}")
# create reagent object # create reagent object
reagent = construct_reagent(ctx=self.ctx, info_dict=info) # reagent = construct_reagent(ctx=self.ctx, info_dict=info)
reagent = PydReagent(ctx=self.ctx, **info)
# send reagent to db # send reagent to db
# store_reagent(ctx=self.ctx, reagent=reagent) # store_reagent(ctx=self.ctx, reagent=reagent)
result = store_object(ctx=self.ctx, object=reagent) result = store_object(ctx=self.ctx, object=reagent.toSQL()[0])
self.result_reporter(result=result) self.result_reporter(result=result)
return reagent return reagent
@@ -322,7 +324,7 @@ class AddSubForm(QWidget):
logger.debug(f"Initializating subform...") logger.debug(f"Initializating subform...")
super(QWidget, self).__init__(parent) super(QWidget, self).__init__(parent)
self.layout = QVBoxLayout(self) self.layout = QVBoxLayout(self)
self.parent = parent
# Initialize tab screen # Initialize tab screen
self.tabs = QTabWidget() self.tabs = QTabWidget()
self.tab1 = QWidget() self.tab1 = QWidget()
@@ -396,6 +398,7 @@ class SubmissionFormWidget(QWidget):
def __init__(self, parent: QWidget) -> None: def __init__(self, parent: QWidget) -> None:
logger.debug(f"Setting form widget...") logger.debug(f"Setting form widget...")
super().__init__(parent) super().__init__(parent)
self.parent = parent
self.ignore = [None, "", "qt_spinbox_lineedit", "qt_scrollarea_viewport", "qt_scrollarea_hcontainer", self.ignore = [None, "", "qt_spinbox_lineedit", "qt_scrollarea_viewport", "qt_scrollarea_hcontainer",
"qt_scrollarea_vcontainer", "submit_btn" "qt_scrollarea_vcontainer", "submit_btn"
] ]
@@ -411,23 +414,26 @@ class SubmissionFormWidget(QWidget):
fname = Path([u.toLocalFile() for u in event.mimeData().urls()][0]) fname = Path([u.toLocalFile() for u in event.mimeData().urls()][0])
self.import_drag.emit(fname) self.import_drag.emit(fname)
def parse_form(self) -> Tuple[dict, list]: def parse_form(self) -> PydSubmission:
logger.debug(f"Hello from parser!") logger.debug(f"Hello from form parser!")
info = {} info = {}
reagents = [] reagents = []
samples = self.parent.parent.samples
logger.debug(f"Using samples: {pformat(samples)}")
widgets = [widget for widget in self.findChildren(QWidget) if widget.objectName() not in self.ignore] widgets = [widget for widget in self.findChildren(QWidget) if widget.objectName() not in self.ignore]
for widget in widgets: for widget in widgets:
logger.debug(f"Parsed widget: {widget.objectName()} of type {type(widget)}") logger.debug(f"Parsed widget: {widget.objectName()} of type {type(widget)}")
match widget: match widget:
case ImportReagent(): case ImportReagent():
reagents.append(dict(name=widget.objectName().replace("lot_", ""), lot=widget.currentText())) reagent = dict(name=widget.objectName().replace("lot_", ""), lot=widget.currentText(), type=None, exp=None)
reagents.append(PydReagent(ctx=self.parent.parent.ctx, **reagent))
case QLineEdit(): case QLineEdit():
info[widget.objectName()] = widget.text() info[widget.objectName()] = dict(value=widget.text())
case QComboBox(): case QComboBox():
info[widget.objectName()] = widget.currentText() info[widget.objectName()] = dict(value=widget.currentText())
case QDateEdit(): case QDateEdit():
info[widget.objectName()] = widget.date().toPyDate() info[widget.objectName()] = dict(value=widget.date().toPyDate())
logger.debug(f"Info: {pformat(info)}") logger.debug(f"Info: {pformat(info)}")
logger.debug(f"Reagents: {pformat(reagents)}") logger.debug(f"Reagents: {pformat(reagents)}")
return info, reagents submission = PydSubmission(ctx=self.parent.parent.ctx, filepath=self.parent.parent.current_file, reagents=reagents, samples=samples, **info)
return submission

View File

@@ -13,15 +13,15 @@ from PyQt6.QtWidgets import (
) )
from PyQt6.QtCore import Qt, QDate, QSize from PyQt6.QtCore import Qt, QDate, QSize
from tools import check_not_nan, jinja_template_loading, Settings from tools import check_not_nan, jinja_template_loading, Settings
from backend.db.functions import construct_kit_from_yaml, \ from backend.db.functions import \
lookup_reagent_types, lookup_reagents, lookup_submission_type, lookup_reagenttype_kittype_association, \ lookup_reagent_types, lookup_reagents, lookup_submission_type, lookup_reagenttype_kittype_association, \
lookup_submissions lookup_submissions#, construct_kit_from_yaml
from backend.db.models import SubmissionTypeKitTypeAssociation from backend.db.models import SubmissionTypeKitTypeAssociation
from sqlalchemy import FLOAT, INTEGER from sqlalchemy import FLOAT, INTEGER
import logging import logging
import numpy as np import numpy as np
from .pop_ups import AlertPop from .pop_ups import AlertPop
from backend.validators import PydSheetReagent from backend.validators import PydReagent
from typing import Tuple from typing import Tuple
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
@@ -92,7 +92,7 @@ class AddReagentForm(QDialog):
def parse_form(self): def parse_form(self):
return dict(name=self.name_input.currentText(), return dict(name=self.name_input.currentText(),
lot=self.lot_input.text(), lot=self.lot_input.text(),
expiry=self.exp_input.date().toPyDate(), exp=self.exp_input.date().toPyDate(),
type=self.type_input.currentText()) type=self.type_input.currentText())
def update_names(self): def update_names(self):
@@ -386,11 +386,11 @@ class ControlsDatePicker(QWidget):
class ImportReagent(QComboBox): class ImportReagent(QComboBox):
def __init__(self, ctx:Settings, reagent:dict|PydSheetReagent, extraction_kit:str): def __init__(self, ctx:Settings, reagent:dict|PydReagent, extraction_kit:str):
super().__init__() super().__init__()
self.setEditable(True) self.setEditable(True)
if isinstance(reagent, dict): if isinstance(reagent, dict):
reagent = PydSheetReagent(**reagent) reagent = PydReagent(ctx=ctx, **reagent)
# Ensure that all reagenttypes have a name that matches the items in the excel parser # Ensure that all reagenttypes have a name that matches the items in the excel parser
query_var = reagent.type query_var = reagent.type
logger.debug(f"Import Reagent is looking at: {reagent.lot} for {query_var}") logger.debug(f"Import Reagent is looking at: {reagent.lot} for {query_var}")

View File

@@ -24,12 +24,14 @@ from .all_window_functions import select_open_file, select_save_file
from PyQt6.QtCore import QSignalBlocker from PyQt6.QtCore import QSignalBlocker
from backend.db.models import BasicSubmission from backend.db.models import BasicSubmission
from backend.db.functions import ( from backend.db.functions import (
construct_submission_info, lookup_reagents, construct_kit_from_yaml, construct_org_from_yaml, get_control_subtypes, lookup_reagents, get_control_subtypes,
update_subsampassoc_with_pcr, check_kit_integrity, update_last_used, lookup_organizations, lookup_kit_types, update_subsampassoc_with_pcr, check_kit_integrity, update_last_used, lookup_organizations, lookup_kit_types,
lookup_submissions, lookup_controls, lookup_samples, lookup_submission_sample_association, store_object, lookup_submission_type, lookup_submissions, lookup_controls, lookup_samples, lookup_submission_sample_association, store_object, lookup_submission_type,
#construct_submission_info, construct_kit_from_yaml, construct_org_from_yaml
) )
from backend.excel.parser import SheetParser, PCRParser, SampleParser from backend.excel.parser import SheetParser, PCRParser, SampleParser
from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df
from backend.validators import PydSubmission, PydSample, PydReagent
from tools import check_not_nan, convert_well_to_row_column from tools import check_not_nan, convert_well_to_row_column
from .custom_widgets.pop_ups import AlertPop, QuestionAsker from .custom_widgets.pop_ups import AlertPop, QuestionAsker
from .custom_widgets import ReportDatePicker from .custom_widgets import ReportDatePicker
@@ -79,6 +81,7 @@ def import_submission_function(obj:QMainWindow, fname:Path|None=None) -> Tuple[Q
for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget): for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget):
item.setParent(None) item.setParent(None)
obj.current_submission_type = pyd.submission_type['value'] obj.current_submission_type = pyd.submission_type['value']
obj.current_file = pyd.filepath
# Get list of fields from pydantic model. # Get list of fields from pydantic model.
fields = list(pyd.model_fields.keys()) + list(pyd.model_extra.keys()) fields = list(pyd.model_fields.keys()) + list(pyd.model_extra.keys())
fields.remove('filepath') fields.remove('filepath')
@@ -97,7 +100,6 @@ def import_submission_function(obj:QMainWindow, fname:Path|None=None) -> Tuple[Q
logger.debug(f"{field}: {value['value']}") logger.debug(f"{field}: {value['value']}")
# create combobox to hold looked up submitting labs # create combobox to hold looked up submitting labs
add_widget = QComboBox() add_widget = QComboBox()
# labs = [item.__str__() for item in lookup_all_orgs(ctx=obj.ctx)]
labs = [item.__str__() for item in lookup_organizations(ctx=obj.ctx)] labs = [item.__str__() for item in lookup_organizations(ctx=obj.ctx)]
# try to set closest match to top of list # try to set closest match to top of list
try: try:
@@ -149,12 +151,7 @@ def import_submission_function(obj:QMainWindow, fname:Path|None=None) -> Tuple[Q
except ValueError: except ValueError:
cats.insert(0, cats.pop(cats.index(pyd.submission_type['value']))) cats.insert(0, cats.pop(cats.index(pyd.submission_type['value'])))
add_widget.addItems(cats) add_widget.addItems(cats)
case "ctx": case "ctx" | 'reagents' | 'csv' | 'filepath':
continue
case 'reagents':
# NOTE: This is now set to run when the extraction kit is updated.
continue
case 'csv':
continue continue
case _: case _:
# anything else gets added in as a line edit # anything else gets added in as a line edit
@@ -178,7 +175,6 @@ def import_submission_function(obj:QMainWindow, fname:Path|None=None) -> Tuple[Q
if "csv" in pyd.model_extra: if "csv" in pyd.model_extra:
obj.csv = pyd.model_extra['csv'] obj.csv = pyd.model_extra['csv']
logger.debug(f"All attributes of obj:\n{pprint.pformat(obj.__dict__)}") logger.debug(f"All attributes of obj:\n{pprint.pformat(obj.__dict__)}")
return obj, result return obj, result
def kit_reload_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: def kit_reload_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
@@ -265,41 +261,44 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
# # seperate out reagents # # seperate out reagents
# reagents = {k.replace("lot_", ""):v for k,v in info.items() if k.startswith("lot_")} # reagents = {k.replace("lot_", ""):v for k,v in info.items() if k.startswith("lot_")}
# info = {k:v for k,v in info.items() if not k.startswith("lot_")} # info = {k:v for k,v in info.items() if not k.startswith("lot_")}
info, reagents = obj.table_widget.formwidget.parse_form() # info, reagents = obj.table_widget.formwidget.parse_form()
logger.debug(f"Info: {info}") submission: PydSubmission = obj.table_widget.formwidget.parse_form()
logger.debug(f"Reagents: {reagents}") logger.debug(f"Submission: {pprint.pformat(submission)}")
parsed_reagents = [] parsed_reagents = []
# compare reagents in form to reagent database # compare reagents in form to reagent database
for reagent in reagents: for reagent in submission.reagents:
# Lookup any existing reagent of this type with this lot number # Lookup any existing reagent of this type with this lot number
wanted_reagent = lookup_reagents(ctx=obj.ctx, lot_number=reagent['lot'], reagent_type=reagent['name']) wanted_reagent = lookup_reagents(ctx=obj.ctx, lot_number=reagent.lot, reagent_type=reagent.name)
logger.debug(f"Looked up reagent: {wanted_reagent}") logger.debug(f"Looked up reagent: {wanted_reagent}")
# if reagent not found offer to add to database # if reagent not found offer to add to database
if wanted_reagent == None: if wanted_reagent == None:
# r_lot = reagent[reagent] # r_lot = reagent[reagent]
r_lot = reagent['lot'] dlg = QuestionAsker(title=f"Add {reagent.lot}?", message=f"Couldn't find reagent type {reagent.name.strip('Lot')}: {reagent.lot} in the database.\n\nWould you like to add it?")
dlg = QuestionAsker(title=f"Add {r_lot}?", message=f"Couldn't find reagent type {reagent['name'].strip('Lot')}: {r_lot} in the database.\n\nWould you like to add it?")
if dlg.exec(): if dlg.exec():
logger.debug(f"Looking through {pprint.pformat(obj.reagents)} for reagent {reagent['name']}") logger.debug(f"Looking through {pprint.pformat(obj.reagents)} for reagent {reagent.name}")
try: try:
picked_reagent = [item for item in obj.reagents if item.type == reagent['name']][0] picked_reagent = [item for item in obj.reagents if item.type == reagent.name][0]
except IndexError: except IndexError:
logger.error(f"Couldn't find {reagent['name']} in obj.reagents. Checking missing reagents {pprint.pformat(obj.missing_reagents)}") logger.error(f"Couldn't find {reagent.name} in obj.reagents. Checking missing reagents {pprint.pformat(obj.missing_reagents)}")
picked_reagent = [item for item in obj.missing_reagents if item.type == reagent['name']][0] picked_reagent = [item for item in obj.missing_reagents if item.type == reagent.name][0]
logger.debug(f"checking reagent: {reagent['name']} in obj.reagents. Result: {picked_reagent}") logger.debug(f"checking reagent: {reagent.name} in obj.reagents. Result: {picked_reagent}")
expiry_date = picked_reagent.exp expiry_date = picked_reagent.exp
wanted_reagent = obj.add_reagent(reagent_lot=r_lot, reagent_type=reagent['name'].replace("lot_", ""), expiry=expiry_date, name=picked_reagent.name) wanted_reagent = obj.add_reagent(reagent_lot=reagent.lot, reagent_type=reagent.name.replace("lot_", ""), expiry=expiry_date, name=picked_reagent.name)
else: else:
# In this case we will have an empty reagent and the submission will fail kit integrity check # In this case we will have an empty reagent and the submission will fail kit integrity check
logger.debug("Will not add reagent.") logger.debug("Will not add reagent.")
return obj, dict(message="Failed integrity check", status="critical") return obj, dict(message="Failed integrity check", status="critical")
parsed_reagents.append(wanted_reagent) # Append the PydReagent object o be added to the submission
parsed_reagents.append(reagent)
# move samples into preliminary submission dict # move samples into preliminary submission dict
info['samples'] = obj.samples submission.reagents = parsed_reagents
info['uploaded_by'] = getuser() # submission.uploaded_by = getuser()
# construct submission object # construct submission object
logger.debug(f"Here is the info_dict: {pprint.pformat(info)}") # logger.debug(f"Here is the info_dict: {pprint.pformat(info)}")
base_submission, result = construct_submission_info(ctx=obj.ctx, info_dict=info) # base_submission, result = construct_submission_info(ctx=obj.ctx, info_dict=info)
base_submission, result = submission.toSQL()
# delattr(base_submission, "ctx")
# raise ValueError(base_submission.__dict__)
# check output message for issues # check output message for issues
match result['code']: match result['code']:
# code 1: ask for overwrite # code 1: ask for overwrite
@@ -307,7 +306,8 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_num}?", message=result['message']) dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_num}?", message=result['message'])
if dlg.exec(): if dlg.exec():
# Do not add duplicate reagents. # Do not add duplicate reagents.
base_submission.reagents = [] # base_submission.reagents = []
pass
else: else:
obj.ctx.database_session.rollback() obj.ctx.database_session.rollback()
return obj, dict(message="Overwrite cancelled", status="Information") return obj, dict(message="Overwrite cancelled", status="Information")
@@ -317,16 +317,17 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
case _: case _:
pass pass
# add reagents to submission object # add reagents to submission object
for reagent in parsed_reagents: for reagent in base_submission.reagents:
base_submission.reagents.append(reagent)
update_last_used(ctx=obj.ctx, reagent=reagent, kit=base_submission.extraction_kit) update_last_used(ctx=obj.ctx, reagent=reagent, kit=base_submission.extraction_kit)
logger.debug(f"Parsed reagents: {pprint.pformat(parsed_reagents)}") logger.debug(f"Here is the final submission: {pprint.pformat(base_submission.__dict__)}")
logger.debug(f"Parsed reagents: {pprint.pformat(base_submission.reagents)}")
logger.debug("Checking kit integrity...") logger.debug("Checking kit integrity...")
kit_integrity = check_kit_integrity(base_submission) kit_integrity = check_kit_integrity(base_submission)
if kit_integrity != None: if kit_integrity != None:
return obj, dict(message=kit_integrity['message'], status="critical") return obj, dict(message=kit_integrity['message'], status="critical")
logger.debug(f"Sending submission: {base_submission.rsl_plate_num} to database.") logger.debug(f"Sending submission: {base_submission.rsl_plate_num} to database.")
result = store_object(ctx=obj.ctx, object=base_submission) # result = store_object(ctx=obj.ctx, object=base_submission)
base_submission.save(ctx=obj.ctx)
# update summary sheet # update summary sheet
obj.table_widget.sub_wid.setData() obj.table_widget.sub_wid.setData()
# reset form # reset form
@@ -339,9 +340,10 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
logger.debug(f"We have the extraction kit: {extraction_kit.name}") logger.debug(f"We have the extraction kit: {extraction_kit.name}")
excel_map = extraction_kit.construct_xl_map_for_use(obj.current_submission_type) excel_map = extraction_kit.construct_xl_map_for_use(obj.current_submission_type)
logger.debug(f"Extraction kit map:\n\n{pprint.pformat(excel_map)}") logger.debug(f"Extraction kit map:\n\n{pprint.pformat(excel_map)}")
input_reagents = [item.to_reagent_dict(extraction_kit=base_submission.extraction_kit) for item in parsed_reagents] input_reagents = [item.to_reagent_dict(extraction_kit=base_submission.extraction_kit) for item in base_submission.reagents]
logger.debug(f"Parsed reagents going into autofile: {pprint.pformat(input_reagents)}") logger.debug(f"Parsed reagents going into autofile: {pprint.pformat(input_reagents)}")
autofill_excel(obj=obj, xl_map=excel_map, reagents=input_reagents, missing_reagents=obj.missing_reagents, info=info, missing_info=obj.missing_info) # autofill_excel(obj=obj, xl_map=excel_map, reagents=input_reagents, missing_reagents=obj.missing_reagents, info=info, missing_info=obj.missing_info)
autofill_excel(obj=obj, xl_map=excel_map, reagents=input_reagents, missing_reagents=obj.missing_reagents, info=base_submission.__dict__, missing_info=obj.missing_info)
if hasattr(obj, 'csv'): if hasattr(obj, 'csv'):
dlg = QuestionAsker("Export CSV?", "Would you like to export the csv file?") dlg = QuestionAsker("Export CSV?", "Would you like to export the csv file?")
if dlg.exec(): if dlg.exec():

View File

@@ -17,7 +17,6 @@ from sqlalchemy import create_engine
from pydantic import field_validator from pydantic import field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Any, Tuple from typing import Any, Tuple
from datetime import datetime
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
@@ -55,6 +54,11 @@ def check_not_nan(cell_contents) -> bool:
cell_contents = cell_contents.lower() cell_contents = cell_contents.lower()
except (TypeError, AttributeError): except (TypeError, AttributeError):
pass pass
try:
if np.isnat(cell_contents):
cell_contents = np.nan
except TypeError as e:
pass
if cell_contents == "nat": if cell_contents == "nat":
cell_contents = np.nan cell_contents = np.nan
if cell_contents == 'nan': if cell_contents == 'nan':
@@ -89,38 +93,6 @@ def convert_nans_to_nones(input_str) -> str|None:
return input_str return input_str
return None return None
# def create_reagent_list(in_dict:dict) -> list[str]:
# """
# Makes list of reagent types without "lot_" prefix for each key in a dictionary
# Args:
# in_dict (dict): input dictionary of reagents
# Returns:
# list[str]: list of reagent types with "lot_" prefix removed.
# """
# return [item.strip("lot_") for item in in_dict.keys()]
# def retrieve_rsl_number(in_str:str) -> Tuple[str, str]:
# """
# Uses regex to retrieve the plate number and submission type from an input string
# DEPRECIATED. REPLACED BY RSLNamer.parsed_name
# Args:
# in_str (str): string to be parsed
# Returns:
# Tuple[str, str]: tuple of (output rsl number, submission_type)
# """
# in_str = in_str.split("\\")[-1]
# logger.debug(f"Attempting match of {in_str}")
# regex = re.compile(r"""
# (?P<wastewater>RSL-?WW(?:-|_)20\d{6}(?:(?:_|-)\d(?!\d))?)|(?P<bacterial_culture>RSL-\d{2}-\d{4})
# """, re.VERBOSE)
# m = regex.search(in_str)
# parsed = m.group().replace("_", "-")
# return (parsed, m.lastgroup)
def check_regex_match(pattern:str, check:str) -> bool: def check_regex_match(pattern:str, check:str) -> bool:
try: try:
return bool(re.match(fr"{pattern}", check)) return bool(re.match(fr"{pattern}", check))
@@ -438,26 +410,6 @@ def jinja_template_loading():
env.globals['STATIC_PREFIX'] = loader_path.joinpath("static", "css") env.globals['STATIC_PREFIX'] = loader_path.joinpath("static", "css")
return env return env
# def check_is_power_user(ctx:Settings) -> bool:
# """
# Check to ensure current user is in power users list.
# NOTE: Depreciated in favour of 'check_authorization' below.
# Args:
# ctx (dict): settings passed down from gui.
# Returns:
# bool: True if user is in power users, else false.
# """
# try:
# check = getpass.getuser() in ctx.power_users
# except KeyError as e:
# check = False
# except Exception as e:
# logger.debug(f"Check encountered unknown error: {type(e).__name__} - {e}")
# check = False
# return check
def check_authorization(func): def check_authorization(func):
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
logger.debug(f"Checking authorization") logger.debug(f"Checking authorization")