Before creating info and reagent parser classes.

This commit is contained in:
Landon Wark
2023-08-21 13:50:38 -05:00
parent af810ae528
commit b6de159631
20 changed files with 1176 additions and 571 deletions

View File

@@ -3,15 +3,16 @@ Convenience functions for interacting with the database.
'''
from . import models
from .models.kits import reagenttypes_kittypes, KitType
from .models.submissions import reagents_submissions, BasicSubmission
# from .models.kits import KitType
# from .models.submissions import BasicSample, reagents_submissions, BasicSubmission, SubmissionSampleAssociation
# from .models import submissions
import pandas as pd
import sqlalchemy.exc
import sqlite3
import logging
from datetime import date, datetime, timedelta
from sqlalchemy import and_
from sqlalchemy import JSON, event
from sqlalchemy import and_, JSON, event
from sqlalchemy.exc import IntegrityError, OperationalError, SAWarning
from sqlalchemy.engine import Engine
import json
from getpass import getuser
@@ -19,6 +20,7 @@ import numpy as np
import yaml
from pathlib import Path
from tools import Settings, check_regex_match, RSLNamer
from typing import List
@@ -32,7 +34,7 @@ def set_sqlite_pragma(dbapi_connection, connection_record):
cursor.close()
def store_submission(ctx:Settings, base_submission:models.BasicSubmission) -> None|dict:
def store_submission(ctx:Settings, base_submission:models.BasicSubmission, samples:List[dict]=[]) -> None|dict:
"""
Upserts submissions into database
@@ -47,26 +49,37 @@ def store_submission(ctx:Settings, base_submission:models.BasicSubmission) -> No
# Add all samples to sample table
typer = RSLNamer(ctx=ctx, instr=base_submission.rsl_plate_num)
base_submission.rsl_plate_num = typer.parsed_name
for sample in base_submission.samples:
logger.debug(f"Typer: {typer.submission_type}")
logger.debug(f"sample going in: {type(sample)}\n{sample.__dict__}")
# Suuuuuper hacky way to be sure that the artic doesn't overwrite the ww plate in a ww sample
# need something more elegant
if "_artic" not in typer.submission_type:
sample.rsl_plate = base_submission
else:
logger.debug(f"{sample.ww_sample_full_id} is an ARTIC sample.")
# base_submission.samples.remove(sample)
# sample.rsl_plate = sample.rsl_plate
# sample.artic_rsl_plate = base_submission
logger.debug(f"Attempting to add sample: {sample.to_string()}")
try:
# ctx['database_session'].add(sample)
ctx.database_session.add(sample)
except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
logger.debug(f"Hit an integrity error : {e}")
continue
logger.debug(f"Here is the sample to be stored in the DB: {sample.__dict__}")
# for sample in samples:
# instance = sample['sample']
# logger.debug(f"Typer: {typer.submission_type}")
# logger.debug(f"sample going in: {type(sample['sample'])}\n{sample['sample'].__dict__}")
# # Suuuuuper hacky way to be sure that the artic doesn't overwrite the ww plate in a ww sample
# # need something more elegant
# # if "_artic" not in typer.submission_type:
# # sample.rsl_plate = base_submission
# # else:
# # logger.debug(f"{sample.ww_sample_full_id} is an ARTIC sample.")
# # # base_submission.samples.remove(sample)
# # # sample.rsl_plate = sample.rsl_plate
# # # sample.artic_rsl_plate = base_submission
# # logger.debug(f"Attempting to add sample: {sample.to_string()}")
# # try:
# # ctx['database_session'].add(sample)
# # ctx.database_session.add(instance)
# # ctx.database_session.commit()
# # logger.debug(f"Submitter id: {sample['sample'].submitter_id} and table id: {sample['sample'].id}")
# logger.debug(f"Submitter id: {instance.submitter_id} and table id: {instance.id}")
# assoc = SubmissionSampleAssociation(submission=base_submission, sample=instance, row=sample['row'], column=sample['column'])
# # except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
# # logger.debug(f"Hit an integrity error : {e}")
# # continue
# try:
# base_submission.submission_sample_associations.append(assoc)
# except IntegrityError as e:
# logger.critical(e)
# continue
# logger.debug(f"Here is the sample to be stored in the DB: {sample.__dict__}")
# Add submission to submission table
# ctx['database_session'].add(base_submission)
ctx.database_session.add(base_submission)
@@ -148,14 +161,15 @@ def construct_submission_info(ctx:Settings, info_dict:dict) -> models.BasicSubmi
code = 1
msg = "This submission already exists.\nWould you like to overwrite?"
for item in info_dict:
logger.debug(f"Setting {item} to {info_dict[item]}")
value = info_dict[item]
logger.debug(f"Setting {item} to {value}")
# set fields based on keys in dictionary
match item:
case "extraction_kit":
q_str = info_dict[item]
logger.debug(f"Looking up kit {q_str}")
# q_str = info_dict[item]
logger.debug(f"Looking up kit {value}")
try:
field_value = lookup_kittype_by_name(ctx=ctx, name=q_str)
field_value = lookup_kittype_by_name(ctx=ctx, name=value)
except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
logger.error(f"Hit an integrity error looking up kit type: {e}")
logger.error(f"Details: {e.__dict__}")
@@ -164,29 +178,62 @@ def construct_submission_info(ctx:Settings, info_dict:dict) -> models.BasicSubmi
else:
msg = "SQL integrity error of unknown origin."
return instance, dict(code=2, message=msg)
logger.debug(f"Got {field_value} for kit {q_str}")
logger.debug(f"Got {field_value} for kit {value}")
case "submitting_lab":
q_str = info_dict[item].replace(" ", "_").lower()
logger.debug(f"Looking up organization: {q_str}")
field_value = lookup_org_by_name(ctx=ctx, name=q_str)
logger.debug(f"Got {field_value} for organization {q_str}")
value = value.replace(" ", "_").lower()
logger.debug(f"Looking up organization: {value}")
field_value = lookup_org_by_name(ctx=ctx, name=value)
logger.debug(f"Got {field_value} for organization {value}")
case "submitter_plate_num":
# Because of unique constraint, there will be problems with
# multiple submissions named 'None', so...
# Should be depreciated with use of pydantic validator
logger.debug(f"Submitter plate id: {info_dict[item]}")
logger.debug(f"Submitter plate id: {value}")
# if info_dict[item] == None or info_dict[item] == "None" or info_dict[item] == "":
# logger.debug(f"Got None as a submitter plate number, inserting random string to preserve database unique constraint.")
# info_dict[item] = uuid.uuid4().hex.upper()
field_value = info_dict[item]
field_value = value
case "samples":
for sample in value:
sample_instance = lookup_sample_by_submitter_id(ctx=ctx, submitter_id=sample['sample'].submitter_id)
if sample_instance == None:
sample_instance = sample['sample']
else:
logger.warning(f"Sample {sample} already exists, creating association.")
if sample_instance in instance.samples:
logger.error(f"Looks like there's a duplicate sample on this plate: {sample_instance.submitter_id}!")
continue
try:
with ctx.database_session.no_autoflush:
try:
logger.debug(f"Here is the sample instance type: {sample_instance.sample_type}")
try:
assoc = getattr(models, f"{sample_instance.sample_type.replace('_sample', '').replace('_', ' ').title().replace(' ', '')}Association")
except AttributeError as e:
assoc = models.SubmissionSampleAssociation
# assoc = models.SubmissionSampleAssociation(submission=instance, sample=sample_instance, row=sample['row'], column=sample['column'])
assoc = assoc(submission=instance, sample=sample_instance, row=sample['row'], column=sample['column'])
instance.submission_sample_associations.append(assoc)
except IntegrityError:
logger.error(f"Hit integrity error for: {sample}")
continue
except SAWarning:
logger.error(f"Looks like the association already exists for submission: {instance} and sample: {sample_instance}")
continue
except IntegrityError as e:
logger.critical(e)
continue
continue
case _:
field_value = info_dict[item]
field_value = value
# insert into field
try:
setattr(instance, item, field_value)
except AttributeError:
logger.debug(f"Could not set attribute: {item} to {info_dict[item]}")
continue
except KeyError:
continue
# calculate cost of the run: immutable cost + mutable times number of columns
# This is now attached to submission upon creation to preserve at-run costs incase of cost increase in the future.
try:
@@ -202,8 +249,9 @@ def construct_submission_info(ctx:Settings, info_dict:dict) -> models.BasicSubmi
logger.debug("Checking and applying discounts...")
discounts = [item.amount for item in lookup_discounts_by_org_and_kit(ctx=ctx, kit_id=instance.extraction_kit.id, lab_id=instance.submitting_lab.id)]
logger.debug(f"We got discounts: {discounts}")
discounts = sum(discounts)
instance.run_cost = instance.run_cost - discounts
if len(discounts) > 0:
discounts = sum(discounts)
instance.run_cost = instance.run_cost - discounts
except Exception as e:
logger.error(f"An unknown exception occurred when calculating discounts: {e}")
# We need to make sure there's a proper rsl plate number
@@ -307,10 +355,15 @@ def lookup_kittype_by_name(ctx:Settings, name:str) -> models.KitType:
Returns:
models.KitType: retrieved kittype
"""
if isinstance(name, dict):
name = name['value']
logger.debug(f"Querying kittype: {name}")
# return ctx['database_session'].query(models.KitType).filter(models.KitType.name==name).first()
return ctx.database_session.query(models.KitType).filter(models.KitType.name==name).first()
def lookup_kittype_by_id(ctx:Settings, id:int) -> models.KitType:
return ctx.database_session.query(models.KitType).filter(models.KitType.id==id).first()
def lookup_regent_by_type_name(ctx:Settings, type_name:str) -> list[models.Reagent]:
"""
Lookup reagents by their type name
@@ -519,18 +572,21 @@ def create_kit_from_yaml(ctx:Settings, exp:dict) -> dict:
# look_up = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==r).first()
look_up = ctx.database_session.query(models.ReagentType).filter(models.ReagentType.name==r).first()
if look_up == None:
rt = models.ReagentType(name=r.replace(" ", "_").lower(), eol_ext=timedelta(30*exp[type]['kits'][kt]['reagenttypes'][r]['eol_ext']), kits=[kit], required=1)
# rt = models.ReagentType(name=r.replace(" ", "_").lower(), eol_ext=timedelta(30*exp[type]['kits'][kt]['reagenttypes'][r]['eol_ext']), kits=[kit], required=1)
rt = models.ReagentType(name=r.replace(" ", "_").lower(), eol_ext=timedelta(30*exp[type]['kits'][kt]['reagenttypes'][r]['eol_ext']), last_used="")
else:
rt = look_up
rt.kits.append(kit)
# rt.kits.append(kit)
# add this because I think it's necessary to get proper back population
try:
kit.reagent_types_id.append(rt.id)
except AttributeError as e:
logger.error(f"Error appending reagent id to kit.reagent_types_id: {e}, creating new.")
# try:
# kit.reagent_types_id.append(rt.id)
# except AttributeError as e:
# logger.error(f"Error appending reagent id to kit.reagent_types_id: {e}, creating new.")
# kit.reagent_types_id = [rt.id]
assoc = models.KitTypeReagentTypeAssociation(kit_type=kit, reagent_type=rt, uses=kit.used_for)
# ctx['database_session'].add(rt)
ctx.database_session.add(rt)
kit.kit_reagenttype_associations.append(assoc)
logger.debug(f"Kit construction reagent type: {rt.__dict__}")
logger.debug(f"Kit construction kit: {kit.__dict__}")
# ctx['database_session'].add(kit)
@@ -727,19 +783,25 @@ def delete_submission_by_id(ctx:Settings, id:int) -> None:
yaml.dump(backup, f)
except KeyError:
pass
sub.reagents = []
for sample in sub.samples:
if sample.rsl_plate == sub:
# ctx['database_session'].delete(sample)
ctx.database_session.delete(sample)
else:
logger.warning(f"Not deleting sample {sample.ww_sample_full_id} because it belongs to another plate.")
# sub.reagents = []
# for assoc in sub.submission_sample_associations:
# # if sample.rsl_plate == sub:
# if sub in sample.submissions:
# # ctx['database_session'].delete(sample)
# ctx.database_session.delete(assoc)
# else:
# logger.warning(f"Not deleting sample {sample.ww_sample_full_id} because it belongs to another plate.")
# ctx["database_session"].delete(sub)
# ctx["database_session"].commit()
ctx.database_session.delete(sub)
ctx.database_session.commit()
try:
ctx.database_session.commit()
except (IntegrityError, OperationalError) as e:
ctx.database_session.rollback()
raise e
def lookup_ww_sample_by_rsl_sample_number(ctx:Settings, rsl_number:str) -> models.WWSample:
def lookup_ww_sample_by_rsl_sample_number(ctx:Settings, rsl_number:str) -> models.WastewaterSample:
"""
Retrieves wastewater sample from database by rsl sample number
@@ -751,9 +813,9 @@ def lookup_ww_sample_by_rsl_sample_number(ctx:Settings, rsl_number:str) -> model
models.WWSample: instance of wastewater sample
"""
# return ctx['database_session'].query(models.WWSample).filter(models.WWSample.rsl_number==rsl_number).first()
return ctx.database_session.query(models.WWSample).filter(models.WWSample.rsl_number==rsl_number).first()
return ctx.database_session.query(models.WastewaterSample).filter(models.WastewaterSample.rsl_number==rsl_number).first()
def lookup_ww_sample_by_ww_sample_num(ctx:Settings, sample_number:str) -> models.WWSample:
def lookup_ww_sample_by_ww_sample_num(ctx:Settings, sample_number:str) -> models.WastewaterSample:
"""
Retrieves wastewater sample from database by ww sample number
@@ -764,9 +826,9 @@ def lookup_ww_sample_by_ww_sample_num(ctx:Settings, sample_number:str) -> models
Returns:
models.WWSample: instance of wastewater sample
"""
return ctx.database_session.query(models.WWSample).filter(models.WWSample.ww_sample_full_id==sample_number).first()
return ctx.database_session.query(models.WastewaterSample).filter(models.WastewaterSample.submitter_id==sample_number).first()
def lookup_ww_sample_by_sub_sample_rsl(ctx:Settings, sample_rsl:str, plate_rsl:str) -> models.WWSample:
def lookup_ww_sample_by_sub_sample_rsl(ctx:Settings, sample_rsl:str, plate_rsl:str) -> models.WastewaterSample:
"""
Retrieves a wastewater sample from the database by its rsl sample number and parent rsl plate number.
This will likely replace simply looking up by the sample rsl above cine I need to control for repeats.
@@ -780,9 +842,10 @@ def lookup_ww_sample_by_sub_sample_rsl(ctx:Settings, sample_rsl:str, plate_rsl:s
models.WWSample: Relevant wastewater object
"""
# return ctx['database_session'].query(models.WWSample).join(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==plate_rsl).filter(models.WWSample.rsl_number==sample_rsl).first()
return ctx.database_session.query(models.WWSample).join(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==plate_rsl).filter(models.WWSample.rsl_number==sample_rsl).first()
# return ctx.database_session.query(models.BasicSample).join(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==plate_rsl).filter(models.BasicSample.submitter_id==sample_rsl).first()
return ctx.database_session.query(models.BasicSample).filter(models.BasicSample.submissions.any(models.BasicSubmission.rsl_plate_num==plate_rsl)).filter(models.WastewaterSample.rsl_number==sample_rsl).first()
def lookup_ww_sample_by_sub_sample_well(ctx:Settings, sample_rsl:str, well_num:str, plate_rsl:str) -> models.WWSample:
def lookup_ww_sample_by_sub_sample_well(ctx:Settings, sample_rsl:str, well_num:str, plate_rsl:str) -> models.WastewaterSample:
"""
Retrieves a wastewater sample from the database by its rsl sample number and parent rsl plate number.
This will likely replace simply looking up by the sample rsl above cine I need to control for repeats.
@@ -800,10 +863,10 @@ def lookup_ww_sample_by_sub_sample_well(ctx:Settings, sample_rsl:str, well_num:s
# .filter(models.BasicSubmission.rsl_plate_num==plate_rsl) \
# .filter(models.WWSample.rsl_number==sample_rsl) \
# .filter(models.WWSample.well_number==well_num).first()
return ctx.database_session.query(models.WWSample).join(models.BasicSubmission) \
return ctx.database_session.query(models.WastewaterSample).join(models.BasicSubmission) \
.filter(models.BasicSubmission.rsl_plate_num==plate_rsl) \
.filter(models.WWSample.rsl_number==sample_rsl) \
.filter(models.WWSample.well_number==well_num).first()
.filter(models.WastewaterSample.rsl_number==sample_rsl) \
.filter(models.WastewaterSample.well_number==well_num).first()
def update_ww_sample(ctx:Settings, sample_obj:dict):
"""
@@ -815,25 +878,26 @@ def update_ww_sample(ctx:Settings, sample_obj:dict):
"""
# ww_samp = lookup_ww_sample_by_rsl_sample_number(ctx=ctx, rsl_number=sample_obj['sample'])
logger.debug(f"Looking up {sample_obj['sample']} in plate {sample_obj['plate_rsl']}")
ww_samp = lookup_ww_sample_by_sub_sample_rsl(ctx=ctx, sample_rsl=sample_obj['sample'], plate_rsl=sample_obj['plate_rsl'])
# ww_samp = lookup_ww_sample_by_sub_sample_rsl(ctx=ctx, sample_rsl=sample_obj['sample'], plate_rsl=sample_obj['plate_rsl'])
assoc = lookup_ww_association_by_plate_sample(ctx=ctx, rsl_plate_num=sample_obj['plate_rsl'], rsl_sample_num=sample_obj['sample'])
# ww_samp = lookup_ww_sample_by_sub_sample_well(ctx=ctx, sample_rsl=sample_obj['sample'], well_num=sample_obj['well_num'], plate_rsl=sample_obj['plate_rsl'])
if ww_samp != None:
if assoc != None:
# del sample_obj['well_number']
for key, value in sample_obj.items():
# set attribute 'key' to 'value'
try:
check = getattr(ww_samp, key)
check = getattr(assoc, key)
except AttributeError:
continue
if check == None:
logger.debug(f"Setting {key} to {value}")
setattr(ww_samp, key, value)
setattr(assoc, key, value)
else:
logger.error(f"Unable to find sample {sample_obj['sample']}")
return
# ctx['database_session'].add(ww_samp)
# ctx["database_session"].commit()
ctx.database_session.add(ww_samp)
ctx.database_session.add(assoc)
ctx.database_session.commit()
def lookup_discounts_by_org_and_kit(ctx:Settings, kit_id:int, lab_id:int) -> list:
@@ -860,7 +924,7 @@ def lookup_discounts_by_org_and_kit(ctx:Settings, kit_id:int, lab_id:int) -> lis
def hitpick_plate(submission:models.BasicSubmission, plate_number:int=0) -> list:
"""
Creates a list of sample positions and statuses to be used by plate mapping and csv output to biomek software.
DEPRECIATED: replaced by Submission.hitpick
Args:
submission (models.BasicSubmission): Input submission
plate_number (int, optional): plate position in the series of selected plates. Defaults to 0.
@@ -881,7 +945,7 @@ def hitpick_plate(submission:models.BasicSubmission, plate_number:int=0) -> list
this_sample = dict(
plate_number = plate_number,
sample_name = samp['name'],
column = samp['col'],
column = samp['column'],
row = samp['row'],
positive = samp['positive'],
plate_name = submission.rsl_plate_num
@@ -966,7 +1030,7 @@ def lookup_last_used_reagenttype_lot(ctx:Settings, type_name:str) -> models.Reag
except AttributeError:
return None
def check_kit_integrity(sub:BasicSubmission|KitType, reagenttypes:list|None=None) -> dict|None:
def check_kit_integrity(sub:models.BasicSubmission|models.KitType, reagenttypes:list|None=None) -> dict|None:
"""
Ensures all reagents expected in kit are listed in Submission
@@ -980,16 +1044,20 @@ def check_kit_integrity(sub:BasicSubmission|KitType, reagenttypes:list|None=None
logger.debug(type(sub))
# What type is sub?
match sub:
case BasicSubmission():
case models.BasicSubmission():
# Get all required reagent types for this kit.
ext_kit_rtypes = [reagenttype.name for reagenttype in sub.extraction_kit.reagent_types if reagenttype.required == 1]
# ext_kit_rtypes = [reagenttype.name for reagenttype in sub.extraction_kit.reagent_types if reagenttype.required == 1]
ext_kit_rtypes = [item.name for item in sub.extraction_kit.get_reagents(required=True)]
# Overwrite function parameter reagenttypes
try:
reagenttypes = [reagent.type.name for reagent in sub.reagents]
except AttributeError as e:
logger.error(f"Problem parsing reagents: {[f'{reagent.lot}, {reagent.type}' for reagent in sub.reagents]}")
case KitType():
ext_kit_rtypes = [reagenttype.name for reagenttype in sub.reagent_types if reagenttype.required == 1]
case models.KitType():
# ext_kit_rtypes = [reagenttype.name for reagenttype in sub.reagent_types if reagenttype.required == 1]
ext_kit_rtypes = [item.name for item in sub.get_reagents(required=True)]
case _:
raise ValueError(f"There was no match for the integrity object.\n\nCheck to make sure they are imported from the same place because it matters.")
logger.debug(f"Kit reagents: {ext_kit_rtypes}")
logger.debug(f"Submission reagents: {reagenttypes}")
# check if lists are equal
@@ -1003,4 +1071,83 @@ def check_kit_integrity(sub:BasicSubmission|KitType, reagenttypes:list|None=None
result = None
else:
result = {'message' : f"The submission you are importing is missing some reagents expected by the kit.\n\nIt looks like you are missing: {[item.upper() for item in missing]}\n\nAlternatively, you may have set the wrong extraction kit.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", 'missing': missing}
return result
return result
def lookup_sample_by_submitter_id(ctx:Settings, submitter_id:str) -> models.BasicSample:
"""
_summary_
Args:
ctx (Settings): _description_
submitter_id (str): _description_
Returns:
BasicSample: _description_
"""
return ctx.database_session.query(models.BasicSample).filter(models.BasicSample.submitter_id==submitter_id).first()
def get_all_submission_types(ctx:Settings) -> List[str]:
"""
_summary_
Args:
ctx (Settings): _description_
Returns:
List[str]: _description_
"""
kits = ctx.database_session.query(KitType).all()
uses = [list(item.used_for.keys()) for item in kits]
flat_list = [item for sublist in uses for item in sublist]
return list(set(flat_list)).sort()
def get_reagents_in_extkit(ctx:Settings, kit_name:str) -> List[str]:
"""
_summary_
DEPRECIATED, use kit.get_reagents() instead
Args:
ctx (Settings): _description_
kit_name (str): _description_
Returns:
List[str]: _description_
"""
kit = lookup_kittype_by_name(ctx=ctx, name=kit_name)
return kit.get_reagents(required=False)
def lookup_ww_association_by_plate_sample(ctx:Settings, rsl_plate_num:str, rsl_sample_num:str) -> models.SubmissionSampleAssociation:
"""
_summary_
Args:
ctx (Settings): _description_
rsl_plate_num (str): _description_
sample_submitter_id (_type_): _description_
Returns:
models.SubmissionSampleAssociation: _description_
"""
return ctx.database_session.query(models.SubmissionSampleAssociation)\
.join(models.BasicSubmission)\
.join(models.WastewaterSample)\
.filter(models.BasicSubmission.rsl_plate_num==rsl_plate_num)\
.filter(models.WastewaterSample.rsl_number==rsl_sample_num)\
.first()
def lookup_all_reagent_names_by_role(ctx:Settings, role_name:str) -> List[str]:
"""
_summary_
Args:
ctx (Settings): _description_
role_name (str): _description_
Returns:
List[str]: _description_
"""
role = lookup_reagenttype_by_name(ctx=ctx, rt_name=role_name)
try:
return [reagent.name for reagent in role.instances]
except AttributeError:
return []

View File

@@ -7,7 +7,7 @@ Base = declarative_base()
metadata = Base.metadata
from .controls import Control, ControlType
from .kits import KitType, ReagentType, Reagent, Discount
from .kits import KitType, ReagentType, Reagent, Discount, KitTypeReagentTypeAssociation
from .organizations import Organization, Contact
from .samples import WWSample, BCSample
from .submissions import BasicSubmission, BacterialCulture, Wastewater, WastewaterArtic
# from .samples import WWSample, BCSample, BasicSample
from .submissions import BasicSubmission, BacterialCulture, Wastewater, WastewaterArtic, WastewaterSample, BacterialCultureSample, BasicSample, SubmissionSampleAssociation, WastewaterAssociation

View File

@@ -4,14 +4,23 @@ All kit and reagent related models
from . import Base
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, CheckConstraint
from sqlalchemy.orm import relationship, validates
from sqlalchemy.ext.associationproxy import association_proxy
from datetime import date
import logging
logger = logging.getLogger(f'submissions.{__name__}')
# Table containing reagenttype-kittype relationships
reagenttypes_kittypes = Table("_reagentstypes_kittypes", Base.metadata, Column("reagent_types_id", INTEGER, ForeignKey("_reagent_types.id")), Column("kits_id", INTEGER, ForeignKey("_kits.id")))
# # Table containing reagenttype-kittype relationships
# reagenttypes_kittypes = Table("_reagentstypes_kittypes", Base.metadata,
# Column("reagent_types_id", INTEGER, ForeignKey("_reagent_types.id")),
# Column("kits_id", INTEGER, ForeignKey("_kits.id")),
# # The entry will look like ["Bacteria Culture":{"row":1, "column":4}]
# Column("uses", JSON),
# # is the reagent required for that kit?
# Column("required", INTEGER)
# )
class KitType(Base):
@@ -25,12 +34,24 @@ class KitType(Base):
submissions = relationship("BasicSubmission", back_populates="extraction_kit") #: submissions this kit was used for
used_for = Column(JSON) #: list of names of sample types this kit can process
cost_per_run = Column(FLOAT(2)) #: dollar amount for each full run of this kit NOTE: depreciated, use the constant and mutable costs instead
# TODO: Change below to 'mutable_cost_column' and 'mutable_cost_sample' before moving to production.
mutable_cost_column = Column(FLOAT(2)) #: dollar amount per 96 well plate that can change with number of columns (reagents, tips, etc)
mutable_cost_sample = Column(FLOAT(2)) #: dollar amount that can change with number of samples (reagents, tips, etc)
constant_cost = Column(FLOAT(2)) #: dollar amount per plate that will remain constant (plates, man hours, etc)
reagent_types = relationship("ReagentType", back_populates="kits", uselist=True, secondary=reagenttypes_kittypes) #: reagent types this kit contains
reagent_types_id = Column(INTEGER, ForeignKey("_reagent_types.id", ondelete='SET NULL', use_alter=True, name="fk_KT_reagentstype_id")) #: joined reagent type id
# reagent_types = relationship("ReagentType", back_populates="kits", uselist=True, secondary=reagenttypes_kittypes) #: reagent types this kit contains
# reagent_types_id = Column(INTEGER, ForeignKey("_reagent_types.id", ondelete='SET NULL', use_alter=True, name="fk_KT_reagentstype_id")) #: joined reagent type id
# kit_reagenttype_association =
kit_reagenttype_associations = relationship(
"KitTypeReagentTypeAssociation",
back_populates="kit_type",
cascade="all, delete-orphan",
)
# association proxy of "user_keyword_associations" collection
# to "keyword" attribute
reagent_types = association_proxy("kit_reagenttype_associations", "reagenttype")
def __repr__(self) -> str:
return f"KitType({self.name})"
def __str__(self) -> str:
"""
@@ -41,6 +62,61 @@ class KitType(Base):
"""
return self.name
def get_reagents(self, required:bool=False) -> list:
if required:
return [item.reagenttype for item in self.kit_reagenttype_associations if item.required == 1]
else:
return [item.reagenttype for item in self.kit_reagenttype_associations]
def construct_xl_map_for_use(self, use:str) -> dict:
# map = self.used_for[use]
map = {}
assocs = [item for item in self.kit_reagenttype_associations if use in item.uses]
for assoc in assocs:
try:
map[assoc.reagenttype.name] = assoc.uses[use]
except TypeError:
continue
return map
class KitTypeReagentTypeAssociation(Base):
"""
table containing reagenttype/kittype associations
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
"""
__tablename__ = "_reagenttypes_kittypes"
reagent_types_id = Column(INTEGER, ForeignKey("_reagent_types.id"), primary_key=True)
kits_id = Column(INTEGER, ForeignKey("_kits.id"), primary_key=True)
uses = Column(JSON)
required = Column(INTEGER)
# reagent_type_name = Column(INTEGER, ForeignKey("_reagent_types.name"))
kit_type = relationship(KitType, back_populates="kit_reagenttype_associations")
# reference to the "ReagentType" object
reagenttype = relationship("ReagentType")
def __init__(self, kit_type=None, reagent_type=None, uses=None, required=1):
self.kit = kit_type
self.reagenttype = reagent_type
self.uses = uses
self.required = required
@validates('required')
def validate_age(self, key, value):
if not 0 <= value < 2:
raise ValueError(f'Invalid required value {value}. Must be 0 or 1.')
return value
@validates('reagenttype')
def validate_reagenttype(self, key, value):
if not isinstance(value, ReagentType):
raise ValueError(f'{value} is not a reagenttype')
return value
class ReagentType(Base):
"""
@@ -50,17 +126,17 @@ class ReagentType(Base):
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: name of reagent type
kit_id = Column(INTEGER, ForeignKey("_kits.id", ondelete="SET NULL", use_alter=True, name="fk_RT_kits_id")) #: id of joined kit type
kits = relationship("KitType", back_populates="reagent_types", uselist=True, foreign_keys=[kit_id]) #: kits this reagent is used in
# kit_id = Column(INTEGER, ForeignKey("_kits.id", ondelete="SET NULL", use_alter=True, name="fk_RT_kits_id")) #: id of joined kit type
# kits = relationship("KitType", back_populates="reagent_types", uselist=True, foreign_keys=[kit_id]) #: kits this reagent is used in
instances = relationship("Reagent", back_populates="type") #: concrete instances of this reagent type
eol_ext = Column(Interval()) #: extension of life interval
required = Column(INTEGER, server_default="1") #: sqlite boolean to determine if reagent type is essential for the kit
# required = Column(INTEGER, server_default="1") #: sqlite boolean to determine if reagent type is essential for the kit
last_used = Column(String(32)) #: last used lot number of this type of reagent
@validates('required')
def validate_age(self, key, value):
if not 0 <= value < 2:
raise ValueError(f'Invalid required value {value}')
raise ValueError(f'Invalid required value {value}. Must be 0 or 1.')
return value
def __str__(self) -> str:
@@ -71,6 +147,9 @@ class ReagentType(Base):
str: string representing this object's name
"""
return self.name
def __repr__(self):
return f"ReagentType({self.name})"
class Reagent(Base):
@@ -87,6 +166,13 @@ class Reagent(Base):
expiry = Column(TIMESTAMP) #: expiry date - extended by eol_ext of parent programmatically
submissions = relationship("BasicSubmission", back_populates="reagents", uselist=True) #: submissions this reagent is used in
def __repr__(self):
if self.name != None:
return f"Reagent({self.name}-{self.lot})"
else:
return f"Reagent({self.type.name}-{self.lot})"
def __str__(self) -> str:
"""
string representing this object
@@ -142,4 +228,6 @@ class Discount(Base):
client = relationship("Organization") #: joined client lab
client_id = Column(INTEGER, ForeignKey("_organizations.id", ondelete='SET NULL', name="fk_org_id"))
name = Column(String(128))
amount = Column(FLOAT(2))
amount = Column(FLOAT(2))

View File

@@ -21,7 +21,7 @@ class Organization(Base):
submissions = relationship("BasicSubmission", back_populates="submitting_lab") #: submissions this organization has submitted
cost_centre = Column(String()) #: cost centre used by org for payment
contacts = relationship("Contact", back_populates="organization", secondary=orgs_contacts) #: contacts involved with this org
contact_ids = Column(INTEGER, ForeignKey("_contacts.id", ondelete="SET NULL", name="fk_org_contact_id")) #: contact ids of this organization
# contact_ids = Column(INTEGER, ForeignKey("_contacts.id", ondelete="SET NULL", name="fk_org_contact_id")) #: contact ids of this organization
def __str__(self) -> str:
"""
@@ -44,5 +44,5 @@ class Contact(Base):
email = Column(String(64)) #: contact email
phone = Column(String(32)) #: contact phone number
organization = relationship("Organization", back_populates="contacts", uselist=True, secondary=orgs_contacts) #: relationship to joined organization
organization_id = Column(INTEGER, ForeignKey("_organizations.id", ondelete="SET NULL", name="fk_contact_org_id")) #: joined organization ids
# organization_id = Column(INTEGER, ForeignKey("_organizations.id", ondelete="SET NULL", name="fk_contact_org_id")) #: joined organization ids

View File

@@ -1,158 +0,0 @@
'''
All models for individual samples.
'''
from . import Base
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, FLOAT, BOOLEAN, JSON
from sqlalchemy.orm import relationship
import logging
logger = logging.getLogger(f"submissions.{__name__}")
class WWSample(Base):
"""
Base wastewater sample
"""
__tablename__ = "_ww_samples"
id = Column(INTEGER, primary_key=True) #: primary key
ww_processing_num = Column(String(64)) #: wastewater processing number
ww_sample_full_id = Column(String(64), nullable=False, unique=True)
rsl_number = Column(String(64)) #: rsl plate identification number
rsl_plate = relationship("Wastewater", back_populates="samples") #: relationship to parent plate
rsl_plate_id = Column(INTEGER, ForeignKey("_submissions.id", ondelete="SET NULL", name="fk_WWS_submission_id"))
collection_date = Column(TIMESTAMP) #: Date submission received
well_number = Column(String(8)) #: location on 96 well plate
# The following are fields from the sample tracking excel sheet Ruth put together.
# I have no idea when they will be implemented or how.
testing_type = Column(String(64))
site_status = Column(String(64))
notes = Column(String(2000))
ct_n1 = Column(FLOAT(2)) #: AKA ct for N1
ct_n2 = Column(FLOAT(2)) #: AKA ct for N2
n1_status = Column(String(32))
n2_status = Column(String(32))
seq_submitted = Column(BOOLEAN())
ww_seq_run_id = Column(String(64))
sample_type = Column(String(8))
pcr_results = Column(JSON)
well_24 = Column(String(8)) #: location on 24 well plate
artic_rsl_plate = relationship("WastewaterArtic", back_populates="samples")
artic_well_number = Column(String(8))
def to_string(self) -> str:
"""
string representing sample object
Returns:
str: string representing location and sample id
"""
return f"{self.well_number}: {self.ww_sample_full_id}"
def to_sub_dict(self) -> dict:
"""
gui friendly dictionary
Returns:
dict: well location and id NOTE: keys must sync with BCSample to_sub_dict below
"""
if self.ct_n1 != None and self.ct_n2 != None:
# logger.debug(f"Using well info in name.")
name = f"{self.ww_sample_full_id}\n\t- ct N1: {'{:.2f}'.format(self.ct_n1)} ({self.n1_status})\n\t- ct N2: {'{:.2f}'.format(self.ct_n2)} ({self.n2_status})"
else:
# logger.debug(f"NOT using well info in name for: {self.ww_sample_full_id}")
name = self.ww_sample_full_id
return {
"well": self.well_number,
"name": name,
}
def to_hitpick(self) -> dict|None:
"""
Outputs a dictionary of locations if sample is positive
Returns:
dict: dictionary of sample id, row and column in elution plate
"""
# dictionary to translate row letters into numbers
row_dict = dict(A=1, B=2, C=3, D=4, E=5, F=6, G=7, H=8)
# if either n1 or n2 is positive, include this sample
try:
positive = any(["positive" in item for item in [self.n1_status, self.n2_status]])
except TypeError as e:
logger.error(f"Couldn't check positives for {self.rsl_number}. Looks like there isn't PCR data.")
return None
well_row = row_dict[self.well_number[0]]
well_col = self.well_number[1:]
# if positive:
# try:
# # The first character of the elution well is the row
# well_row = row_dict[self.elution_well[0]]
# # The remaining charagers are the columns
# well_col = self.elution_well[1:]
# except TypeError as e:
# logger.error(f"This sample doesn't have elution plate info.")
# return None
return dict(name=self.ww_sample_full_id,
row=well_row,
col=well_col,
positive=positive)
# else:
# return None
class BCSample(Base):
"""
base of bacterial culture sample
"""
__tablename__ = "_bc_samples"
id = Column(INTEGER, primary_key=True) #: primary key
well_number = Column(String(8)) #: location on parent plate
sample_id = Column(String(64), nullable=False, unique=True) #: identification from submitter
organism = Column(String(64)) #: bacterial specimen
concentration = Column(String(16)) #:
rsl_plate_id = Column(INTEGER, ForeignKey("_submissions.id", ondelete="SET NULL", name="fk_BCS_sample_id")) #: id of parent plate
rsl_plate = relationship("BacterialCulture", back_populates="samples") #: relationship to parent plate
def to_string(self) -> str:
"""
string representing object
Returns:
str: string representing well location, sample id and organism
"""
return f"{self.well_number}: {self.sample_id} - {self.organism}"
def to_sub_dict(self) -> dict:
"""
gui friendly dictionary
Returns:
dict: well location and name (sample id, organism) NOTE: keys must sync with WWSample to_sub_dict above
"""
return {
"well": self.well_number,
"name": f"{self.sample_id} - ({self.organism})",
}
def to_hitpick(self) -> dict|None:
"""
Outputs a dictionary of locations
Returns:
dict: dictionary of sample id, row and column in elution plate
"""
# dictionary to translate row letters into numbers
row_dict = dict(A=1, B=2, C=3, D=4, E=5, F=6, G=7, H=8)
# if either n1 or n2 is positive, include this sample
well_row = row_dict[self.well_number[0]]
# The remaining charagers are the columns
well_col = self.well_number[1:]
return dict(name=self.sample_id,
row=well_row,
col=well_col,
positive=False)

View File

@@ -3,12 +3,15 @@ Models for the main submission types.
'''
import math
from . import Base
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, Table, JSON, FLOAT
from sqlalchemy.orm import relationship
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, Table, JSON, FLOAT, BOOLEAN
from sqlalchemy.orm import relationship, validates
import logging
import json
from json.decoder import JSONDecodeError
from math import ceil
from sqlalchemy.ext.associationproxy import association_proxy
import uuid
from . import Base
logger = logging.getLogger(f"submissions.{__name__}")
@@ -40,6 +43,15 @@ class BasicSubmission(Base):
uploaded_by = Column(String(32)) #: user name of person who submitted the submission to the database.
comment = Column(JSON)
submission_sample_associations = relationship(
"SubmissionSampleAssociation",
back_populates="submission",
cascade="all, delete-orphan",
)
# association proxy of "user_keyword_associations" collection
# to "keyword" attribute
samples = association_proxy("submission_sample_associations", "sample")
# Allows for subclassing into ex. BacterialCulture, Wastewater, etc.
__mapper_args__ = {
"polymorphic_identity": "basic_submission",
@@ -47,6 +59,9 @@ class BasicSubmission(Base):
"with_polymorphic": "*",
}
def __repr__(self):
return f"{self.submission_type}Submission({self.rsl_plate_num})"
def to_string(self) -> str:
"""
string presenting basic submission
@@ -64,6 +79,7 @@ class BasicSubmission(Base):
dict: dictionary used in submissions summary
"""
# get lab from nested organization object
try:
sub_lab = self.submitting_lab.name
except AttributeError:
@@ -90,10 +106,20 @@ class BasicSubmission(Base):
except Exception as e:
logger.error(f"We got an error retrieving reagents: {e}")
reagents = None
try:
samples = [item.to_sub_dict() for item in self.samples]
except:
samples = None
# try:
# samples = [item.sample.to_sub_dict(item.__dict__()) for item in self.submission_sample_associations]
# except Exception as e:
# logger.error(f"Problem making list of samples: {e}")
# samples = None
samples = []
for item in self.submission_sample_associations:
sample = item.sample.to_sub_dict(submission_rsl=self.rsl_plate_num)
# try:
# sample['well'] = f"{row_map[item.row]}{item.column}"
# except KeyError as e:
# logger.error(f"Unable to find row {item.row} in row_map.")
# sample['well'] = None
samples.append(sample)
try:
comments = self.comment
except:
@@ -115,11 +141,8 @@ class BasicSubmission(Base):
"ext_info": ext_info,
"comments": comments
}
# logger.debug(f"{self.rsl_plate_num} extraction: {output['Extraction Status']}")
# logger.debug(f"Output dict: {output}")
return output
def report_dict(self) -> dict:
"""
dictionary used in creating reports
@@ -141,13 +164,6 @@ class BasicSubmission(Base):
ext_kit = self.extraction_kit.name
except AttributeError:
ext_kit = None
# get extraction kit cost from nested kittype object
# depreciated as it will change kit cost overtime
# try:
# cost = self.extraction_kit.cost_per_run
# except AttributeError:
# cost = None
output = {
"id": self.id,
"Plate Number": self.rsl_plate_num,
@@ -168,24 +184,47 @@ class BasicSubmission(Base):
except Exception as e:
logger.error(f"Column count error: {e}")
# cols_count_24 = ceil(int(self.sample_count) / 3)
try:
self.run_cost = self.extraction_kit.constant_cost + (self.extraction_kit.mutable_cost_column * cols_count_96) + (self.extraction_kit.mutable_cost_sample * int(self.sample_count))
except Exception as e:
logger.error(f"Calculation error: {e}")
if all(item == 0.0 for item in [self.extraction_kit.constant_cost, self.extraction_kit.mutable_cost_column, self.extraction_kit.mutable_cost_sample]):
try:
self.run_cost = self.extraction_kit.cost_per_run
except Exception as e:
logger.error(f"Calculation error: {e}")
else:
try:
self.run_cost = self.extraction_kit.constant_cost + (self.extraction_kit.mutable_cost_column * cols_count_96) + (self.extraction_kit.mutable_cost_sample * int(self.sample_count))
except Exception as e:
logger.error(f"Calculation error: {e}")
def calculate_column_count(self):
columns = [int(sample.well_number[-2:]) for sample in self.samples]
logger.debug(f"Here are the columns for {self.rsl_plate_num}: {columns}")
return max(columns)
logger.debug(f"Here's the samples: {self.samples}")
# columns = [int(sample.well_number[-2:]) for sample in self.samples]
columns = [assoc.column for assoc in self.submission_sample_associations]
logger.debug(f"Here are the columns for {self.rsl_plate_num}: {columns}")
return max(columns)
def hitpick_plate(self, plate_number:int|None=None) -> list:
output_list = []
for assoc in self.submission_sample_associations:
samp = assoc.sample.to_hitpick(submission_rsl=self.rsl_plate_num)
if samp != None:
if plate_number != None:
samp['plate_number'] = plate_number
samp['row'] = assoc.row
samp['column'] = assoc.column
samp['plate_name'] = self.rsl_plate_num
output_list.append(samp)
else:
continue
return output_list
# Below are the custom submission types
class BacterialCulture(BasicSubmission):
class BacterialCulture(BasicSubmission):
"""
derivative submission type from BasicSubmission
"""
controls = relationship("Control", back_populates="submission", uselist=True) #: A control sample added to submission
samples = relationship("BCSample", back_populates="rsl_plate", uselist=True)
# samples = relationship("BCSample", back_populates="rsl_plate", uselist=True)
__mapper_args__ = {"polymorphic_identity": "bacterial_culture", "polymorphic_load": "inline"}
def to_dict(self) -> dict:
@@ -197,26 +236,13 @@ class BacterialCulture(BasicSubmission):
"""
output = super().to_dict()
output['controls'] = [item.to_sub_dict() for item in self.controls]
return output
# def calculate_base_cost(self):
# try:
# cols_count_96 = ceil(int(self.sample_count) / 8)
# except Exception as e:
# logger.error(f"Column count error: {e}")
# # cols_count_24 = ceil(int(self.sample_count) / 3)
# try:
# self.run_cost = self.extraction_kit.constant_cost + (self.extraction_kit.mutable_cost_column * cols_count_96) + (self.extraction_kit.mutable_cost_sample * int(self.sample_count))
# except Exception as e:
# logger.error(f"Calculation error: {e}")
return output
class Wastewater(BasicSubmission):
"""
derivative submission type from BasicSubmission
"""
samples = relationship("WWSample", back_populates="rsl_plate", uselist=True)
# samples = relationship("WWSample", back_populates="rsl_plate", uselist=True)
pcr_info = Column(JSON)
# ww_sample_id = Column(String, ForeignKey("_ww_samples.id", ondelete="SET NULL", name="fk_WW_sample_id"))
__mapper_args__ = {"polymorphic_identity": "wastewater", "polymorphic_load": "inline"}
@@ -235,23 +261,11 @@ class Wastewater(BasicSubmission):
pass
return output
# def calculate_base_cost(self):
# try:
# cols_count_96 = ceil(int(self.sample_count) / 8) + 1 #: Adding in one column to account for 24 samples + ext negatives
# except Exception as e:
# logger.error(f"Column count error: {e}")
# # cols_count_24 = ceil(int(self.sample_count) / 3)
# try:
# self.run_cost = self.extraction_kit.constant_cost + (self.extraction_kit.mutable_cost_column * cols_count_96) + (self.extraction_kit.mutable_cost_sample * int(self.sample_count))
# except Exception as e:
# logger.error(f"Calculation error: {e}")
class WastewaterArtic(BasicSubmission):
"""
derivative submission type for artic wastewater
"""
samples = relationship("WWSample", back_populates="artic_rsl_plate", uselist=True)
# samples = relationship("WWSample", back_populates="artic_rsl_plate", uselist=True)
# Can it use the pcr_info from the wastewater? Cause I can't define pcr_info here due to conflicts with that
# Not necessary because we don't get any results for this procedure.
__mapper_args__ = {"polymorphic_identity": "wastewater_artic", "polymorphic_load": "inline"}
@@ -273,3 +287,252 @@ class WastewaterArtic(BasicSubmission):
self.run_cost = const_cost + (self.extraction_kit.mutable_cost_column * cols_count_96) + (self.extraction_kit.mutable_cost_sample * int(self.sample_count))
except Exception as e:
logger.error(f"Calculation error: {e}")
class BasicSample(Base):
"""
Base of basic sample which polymorphs into BCSample and WWSample
"""
__tablename__ = "_samples"
id = Column(INTEGER, primary_key=True) #: primary key
submitter_id = Column(String(64), nullable=False, unique=True) #: identification from submitter
sample_type = Column(String(32))
sample_submission_associations = relationship(
"SubmissionSampleAssociation",
back_populates="sample",
cascade="all, delete-orphan",
)
__mapper_args__ = {
"polymorphic_identity": "basic_sample",
"polymorphic_on": sample_type,
"with_polymorphic": "*",
}
submissions = association_proxy("sample_submission_associations", "submission")
@validates('submitter_id')
def create_id(self, key, value):
logger.debug(f"validating sample_id of: {value}")
if value == None:
return uuid.uuid4().hex.upper()
else:
return value
def __repr__(self) -> str:
return f"{self.sample_type}Sample({self.submitter_id})"
def to_sub_dict(self, submission_rsl:str) -> dict:
row_map = {1:"A", 2:"B", 3:"C", 4:"D", 5:"E", 6:"F", 7:"G", 8:"H"}
self.assoc = [item for item in self.sample_submission_associations if item.submission.rsl_plate_num==submission_rsl][0]
sample = {}
try:
sample['well'] = f"{row_map[self.assoc.row]}{self.assoc.column}"
except KeyError as e:
logger.error(f"Unable to find row {self.assoc.row} in row_map.")
sample['well'] = None
sample['name'] = self.submitter_id
return sample
def to_hitpick(self, submission_rsl:str) -> dict|None:
"""
Outputs a dictionary of locations
Returns:
dict: dictionary of sample id, row and column in elution plate
"""
self.assoc = [item for item in self.sample_submission_associations if item.submission.rsl_plate_num==submission_rsl][0]
# dictionary to translate row letters into numbers
# row_dict = dict(A=1, B=2, C=3, D=4, E=5, F=6, G=7, H=8)
# if either n1 or n2 is positive, include this sample
# well_row = row_dict[self.well_number[0]]
# The remaining charagers are the columns
# well_col = self.well_number[1:]
return dict(name=self.submitter_id,
# row=well_row,
# col=well_col,
positive=False)
class WastewaterSample(BasicSample):
"""
Base wastewater sample
"""
# __tablename__ = "_ww_samples"
# id = Column(INTEGER, primary_key=True) #: primary key
ww_processing_num = Column(String(64)) #: wastewater processing number
# ww_sample_full_id = Column(String(64), nullable=False, unique=True)
rsl_number = Column(String(64)) #: rsl plate identification number
# rsl_plate = relationship("Wastewater", back_populates="samples") #: relationship to parent plate
# rsl_plate_id = Column(INTEGER, ForeignKey("_submissions.id", ondelete="SET NULL", name="fk_WWS_submission_id"))
collection_date = Column(TIMESTAMP) #: Date submission received
# well_number = Column(String(8)) #: location on 96 well plate
# The following are fields from the sample tracking excel sheet Ruth put together.
# I have no idea when they will be implemented or how.
testing_type = Column(String(64))
site_status = Column(String(64))
notes = Column(String(2000))
# ct_n1 = Column(FLOAT(2)) #: AKA ct for N1
# ct_n2 = Column(FLOAT(2)) #: AKA ct for N2
# n1_status = Column(String(32))
# n2_status = Column(String(32))
seq_submitted = Column(BOOLEAN())
ww_seq_run_id = Column(String(64))
# sample_type = Column(String(16))
# pcr_results = Column(JSON)
well_24 = Column(String(8)) #: location on 24 well plate
# artic_rsl_plate = relationship("WastewaterArtic", back_populates="samples")
# artic_well_number = Column(String(8))
__mapper_args__ = {"polymorphic_identity": "wastewater_sample", "polymorphic_load": "inline"}
# def to_string(self) -> str:
# """
# string representing sample object
# Returns:
# str: string representing location and sample id
# """
# return f"{self.well_number}: {self.ww_sample_full_id}"
def to_sub_dict(self, submission_rsl:str) -> dict:
"""
Gui friendly dictionary. Inherited from BasicSample
This version will include PCR status.
Args:
submission_rsl (str): RSL plate number (passed down from the submission.to_dict() functino)
Returns:
dict: Alphanumeric well id and sample name
"""
# Get the relevant submission association for this sample
sample = super().to_sub_dict(submission_rsl=submission_rsl)
try:
check = self.assoc.ct_n1 != None and self.assoc.ct_n2 != None
except AttributeError as e:
check = False
if check:
logger.debug(f"Using well info in name.")
sample['name'] = f"{self.submitter_id}\n\t- ct N1: {'{:.2f}'.format(self.assoc.ct_n1)} ({self.assoc.n1_status})\n\t- ct N2: {'{:.2f}'.format(self.assoc.ct_n2)} ({self.assoc.n2_status})"
else:
logger.error(f"Couldn't get the pcr info")
return sample
def to_hitpick(self, submission_rsl:str) -> dict|None:
"""
Outputs a dictionary of locations if sample is positive
Returns:
dict: dictionary of sample id, row and column in elution plate
"""
sample = super().to_hitpick(submission_rsl=submission_rsl)
# dictionary to translate row letters into numbers
# row_dict = dict(A=1, B=2, C=3, D=4, E=5, F=6, G=7, H=8)
# if either n1 or n2 is positive, include this sample
try:
sample['positive'] = any(["positive" in item for item in [self.assoc.n1_status, self.assoc.n2_status]])
except (TypeError, AttributeError) as e:
logger.error(f"Couldn't check positives for {self.rsl_number}. Looks like there isn't PCR data.")
# return None
# positive = False
# well_row = row_dict[self.well_number[0]]
# well_col = self.well_number[1:]
# if positive:
# try:
# # The first character of the elution well is the row
# well_row = row_dict[self.elution_well[0]]
# # The remaining charagers are the columns
# well_col = self.elution_well[1:]
# except TypeError as e:
# logger.error(f"This sample doesn't have elution plate info.")
# return None
return sample
class BacterialCultureSample(BasicSample):
"""
base of bacterial culture sample
"""
# __tablename__ = "_bc_samples"
# id = Column(INTEGER, primary_key=True) #: primary key
# well_number = Column(String(8)) #: location on parent plate
# sample_id = Column(String(64), nullable=False, unique=True) #: identification from submitter
organism = Column(String(64)) #: bacterial specimen
concentration = Column(String(16)) #:
# sample_type = Column(String(16))
# rsl_plate_id = Column(INTEGER, ForeignKey("_submissions.id", ondelete="SET NULL", name="fk_BCS_sample_id")) #: id of parent plate
# rsl_plate = relationship("BacterialCulture", back_populates="samples") #: relationship to parent plate
__mapper_args__ = {"polymorphic_identity": "bacterial_culture_sample", "polymorphic_load": "inline"}
# def to_string(self) -> str:
# """
# string representing object
# Returns:
# str: string representing well location, sample id and organism
# """
# return f"{self.well_number}: {self.sample_id} - {self.organism}"
def to_sub_dict(self, submission_rsl:str) -> dict:
"""
gui friendly dictionary
Returns:
dict: well location and name (sample id, organism) NOTE: keys must sync with WWSample to_sub_dict above
"""
sample = super().to_sub_dict(submission_rsl=submission_rsl)
sample['name'] = f"{self.submitter_id} - ({self.organism})"
# return {
# # "well": self.well_number,
# "name": f"{self.submitter_id} - ({self.organism})",
# }
return sample
class SubmissionSampleAssociation(Base):
"""
table containing submission/sample associations
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
"""
__tablename__ = "_submission_sample"
sample_id = Column(INTEGER, ForeignKey("_samples.id"), primary_key=True)
submission_id = Column(INTEGER, ForeignKey("_submissions.id"), primary_key=True)
row = Column(INTEGER)
column = Column(INTEGER)
submission = relationship(BasicSubmission, back_populates="submission_sample_associations")
# reference to the "ReagentType" object
# sample = relationship("BasicSample")
sample = relationship(BasicSample, back_populates="sample_submission_associations")
base_sub_type = Column(String)
# """Refers to the type of parent."""
__mapper_args__ = {
"polymorphic_identity": "basic_association",
"polymorphic_on": base_sub_type,
"with_polymorphic": "*",
}
def __init__(self, submission:BasicSubmission=None, sample:BasicSample=None, row:int=1, column:int=1):
self.submission = submission
self.sample = sample
self.row = row
self.column = column
class WastewaterAssociation(SubmissionSampleAssociation):
ct_n1 = Column(FLOAT(2)) #: AKA ct for N1
ct_n2 = Column(FLOAT(2)) #: AKA ct for N2
n1_status = Column(String(32))
n2_status = Column(String(32))
pcr_results = Column(JSON)
__mapper_args__ = {"polymorphic_identity": "wastewater", "polymorphic_load": "inline"}