Working new version.

This commit is contained in:
Landon Wark
2023-09-11 13:45:10 -05:00
parent e0b80f6c7a
commit 5a978c9bff
15 changed files with 417 additions and 743 deletions

View File

@@ -9,5 +9,4 @@ metadata = Base.metadata
from .controls import Control, ControlType
from .kits import KitType, ReagentType, Reagent, Discount, KitTypeReagentTypeAssociation, SubmissionType, SubmissionTypeKitTypeAssociation
from .organizations import Organization, Contact
# from .samples import WWSample, BCSample, BasicSample
from .submissions import BasicSubmission, BacterialCulture, Wastewater, WastewaterArtic, WastewaterSample, BacterialCultureSample, BasicSample, SubmissionSampleAssociation, WastewaterAssociation

View File

@@ -12,16 +12,6 @@ import logging
logger = logging.getLogger(f'submissions.{__name__}')
# # Table containing reagenttype-kittype relationships
# reagenttypes_kittypes = Table("_reagentstypes_kittypes", Base.metadata,
# Column("reagent_types_id", INTEGER, ForeignKey("_reagent_types.id")),
# Column("kits_id", INTEGER, ForeignKey("_kits.id")),
# # The entry will look like ["Bacteria Culture":{"row":1, "column":4}]
# Column("uses", JSON),
# # is the reagent required for that kit?
# Column("required", INTEGER)
# )
reagenttypes_reagents = Table("_reagenttypes_reagents", Base.metadata, Column("reagent_id", INTEGER, ForeignKey("_reagents.id")), Column("reagenttype_id", INTEGER, ForeignKey("_reagent_types.id")))
@@ -34,13 +24,7 @@ class KitType(Base):
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64), unique=True) #: name of kit
submissions = relationship("BasicSubmission", back_populates="extraction_kit") #: submissions this kit was used for
# used_for = Column(JSON) #: list of names of sample types this kit can process
# used_for = relationship("SubmissionType", back_populates="extraction_kits", uselist=True, secondary=submissiontype_kittypes)
# cost_per_run = Column(FLOAT(2)) #: dollar amount for each full run of this kit NOTE: depreciated, use the constant and mutable costs instead
# reagent_types = relationship("ReagentType", back_populates="kits", uselist=True, secondary=reagenttypes_kittypes) #: reagent types this kit contains
# reagent_types_id = Column(INTEGER, ForeignKey("_reagent_types.id", ondelete='SET NULL', use_alter=True, name="fk_KT_reagentstype_id")) #: joined reagent type id
# kit_reagenttype_association =
kit_reagenttype_associations = relationship(
"KitTypeReagentTypeAssociation",
back_populates="kit_type",
@@ -51,7 +35,6 @@ class KitType(Base):
# to "keyword" attribute
reagent_types = association_proxy("kit_reagenttype_associations", "reagent_type")
kit_submissiontype_associations = relationship(
"SubmissionTypeKitTypeAssociation",
back_populates="kit_type",
@@ -60,7 +43,6 @@ class KitType(Base):
used_for = association_proxy("kit_submissiontype_associations", "submission_type")
def __repr__(self) -> str:
return f"<KitType({self.name})>"
@@ -74,6 +56,15 @@ class KitType(Base):
return self.name
def get_reagents(self, required:bool=False) -> list:
"""
Return ReagentTypes linked to kit through KitTypeReagentTypeAssociation.
Args:
required (bool, optional): If true only return required types. Defaults to False.
Returns:
list: List of ReagentTypes
"""
if required:
return [item.reagent_type for item in self.kit_reagenttype_associations if item.required == 1]
else:
@@ -81,14 +72,24 @@ class KitType(Base):
def construct_xl_map_for_use(self, use:str) -> dict:
# map = self.used_for[use]
"""
Creates map of locations in excel workbook for a SubmissionType
Args:
use (str): Submissiontype.name
Returns:
dict: Dictionary containing information locations.
"""
map = {}
# Get all KitTypeReagentTypeAssociation for SubmissionType
assocs = [item for item in self.kit_reagenttype_associations if use in item.uses]
for assoc in assocs:
try:
map[assoc.reagent_type.name] = assoc.uses[use]
except TypeError:
continue
# Get SubmissionType info map
try:
st_assoc = [item for item in self.used_for if use == item.name][0]
map['info'] = st_assoc.info_map
@@ -106,7 +107,6 @@ class KitTypeReagentTypeAssociation(Base):
kits_id = Column(INTEGER, ForeignKey("_kits.id"), primary_key=True)
uses = Column(JSON)
required = Column(INTEGER)
# reagent_type_name = Column(INTEGER, ForeignKey("_reagent_types.name"))
kit_type = relationship(KitType, back_populates="kit_reagenttype_associations")
@@ -139,11 +139,8 @@ class ReagentType(Base):
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: name of reagent type
# kit_id = Column(INTEGER, ForeignKey("_kits.id", ondelete="SET NULL", use_alter=True, name="fk_RT_kits_id")) #: id of joined kit type
# kits = relationship("KitType", back_populates="reagent_types", uselist=True, foreign_keys=[kit_id]) #: kits this reagent is used in
instances = relationship("Reagent", back_populates="type", secondary=reagenttypes_reagents) #: concrete instances of this reagent type
eol_ext = Column(Interval()) #: extension of life interval
# required = Column(INTEGER, server_default="1") #: sqlite boolean to determine if reagent type is essential for the kit
last_used = Column(String(32)) #: last used lot number of this type of reagent
@validates('required')
@@ -202,8 +199,10 @@ class Reagent(Base):
dict: gui friendly dictionary
"""
if extraction_kit != None:
# Get the intersection of this reagent's ReagentType and all ReagentTypes in KitType
try:
reagent_role = list(set(self.type).intersection(extraction_kit.reagent_types))[0]
# Most will be able to fall back to first ReagentType in itself because most will only have 1.
except:
reagent_role = self.type[0]
else:
@@ -212,9 +211,9 @@ class Reagent(Base):
rtype = reagent_role.name.replace("_", " ").title()
except AttributeError:
rtype = "Unknown"
# Calculate expiry with EOL from ReagentType
try:
place_holder = self.expiry + reagent_role.eol_ext
# logger.debug(f"EOL_ext for {self.lot} -- {self.expiry} + {self.type.eol_ext} = {place_holder}")
except TypeError as e:
place_holder = date.today()
logger.debug(f"We got a type error setting {self.lot} expiry: {e}. setting to today for testing")
@@ -227,9 +226,28 @@ class Reagent(Base):
"expiry": place_holder.strftime("%Y-%m-%d")
}
def to_reagent_dict(self) -> dict:
def to_reagent_dict(self, extraction_kit:KitType=None) -> dict:
"""
Returns basic reagent dictionary.
Returns:
dict: Basic reagent dictionary of 'type', 'lot', 'expiry'
"""
if extraction_kit != None:
# Get the intersection of this reagent's ReagentType and all ReagentTypes in KitType
try:
reagent_role = list(set(self.type).intersection(extraction_kit.reagent_types))[0]
# Most will be able to fall back to first ReagentType in itself because most will only have 1.
except:
reagent_role = self.type[0]
else:
reagent_role = self.type[0]
try:
rtype = reagent_role.name
except AttributeError:
rtype = "Unknown"
return {
"type": type,
"type": rtype,
"lot": self.lot,
"expiry": self.expiry.strftime("%Y-%m-%d")
}
@@ -249,12 +267,14 @@ class Discount(Base):
amount = Column(FLOAT(2))
class SubmissionType(Base):
"""
Abstract of types of submissions.
"""
__tablename__ = "_submission_types"
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(128), unique=True) #: name of submission type
info_map = Column(JSON)
info_map = Column(JSON) #: Where basic information is found in the excel workbook corresponding to this type.
instances = relationship("BasicSubmission", backref="submission_type")
submissiontype_kit_associations = relationship(
@@ -269,14 +289,15 @@ class SubmissionType(Base):
return f"<SubmissionType({self.name})>"
class SubmissionTypeKitTypeAssociation(Base):
"""
Abstract of relationship between kits and their submission type.
"""
__tablename__ = "_submissiontypes_kittypes"
submission_types_id = Column(INTEGER, ForeignKey("_submission_types.id"), primary_key=True)
kits_id = Column(INTEGER, ForeignKey("_kits.id"), primary_key=True)
mutable_cost_column = Column(FLOAT(2)) #: dollar amount per 96 well plate that can change with number of columns (reagents, tips, etc)
mutable_cost_sample = Column(FLOAT(2)) #: dollar amount that can change with number of samples (reagents, tips, etc)
constant_cost = Column(FLOAT(2)) #: dollar amount per plate that will remain constant (plates, man hours, etc)
# reagent_type_name = Column(INTEGER, ForeignKey("_reagent_types.name"))
kit_type = relationship(KitType, back_populates="kit_submissiontype_associations")

View File

@@ -21,7 +21,6 @@ class Organization(Base):
submissions = relationship("BasicSubmission", back_populates="submitting_lab") #: submissions this organization has submitted
cost_centre = Column(String()) #: cost centre used by org for payment
contacts = relationship("Contact", back_populates="organization", secondary=orgs_contacts) #: contacts involved with this org
# contact_ids = Column(INTEGER, ForeignKey("_contacts.id", ondelete="SET NULL", name="fk_org_contact_id")) #: contact ids of this organization
def __str__(self) -> str:
"""
@@ -47,5 +46,4 @@ class Contact(Base):
email = Column(String(64)) #: contact email
phone = Column(String(32)) #: contact phone number
organization = relationship("Organization", back_populates="contacts", uselist=True, secondary=orgs_contacts) #: relationship to joined organization
# organization_id = Column(INTEGER, ForeignKey("_organizations.id", ondelete="SET NULL", name="fk_contact_org_id")) #: joined organization ids

View File

@@ -3,7 +3,7 @@ Models for the main submission types.
'''
import math
from . import Base
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, Table, JSON, FLOAT
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, Table, JSON, FLOAT, case
from sqlalchemy.orm import relationship, validates
import logging
import json
@@ -11,10 +11,10 @@ from json.decoder import JSONDecodeError
from math import ceil
from sqlalchemy.ext.associationproxy import association_proxy
import uuid
from . import Base
from pandas import Timestamp
from dateutil.parser import parse
import pprint
from tools import check_not_nan
logger = logging.getLogger(f"submissions.{__name__}")
@@ -23,7 +23,7 @@ reagents_submissions = Table("_reagents_submissions", Base.metadata, Column("rea
class BasicSubmission(Base):
"""
Base of basic submission which polymorphs into BacterialCulture and Wastewater
Concrete of basic submission which polymorphs into BacterialCulture and Wastewater
"""
__tablename__ = "_submissions"
@@ -36,7 +36,6 @@ class BasicSubmission(Base):
sample_count = Column(INTEGER) #: Number of samples in the submission
extraction_kit = relationship("KitType", back_populates="submissions") #: The extraction kit used
extraction_kit_id = Column(INTEGER, ForeignKey("_kits.id", ondelete="SET NULL", name="fk_BS_extkit_id"))
# submission_type = Column(String(32)) #: submission type (should be string in D3 of excel sheet)
submission_type_name = Column(String, ForeignKey("_submission_types.name", ondelete="SET NULL", name="fk_BS_subtype_name"))
technician = Column(String(64)) #: initials of processing tech(s)
# Move this into custom types?
@@ -83,7 +82,6 @@ class BasicSubmission(Base):
dict: dictionary used in submissions summary
"""
# get lab from nested organization object
try:
sub_lab = self.submitting_lab.name
except AttributeError:
@@ -105,24 +103,16 @@ class BasicSubmission(Base):
except JSONDecodeError as e:
ext_info = None
logger.debug(f"Json error in {self.rsl_plate_num}: {e}")
# Updated 2023-09 to use the extraction kit to pull reagents.
try:
reagents = [item.to_sub_dict(extraction_kit=self.extraction_kit) for item in self.reagents]
except Exception as e:
logger.error(f"We got an error retrieving reagents: {e}")
reagents = None
# try:
# samples = [item.sample.to_sub_dict(item.__dict__()) for item in self.submission_sample_associations]
# except Exception as e:
# logger.error(f"Problem making list of samples: {e}")
# samples = None
samples = []
# Updated 2023-09 to get sample association with plate number
for item in self.submission_sample_associations:
sample = item.sample.to_sub_dict(submission_rsl=self.rsl_plate_num)
# try:
# sample['well'] = f"{row_map[item.row]}{item.column}"
# except KeyError as e:
# logger.error(f"Unable to find row {item.row} in row_map.")
# sample['well'] = None
samples.append(sample)
try:
comments = self.comment
@@ -171,7 +161,7 @@ class BasicSubmission(Base):
output = {
"id": self.id,
"Plate Number": self.rsl_plate_num,
"Submission Type": self.submission_type.replace("_", " ").title(),
"Submission Type": self.submission_type_name.replace("_", " ").title(),
"Submitter Plate Number": self.submitter_plate_num,
"Submitted Date": self.submitted_date.strftime("%Y-%m-%d"),
"Submitting Lab": sub_lab,
@@ -182,16 +172,18 @@ class BasicSubmission(Base):
return output
def calculate_base_cost(self):
"""
Calculates cost of the plate
"""
# Calculate number of columns based on largest column number
try:
# cols_count_96 = ceil(int(self.sample_count) / 8)
cols_count_96 = self.calculate_column_count()
except Exception as e:
logger.error(f"Column count error: {e}")
# cols_count_24 = ceil(int(self.sample_count) / 3)
logger.debug(f"Pre-association check. {pprint.pformat(self.__dict__)}")
# Get kit associated with this submission
assoc = [item for item in self.extraction_kit.kit_submissiontype_associations if item.submission_type == self.submission_type][0]
logger.debug(f"Came up with association: {assoc}")
# if all(item == 0.0 for item in [self.extraction_kit.constant_cost, self.extraction_kit.mutable_cost_column, self.extraction_kit.mutable_cost_sample]):
# If every individual cost is 0 this is probably an old plate.
if all(item == 0.0 for item in [assoc.constant_cost, assoc.mutable_cost_column, assoc.mutable_cost_sample]):
try:
self.run_cost = self.extraction_kit.cost_per_run
@@ -203,14 +195,28 @@ class BasicSubmission(Base):
except Exception as e:
logger.error(f"Calculation error: {e}")
def calculate_column_count(self):
def calculate_column_count(self) -> int:
"""
Calculate the number of columns in this submission
Returns:
int: largest column number
"""
logger.debug(f"Here's the samples: {self.samples}")
# columns = [int(sample.well_number[-2:]) for sample in self.samples]
columns = [assoc.column for assoc in self.submission_sample_associations]
logger.debug(f"Here are the columns for {self.rsl_plate_num}: {columns}")
return max(columns)
def hitpick_plate(self, plate_number:int|None=None) -> list:
"""
Returns positve sample locations for plate
Args:
plate_number (int | None, optional): Plate id. Defaults to None.
Returns:
list: list of htipick dictionaries for each sample
"""
output_list = []
for assoc in self.submission_sample_associations:
samp = assoc.sample.to_hitpick(submission_rsl=self.rsl_plate_num)
@@ -232,7 +238,6 @@ class BacterialCulture(BasicSubmission):
derivative submission type from BasicSubmission
"""
controls = relationship("Control", back_populates="submission", uselist=True) #: A control sample added to submission
# samples = relationship("BCSample", back_populates="rsl_plate", uselist=True)
__mapper_args__ = {"polymorphic_identity": "Bacterial Culture", "polymorphic_load": "inline"}
def to_dict(self) -> dict:
@@ -250,11 +255,9 @@ class Wastewater(BasicSubmission):
"""
derivative submission type from BasicSubmission
"""
# samples = relationship("WWSample", back_populates="rsl_plate", uselist=True)
pcr_info = Column(JSON)
ext_technician = Column(String(64))
pcr_technician = Column(String(64))
# ww_sample_id = Column(String, ForeignKey("_ww_samples.id", ondelete="SET NULL", name="fk_WW_sample_id"))
__mapper_args__ = {"polymorphic_identity": "Wastewater", "polymorphic_load": "inline"}
def to_dict(self) -> dict:
@@ -276,10 +279,7 @@ class WastewaterArtic(BasicSubmission):
"""
derivative submission type for artic wastewater
"""
# samples = relationship("WWSample", back_populates="artic_rsl_plate", uselist=True)
# Can it use the pcr_info from the wastewater? Cause I can't define pcr_info here due to conflicts with that
# Not necessary because we don't get any results for this procedure.
__mapper_args__ = {"polymorphic_identity": "wastewater_artic", "polymorphic_load": "inline"}
__mapper_args__ = {"polymorphic_identity": "Wastewater Artic", "polymorphic_load": "inline"}
def calculate_base_cost(self):
"""
@@ -290,12 +290,13 @@ class WastewaterArtic(BasicSubmission):
cols_count_96 = ceil(int(self.sample_count) / 8)
except Exception as e:
logger.error(f"Column count error: {e}")
assoc = [item for item in self.extraction_kit.kit_submissiontype_associations if item.submission_type == self.submission_type][0]
# Since we have multiple output plates per submission form, the constant cost will have to reflect this.
output_plate_count = math.ceil(int(self.sample_count) / 16)
logger.debug(f"Looks like we have {output_plate_count} output plates.")
const_cost = self.extraction_kit.constant_cost * output_plate_count
const_cost = assoc.constant_cost * output_plate_count
try:
self.run_cost = const_cost + (self.extraction_kit.mutable_cost_column * cols_count_96) + (self.extraction_kit.mutable_cost_sample * int(self.sample_count))
self.run_cost = const_cost + (assoc.mutable_cost_column * cols_count_96) + (assoc.mutable_cost_sample * int(self.sample_count))
except Exception as e:
logger.error(f"Calculation error: {e}")
@@ -318,7 +319,15 @@ class BasicSample(Base):
__mapper_args__ = {
"polymorphic_identity": "basic_sample",
"polymorphic_on": sample_type,
# "polymorphic_on": sample_type,
"polymorphic_on": case(
[
(sample_type == "Wastewater Sample", "Wastewater Sample"),
(sample_type == "Wastewater Artic Sample", "Wastewater Sample"),
(sample_type == "Bacterial Culture Sample", "Bacterial Culture Sample"),
],
else_="basic_sample"
),
"with_polymorphic": "*",
}
@@ -335,7 +344,23 @@ class BasicSample(Base):
def __repr__(self) -> str:
return f"<{self.sample_type.replace('_', ' ').title(). replace(' ', '')}({self.submitter_id})>"
def set_attribute(self, name, value):
# logger.debug(f"Setting {name} to {value}")
try:
setattr(self, name, value)
except AttributeError:
logger.error(f"Attribute {name} not found")
def to_sub_dict(self, submission_rsl:str) -> dict:
"""
Returns a dictionary of locations.
Args:
submission_rsl (str): Submission RSL number.
Returns:
dict: 'well' and sample submitter_id as 'name'
"""
row_map = {1:"A", 2:"B", 3:"C", 4:"D", 5:"E", 6:"F", 7:"G", 8:"H"}
self.assoc = [item for item in self.sample_submission_associations if item.submission.rsl_plate_num==submission_rsl][0]
sample = {}
@@ -347,73 +372,30 @@ class BasicSample(Base):
sample['name'] = self.submitter_id
return sample
def to_hitpick(self, submission_rsl:str) -> dict|None:
def to_hitpick(self, submission_rsl:str|None=None) -> dict|None:
"""
Outputs a dictionary of locations
Returns:
dict: dictionary of sample id, row and column in elution plate
"""
self.assoc = [item for item in self.sample_submission_associations if item.submission.rsl_plate_num==submission_rsl][0]
# dictionary to translate row letters into numbers
# row_dict = dict(A=1, B=2, C=3, D=4, E=5, F=6, G=7, H=8)
# if either n1 or n2 is positive, include this sample
# well_row = row_dict[self.well_number[0]]
# The remaining charagers are the columns
# well_col = self.well_number[1:]
return dict(name=self.submitter_id,
# row=well_row,
# col=well_col,
positive=False)
# self.assoc = [item for item in self.sample_submission_associations if item.submission.rsl_plate_num==submission_rsl][0]
# Since there is no PCR, negliable result is necessary.
return dict(name=self.submitter_id, positive=False)
class WastewaterSample(BasicSample):
"""
Base wastewater sample
Derivative wastewater sample
"""
# __tablename__ = "_ww_samples"
# id = Column(INTEGER, primary_key=True) #: primary key
ww_processing_num = Column(String(64)) #: wastewater processing number
ww_sample_full_id = Column(String(64))
ww_full_sample_id = Column(String(64))
rsl_number = Column(String(64)) #: rsl plate identification number
# rsl_plate = relationship("Wastewater", back_populates="samples") #: relationship to parent plate
# rsl_plate_id = Column(INTEGER, ForeignKey("_submissions.id", ondelete="SET NULL", name="fk_WWS_submission_id"))
collection_date = Column(TIMESTAMP) #: Date sample collected
received_date = Column(TIMESTAMP) #: Date sample received
# well_number = Column(String(8)) #: location on 96 well plate
# The following are fields from the sample tracking excel sheet Ruth put together.
# I have no idea when they will be implemented or how.
# testing_type = Column(String(64))
# site_status = Column(String(64))
notes = Column(String(2000))
# ct_n1 = Column(FLOAT(2)) #: AKA ct for N1
# ct_n2 = Column(FLOAT(2)) #: AKA ct for N2
# n1_status = Column(String(32))
# n2_status = Column(String(32))
# seq_submitted = Column(BOOLEAN())
# ww_seq_run_id = Column(String(64))
# sample_type = Column(String(16))
# pcr_results = Column(JSON)
sample_location = Column(String(8)) #: location on 24 well plate
# artic_rsl_plate = relationship("WastewaterArtic", back_populates="samples")
# artic_well_number = Column(String(8))
__mapper_args__ = {"polymorphic_identity": "Wastewater Sample", "polymorphic_load": "inline"}
# def to_string(self) -> str:
# """
# string representing sample object
# Returns:
# str: string representing location and sample id
# """
# return f"{self.well_number}: {self.ww_sample_full_id}"
# @validates("received-date")
# def convert_rdate_time(self, key, value):
# if isinstance(value, Timestamp):
# return value.date()
# return value
@validates("collected-date")
def convert_cdate_time(self, key, value):
@@ -423,31 +405,68 @@ class WastewaterSample(BasicSample):
if isinstance(value, str):
return parse(value)
return value
@validates("rsl_number")
def use_submitter_id(self, key, value):
logger.debug(f"Validating {key}: {value}")
return value or self.submitter_id
# @collection_date.setter
# def collection_date(self, value):
# match value:
# case Timestamp():
# self.collection_date = value.date()
# case str():
# self.collection_date = parse(value)
# case _:
# self.collection_date = value
# def __init__(self, **kwargs):
# # Had a problem getting collection date from excel as text only.
# if 'collection_date' in kwargs.keys():
# logger.debug(f"Got collection_date: {kwargs['collection_date']}. Attempting parse.")
# if isinstance(kwargs['collection_date'], str):
# logger.debug(f"collection_date is a string...")
# kwargs['collection_date'] = parse(kwargs['collection_date'])
# logger.debug(f"output is {kwargs['collection_date']}")
# # Due to the plate map being populated with RSL numbers, we have to do some shuffling.
# try:
# kwargs['rsl_number'] = kwargs['submitter_id']
# except KeyError as e:
# logger.error(f"Error using {kwargs} for submitter_id")
# try:
# check = check_not_nan(kwargs['ww_full_sample_id'])
# except KeyError:
# logger.error(f"Error using {kwargs} for ww_full_sample_id")
# check = False
# if check:
# kwargs['submitter_id'] = kwargs["ww_full_sample_id"]
# super().__init__(**kwargs)
def __init__(self, **kwargs):
if 'collection_date' in kwargs.keys():
logger.debug(f"Got collection_date: {kwargs['collection_date']}. Attempting parse.")
if isinstance(kwargs['collection_date'], str):
logger.debug(f"collection_date is a string...")
kwargs['collection_date'] = parse(kwargs['collection_date'])
logger.debug(f"output is {kwargs['collection_date']}")
super().__init__(**kwargs)
def set_attribute(self, name:str, value):
"""
Set an attribute of this object. Extends parent.
Args:
name (str): _description_
value (_type_): _description_
"""
# Due to the plate map being populated with RSL numbers, we have to do some shuffling.
# logger.debug(f"Input - {name}:{value}")
match name:
case "submitter_id":
if self.submitter_id != None:
return
else:
super().set_attribute("rsl_number", value)
case "ww_full_sample_id":
if value != None:
super().set_attribute(name, value)
name = "submitter_id"
case 'collection_date':
if isinstance(value, str):
logger.debug(f"collection_date {value} is a string. Attempting parse...")
value = parse(value)
case "rsl_number":
if value == None:
value = self.submitter_id
# logger.debug(f"Output - {name}:{value}")
super().set_attribute(name, value)
def to_sub_dict(self, submission_rsl:str) -> dict:
"""
Gui friendly dictionary. Inherited from BasicSample
Gui friendly dictionary. Extends parent method.
This version will include PCR status.
Args:
@@ -458,15 +477,13 @@ class WastewaterSample(BasicSample):
"""
# Get the relevant submission association for this sample
sample = super().to_sub_dict(submission_rsl=submission_rsl)
# check if PCR data exists.
try:
check = self.assoc.ct_n1 != None and self.assoc.ct_n2 != None
except AttributeError as e:
check = False
if check:
# logger.debug(f"Using well info in name.")
sample['name'] = f"{self.submitter_id}\n\t- ct N1: {'{:.2f}'.format(self.assoc.ct_n1)} ({self.assoc.n1_status})\n\t- ct N2: {'{:.2f}'.format(self.assoc.ct_n2)} ({self.assoc.n2_status})"
# else:
# logger.error(f"Couldn't get the pcr info")
return sample
def to_hitpick(self, submission_rsl:str) -> dict|None:
@@ -477,67 +494,30 @@ class WastewaterSample(BasicSample):
dict: dictionary of sample id, row and column in elution plate
"""
sample = super().to_hitpick(submission_rsl=submission_rsl)
# dictionary to translate row letters into numbers
# row_dict = dict(A=1, B=2, C=3, D=4, E=5, F=6, G=7, H=8)
# if either n1 or n2 is positive, include this sample
try:
sample['positive'] = any(["positive" in item for item in [self.assoc.n1_status, self.assoc.n2_status]])
except (TypeError, AttributeError) as e:
logger.error(f"Couldn't check positives for {self.rsl_number}. Looks like there isn't PCR data.")
# return None
# positive = False
# well_row = row_dict[self.well_number[0]]
# well_col = self.well_number[1:]
# if positive:
# try:
# # The first character of the elution well is the row
# well_row = row_dict[self.elution_well[0]]
# # The remaining charagers are the columns
# well_col = self.elution_well[1:]
# except TypeError as e:
# logger.error(f"This sample doesn't have elution plate info.")
# return None
return sample
class BacterialCultureSample(BasicSample):
"""
base of bacterial culture sample
"""
# __tablename__ = "_bc_samples"
# id = Column(INTEGER, primary_key=True) #: primary key
# well_number = Column(String(8)) #: location on parent plate
# sample_id = Column(String(64), nullable=False, unique=True) #: identification from submitter
organism = Column(String(64)) #: bacterial specimen
concentration = Column(String(16)) #:
# sample_type = Column(String(16))
# rsl_plate_id = Column(INTEGER, ForeignKey("_submissions.id", ondelete="SET NULL", name="fk_BCS_sample_id")) #: id of parent plate
# rsl_plate = relationship("BacterialCulture", back_populates="samples") #: relationship to parent plate
concentration = Column(String(16)) #: sample concentration
__mapper_args__ = {"polymorphic_identity": "Bacterial Culture Sample", "polymorphic_load": "inline"}
# def to_string(self) -> str:
# """
# string representing object
# Returns:
# str: string representing well location, sample id and organism
# """
# return f"{self.well_number}: {self.sample_id} - {self.organism}"
def to_sub_dict(self, submission_rsl:str) -> dict:
"""
gui friendly dictionary
gui friendly dictionary, extends parent method.
Returns:
dict: well location and name (sample id, organism) NOTE: keys must sync with WWSample to_sub_dict above
"""
sample = super().to_sub_dict(submission_rsl=submission_rsl)
sample['name'] = f"{self.submitter_id} - ({self.organism})"
# return {
# # "well": self.well_number,
# "name": f"{self.submitter_id} - ({self.organism})",
# }
return sample
class SubmissionSampleAssociation(Base):
@@ -548,18 +528,19 @@ class SubmissionSampleAssociation(Base):
__tablename__ = "_submission_sample"
sample_id = Column(INTEGER, ForeignKey("_samples.id"), nullable=False)
submission_id = Column(INTEGER, ForeignKey("_submissions.id"), primary_key=True)
row = Column(INTEGER, primary_key=True)
column = Column(INTEGER, primary_key=True)
row = Column(INTEGER, primary_key=True) #: row on the 96 well plate
column = Column(INTEGER, primary_key=True) #: column on the 96 well plate
# reference to the Submission object
submission = relationship(BasicSubmission, back_populates="submission_sample_associations")
# reference to the "ReagentType" object
# sample = relationship("BasicSample")
# reference to the Sample object
sample = relationship(BasicSample, back_populates="sample_submission_associations")
base_sub_type = Column(String)
# """Refers to the type of parent."""
# Refers to the type of parent.
# Hooooooo boy, polymorphic association type, now we're getting into the weeds!
__mapper_args__ = {
"polymorphic_identity": "basic_association",
"polymorphic_on": base_sub_type,
@@ -576,11 +557,14 @@ class SubmissionSampleAssociation(Base):
return f"<SubmissionSampleAssociation({self.submission.rsl_plate_num} & {self.sample.submitter_id})"
class WastewaterAssociation(SubmissionSampleAssociation):
"""
Derivative custom Wastewater/Submission Association... fancy.
"""
ct_n1 = Column(FLOAT(2)) #: AKA ct for N1
ct_n2 = Column(FLOAT(2)) #: AKA ct for N2
n1_status = Column(String(32))
n2_status = Column(String(32))
pcr_results = Column(JSON)
n1_status = Column(String(32)) #: positive or negative for N1
n2_status = Column(String(32)) #: positive or negative for N2
pcr_results = Column(JSON) #: imported PCR status from QuantStudio
__mapper_args__ = {"polymorphic_identity": "wastewater", "polymorphic_load": "inline"}
__mapper_args__ = {"polymorphic_identity": "wastewater", "polymorphic_load": "inline"}