First working example.

This commit is contained in:
lwark
2025-05-05 14:47:50 -05:00
parent ea10798686
commit 5508f68bc8
7 changed files with 338 additions and 310 deletions

View File

@@ -24,28 +24,6 @@ Base: DeclarativeMeta = declarative_base()
logger = logging.getLogger(f"submissions.{__name__}")
class LogMixin(Base):
tracking_exclusion: ClassVar = ['artic_technician', 'submission_sample_associations',
'submission_reagent_associations', 'submission_equipment_associations',
'submission_tips_associations', 'contact_id', 'gel_info', 'gel_controls',
'source_plates']
__abstract__ = True
@property
def truncated_name(self):
name = str(self)
if len(name) > 64:
name = name.replace("<", "").replace(">", "")
if len(name) > 64:
# NOTE: As if re'agent'
name = name.replace("agent", "")
if len(name) > 64:
name = f"...{name[-61:]}"
return name
class BaseClass(Base):
"""
Abstract class to pass ctx values to all SQLAlchemy objects.
@@ -243,8 +221,10 @@ class BaseClass(Base):
Returns:
Any | List[Any]: Single result if limit = 1 or List if other.
"""
logger.debug(f"Kwargs: {kwargs}")
if model is None:
model = cls
logger.debug(f"Model: {model}")
if query is None:
query: Query = cls.__database_session__.query(model)
singles = model.get_default_info('singles')
@@ -477,6 +457,28 @@ class BaseClass(Base):
return output_date
class LogMixin(Base):
tracking_exclusion: ClassVar = ['artic_technician', 'submission_sample_associations',
'submission_reagent_associations', 'submission_equipment_associations',
'submission_tips_associations', 'contact_id', 'gel_info', 'gel_controls',
'source_plates']
__abstract__ = True
@property
def truncated_name(self):
name = str(self)
if len(name) > 64:
name = name.replace("<", "").replace(">", "")
if len(name) > 64:
# NOTE: As if re'agent'
name = name.replace("agent", "")
if len(name) > 64:
name = f"...{name[-61:]}"
return name
class ConfigItem(BaseClass):
"""
Key:JSON objects to store config settings in database.

View File

@@ -123,6 +123,9 @@ class Control(BaseClass):
controltype = relationship("ControlType", back_populates="instances",
foreign_keys=[controltype_name]) #: reference to parent control type
name = Column(String(255), unique=True) #: Sample ID
sample_id = Column(String, ForeignKey("_basicsample.id", ondelete="SET NULL",
name="fk_Cont_sample_id")) #: name of joined submission type
sample = relationship("BasicSample", back_populates="control") #: This control's submission sample
submitted_date = Column(TIMESTAMP) #: Date submitted to Robotics
submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id")) #: parent submission id
submission = relationship("BasicSubmission", back_populates="controls",
@@ -362,7 +365,6 @@ class IridaControl(Control):
refseq_version = Column(String(16)) #: version of refseq used in fastq parsing
kraken2_version = Column(String(16)) #: version of kraken2 used in fastq parsing
kraken2_db_version = Column(String(32)) #: folder name of kraken2 db
sample = relationship("BacterialCultureSample", back_populates="control") #: This control's submission sample
sample_id = Column(INTEGER,
ForeignKey("_basicsample.id", ondelete="SET NULL", name="cont_BCS_id")) #: sample id key

View File

@@ -848,7 +848,7 @@ class SubmissionType(BaseClass):
name = Column(String(128), unique=True) #: name of submission type
info_map = Column(JSON) #: Where parsable information is found in the excel workbook corresponding to this type.
defaults = Column(JSON) #: Basic information about this submission type
instances = relationship("BasicSubmission") #: Concrete instances of this type.
instances = relationship("ClientSubmission", back_populates="submission_type") #: Concrete instances of this type.
template_file = Column(BLOB) #: Blank form for this type stored as binary.
processes = relationship("Process", back_populates="submission_types",
secondary=submissiontypes_processes) #: Relation to equipment processes used for this type.
@@ -1048,9 +1048,9 @@ class SubmissionType(BaseClass):
}
"""
runtype_kit_associations = relationship(
"RunTypeKitTypeAssociation",
back_populates="runtype",
submissiontype_kit_associations = relationship(
"SubmissionTypeKitTypeAssociation",
back_populates="submission_type",
cascade="all, delete-orphan",
) #: Association of kittypes

View File

@@ -15,8 +15,8 @@ from operator import itemgetter
from pprint import pformat
from pandas import DataFrame
from sqlalchemy.ext.hybrid import hybrid_property
from . import Base, BaseClass, Reagent, SubmissionType, KitType, Organization, Contact, LogMixin, \
SubmissionReagentAssociation
from . import Base, BaseClass, Reagent, SubmissionType, KitType, Organization, Contact, \
SubmissionReagentAssociation, LogMixin
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func, Table
from sqlalchemy.orm import relationship, validates, Query
from sqlalchemy.orm.attributes import flag_modified
@@ -55,14 +55,15 @@ class ClientSubmission(BaseClass, LogMixin):
String(64)) #: ["Research", "Diagnostic", "Surveillance", "Validation"], else defaults to submission_type_name
sample_count = Column(INTEGER) #: Number of samples in the submission
runs = relationship("BasicRun", back_populates="client_submission") #: many-to-one relationship
runs = relationship("BasicSubmission", back_populates="client_submission") #: many-to-one relationship
contact = relationship("Contact", back_populates="submissions") #: client org
contact_id = Column(INTEGER, ForeignKey("_contact.id", ondelete="SET NULL",
name="fk_BS_contact_id")) #: client lab id from _organizations
submission_type = relationship("SubmissionType", back_populates="instances") #: archetype of this submission
submission_type_name = Column(String, ForeignKey("_submissiontype.name", ondelete="SET NULL",
name="fk_BS_subtype_name")) #: name of joined submission type
submission_type = relationship("SubmissionType", back_populates="instances") #: archetype of this submission
cost_centre = Column(
String(64)) #: Permanent storage of used cost centre in case organization field changed in the future.
@@ -98,7 +99,9 @@ class BasicSubmission(BaseClass, LogMixin):
"""
id = Column(INTEGER, primary_key=True) #: primary key
rsl_plate_number = Column(String(32), unique=True, nullable=False) #: RSL name (e.g. RSL-22-0012)
rsl_plate_num = Column(String(32), unique=True, nullable=False) #: RSL name (e.g. RSL-22-0012)
client_submission_id = Column(INTEGER, ForeignKey("_clientsubmission.id", ondelete="SET NULL",
name="fk_BS_clientsub_id")) #: client lab id from _organizations)
client_submission = relationship("ClientSubmission", back_populates="runs")
# submitter_plate_num = Column(String(127), unique=True) #: The number given to the submission by the submitting lab
started_date = Column(TIMESTAMP) #: Date this run was started.
@@ -304,7 +307,8 @@ class BasicSubmission(BaseClass, LogMixin):
case SubmissionType():
return sub_type
case _:
return SubmissionType.query(cls.__mapper_args__['polymorphic_identity'])
# return SubmissionType.query(cls.__mapper_args__['polymorphic_identity'])
return None
@classmethod
def construct_info_map(cls, submission_type: SubmissionType | None = None,
@@ -356,7 +360,7 @@ class BasicSubmission(BaseClass, LogMixin):
"""
# NOTE: get lab from nested organization object
try:
sub_lab = self.submitting_lab.name
sub_lab = self.client_submission.submitting_lab.name
except AttributeError:
sub_lab = None
try:
@@ -364,24 +368,24 @@ class BasicSubmission(BaseClass, LogMixin):
except AttributeError:
pass
# NOTE: get extraction kit name from nested kit object
try:
ext_kit = self.extraction_kit.name
except AttributeError:
ext_kit = None
# try:
# ext_kit = self.extraction_kit.name
# except AttributeError:
# ext_kit = None
# NOTE: load scraped extraction info
try:
ext_info = self.extraction_info
except TypeError:
ext_info = None
# try:
# ext_info = self.extraction_info
# except TypeError:
# ext_info = None
output = {
"id": self.id,
"plate_number": self.rsl_plate_num,
"submission_type": self.submission_type_name,
"submitter_plate_number": self.submitter_plate_num,
"submitted_date": self.submitted_date.strftime("%Y-%m-%d"),
"submission_type": self.client_submission.submission_type_name,
"submitter_plate_number": self.client_submission.submitter_plate_num,
"submitted_date": self.client_submission.submitted_date.strftime("%Y-%m-%d"),
"submitting_lab": sub_lab,
"sample_count": self.sample_count,
"extraction_kit": ext_kit,
"sample_count": self.client_submission.sample_count,
"extraction_kit": "Change submissions.py line 388",
"cost": self.run_cost
}
if report:
@@ -433,11 +437,11 @@ class BasicSubmission(BaseClass, LogMixin):
contact_phone = self.contact.phone
except AttributeError:
contact_phone = "NA"
output["submission_category"] = self.submission_category
output["submission_category"] = self.client_submission.submission_category
output["technician"] = self.technician
output["reagents"] = reagents
output["samples"] = samples
output["extraction_info"] = ext_info
# output["extraction_info"] = ext_info
output["comment"] = comments
output["equipment"] = equipment
output["tips"] = tips
@@ -1236,7 +1240,8 @@ class BasicSubmission(BaseClass, LogMixin):
# # else:
start_date = cls.rectify_query_date(start_date)
end_date = cls.rectify_query_date(end_date, eod=True)
query = query.filter(model.submitted_date.between(start_date, end_date))
logger.debug(f"Start date: {start_date}, end date: {end_date}")
query = query.join(ClientSubmission).filter(ClientSubmission.submitted_date.between(start_date, end_date))
# NOTE: by reagent (for some reason)
match reagent:
case str():
@@ -1256,7 +1261,9 @@ class BasicSubmission(BaseClass, LogMixin):
pass
match submission_type_name:
case str():
query = query.filter(model.submission_type_name == submission_type_name)
if not start_date:
query = query.join(ClientSubmission)
query = query.filter(ClientSubmission.submission_type_name == submission_type_name)
case _:
pass
# NOTE: by id (returns only a single value)
@@ -1269,7 +1276,7 @@ class BasicSubmission(BaseClass, LogMixin):
limit = 1
case _:
pass
query = query.order_by(cls.submitted_date.desc())
# query = query.order_by(cls.submitted_date.desc())
# NOTE: Split query results into pages of size {page_size}
if page_size > 0:
query = query.limit(page_size)
@@ -1471,7 +1478,7 @@ class BasicSubmission(BaseClass, LogMixin):
completed = self.completed_date.date()
except AttributeError:
completed = None
return self.calculate_turnaround(start_date=self.submitted_date.date(), end_date=completed)
return self.calculate_turnaround(start_date=self.client_submission.submitted_date.date(), end_date=completed)
@classmethod
def calculate_turnaround(cls, start_date: date | None = None, end_date: date | None = None) -> int:
@@ -2459,6 +2466,7 @@ class BasicSample(BaseClass, LogMixin):
submitter_id = Column(String(64), nullable=False, unique=True) #: identification from submitter
sample_type = Column(String(32)) #: mode_sub_type of sample
misc_info = Column(JSON)
control = relationship("Control", back_populates="sample", uselist=False)
sample_submission_associations = relationship(
"SubmissionSampleAssociation",
@@ -2775,140 +2783,140 @@ class BasicSample(BaseClass, LogMixin):
# NOTE: Below are the custom sample types
class WastewaterSample(BasicSample):
"""
Derivative wastewater sample
"""
id = Column(INTEGER, ForeignKey('_basicsample.id'), primary_key=True)
ww_processing_num = Column(String(64)) #: wastewater processing number
ww_full_sample_id = Column(String(64)) #: full id given by entrics
rsl_number = Column(String(64)) #: rsl plate identification number
collection_date = Column(TIMESTAMP) #: Date sample collected
received_date = Column(TIMESTAMP) #: Date sample received
notes = Column(String(2000)) #: notes from submission form
sample_location = Column(String(8)) #: location on 24 well plate
__mapper_args__ = dict(polymorphic_identity="Wastewater Sample",
polymorphic_load="inline",
inherit_condition=(id == BasicSample.id))
@classmethod
def get_default_info(cls, *args):
"""
Returns default info for a model. Extends BaseClass method.
Returns:
dict | list | str: Output of key:value dict or single (list, str) desired variable
"""
dicto = super().get_default_info(*args)
match dicto:
case dict():
dicto['singles'] += ['ww_processing_num']
output = {}
for k, v in dicto.items():
if len(args) > 0 and k not in args:
continue
else:
output[k] = v
if len(args) == 1:
return output[args[0]]
case list():
if "singles" in args:
dicto += ['ww_processing_num']
return dicto
case _:
pass
def to_sub_dict(self, full_data: bool = False) -> dict:
"""
gui friendly dictionary, extends parent method.
Returns:
dict: sample id, type, received date, collection date
"""
sample = super().to_sub_dict(full_data=full_data)
sample['ww_processing_num'] = self.ww_processing_num
sample['sample_location'] = self.sample_location
sample['received_date'] = self.received_date
sample['collection_date'] = self.collection_date
return sample
@classmethod
def parse_sample(cls, input_dict: dict) -> dict:
"""
Custom sample parser. Extends parent
Args:
input_dict (dict): Basic parser results for this sample.
Returns:
dict: Updated parser results.
"""
output_dict = super().parse_sample(input_dict)
disallowed = ["", None, "None"]
try:
check = output_dict['rsl_number'] in disallowed
except KeyError:
check = True
if check:
output_dict['rsl_number'] = "RSL-WW-" + output_dict['ww_processing_num']
if output_dict['ww_full_sample_id'] is not None and output_dict["submitter_id"] in disallowed:
output_dict["submitter_id"] = output_dict['ww_full_sample_id']
# check = check_key_or_attr("rsl_number", output_dict, check_none=True)
return output_dict
@classproperty
def searchables(cls) -> List[dict]:
"""
Delivers a list of fields that can be used in fuzzy search. Extends parent.
Returns:
List[str]: List of fields.
"""
searchables = deepcopy(super().searchables)
for item in ["ww_processing_num", "ww_full_sample_id", "rsl_number"]:
label = item.strip("ww_").replace("_", " ").replace("rsl", "RSL").title()
searchables.append(dict(label=label, field=item))
return searchables
class BacterialCultureSample(BasicSample):
"""
base of bacterial culture sample
"""
id = Column(INTEGER, ForeignKey('_basicsample.id'), primary_key=True)
organism = Column(String(64)) #: bacterial specimen
concentration = Column(String(16)) #: sample concentration
control = relationship("IridaControl", back_populates="sample", uselist=False)
__mapper_args__ = dict(polymorphic_identity="Bacterial Culture Sample",
polymorphic_load="inline",
inherit_condition=(id == BasicSample.id))
def to_sub_dict(self, full_data: bool = False) -> dict:
"""
gui friendly dictionary, extends parent method.
Returns:
dict: well location and name (sample id, organism) NOTE: keys must sync with WWSample to_sub_dict above
"""
sample = super().to_sub_dict(full_data=full_data)
sample['name'] = self.submitter_id
sample['organism'] = self.organism
try:
sample['concentration'] = f"{float(self.concentration):.2f}"
except (TypeError, ValueError):
sample['concentration'] = 0.0
if self.control is not None:
sample['colour'] = [0, 128, 0]
target = next((v for k, v in self.control.controltype.targets.items() if k == self.control.subtype),
"Not Available")
try:
target = ", ".join(target)
except:
target = "None"
sample['tooltip'] = f"\nControl: {self.control.controltype.name} - {target}"
return sample
# class WastewaterSample(BasicSample):
# """
# Derivative wastewater sample
# """
#
# id = Column(INTEGER, ForeignKey('_basicsample.id'), primary_key=True)
# ww_processing_num = Column(String(64)) #: wastewater processing number
# ww_full_sample_id = Column(String(64)) #: full id given by entrics
# rsl_number = Column(String(64)) #: rsl plate identification number
# collection_date = Column(TIMESTAMP) #: Date sample collected
# received_date = Column(TIMESTAMP) #: Date sample received
# notes = Column(String(2000)) #: notes from submission form
# sample_location = Column(String(8)) #: location on 24 well plate
# __mapper_args__ = dict(polymorphic_identity="Wastewater Sample",
# polymorphic_load="inline",
# inherit_condition=(id == BasicSample.id))
#
# @classmethod
# def get_default_info(cls, *args):
# """
# Returns default info for a model. Extends BaseClass method.
#
# Returns:
# dict | list | str: Output of key:value dict or single (list, str) desired variable
# """
# dicto = super().get_default_info(*args)
# match dicto:
# case dict():
# dicto['singles'] += ['ww_processing_num']
# output = {}
# for k, v in dicto.items():
# if len(args) > 0 and k not in args:
# continue
# else:
# output[k] = v
# if len(args) == 1:
# return output[args[0]]
# case list():
# if "singles" in args:
# dicto += ['ww_processing_num']
# return dicto
# case _:
# pass
#
# def to_sub_dict(self, full_data: bool = False) -> dict:
# """
# gui friendly dictionary, extends parent method.
#
# Returns:
# dict: sample id, type, received date, collection date
# """
# sample = super().to_sub_dict(full_data=full_data)
# sample['ww_processing_num'] = self.ww_processing_num
# sample['sample_location'] = self.sample_location
# sample['received_date'] = self.received_date
# sample['collection_date'] = self.collection_date
# return sample
#
# @classmethod
# def parse_sample(cls, input_dict: dict) -> dict:
# """
# Custom sample parser. Extends parent
#
# Args:
# input_dict (dict): Basic parser results for this sample.
#
# Returns:
# dict: Updated parser results.
# """
# output_dict = super().parse_sample(input_dict)
# disallowed = ["", None, "None"]
# try:
# check = output_dict['rsl_number'] in disallowed
# except KeyError:
# check = True
# if check:
# output_dict['rsl_number'] = "RSL-WW-" + output_dict['ww_processing_num']
# if output_dict['ww_full_sample_id'] is not None and output_dict["submitter_id"] in disallowed:
# output_dict["submitter_id"] = output_dict['ww_full_sample_id']
# # check = check_key_or_attr("rsl_number", output_dict, check_none=True)
# return output_dict
#
# @classproperty
# def searchables(cls) -> List[dict]:
# """
# Delivers a list of fields that can be used in fuzzy search. Extends parent.
#
# Returns:
# List[str]: List of fields.
# """
# searchables = deepcopy(super().searchables)
# for item in ["ww_processing_num", "ww_full_sample_id", "rsl_number"]:
# label = item.strip("ww_").replace("_", " ").replace("rsl", "RSL").title()
# searchables.append(dict(label=label, field=item))
# return searchables
#
#
# class BacterialCultureSample(BasicSample):
# """
# base of bacterial culture sample
# """
# id = Column(INTEGER, ForeignKey('_basicsample.id'), primary_key=True)
# organism = Column(String(64)) #: bacterial specimen
# concentration = Column(String(16)) #: sample concentration
# control = relationship("IridaControl", back_populates="sample", uselist=False)
# __mapper_args__ = dict(polymorphic_identity="Bacterial Culture Sample",
# polymorphic_load="inline",
# inherit_condition=(id == BasicSample.id))
#
# def to_sub_dict(self, full_data: bool = False) -> dict:
# """
# gui friendly dictionary, extends parent method.
#
# Returns:
# dict: well location and name (sample id, organism) NOTE: keys must sync with WWSample to_sub_dict above
# """
# sample = super().to_sub_dict(full_data=full_data)
# sample['name'] = self.submitter_id
# sample['organism'] = self.organism
# try:
# sample['concentration'] = f"{float(self.concentration):.2f}"
# except (TypeError, ValueError):
# sample['concentration'] = 0.0
# if self.control is not None:
# sample['colour'] = [0, 128, 0]
# target = next((v for k, v in self.control.controltype.targets.items() if k == self.control.subtype),
# "Not Available")
# try:
# target = ", ".join(target)
# except:
# target = "None"
# sample['tooltip'] = f"\nControl: {self.control.controltype.name} - {target}"
# return sample
#
# # NOTE: Submission to Sample Associations
@@ -2920,26 +2928,27 @@ class SubmissionSampleAssociation(BaseClass):
id = Column(INTEGER, unique=True, nullable=False) #: id to be used for inheriting purposes
sample_id = Column(INTEGER, ForeignKey("_basicsample.id"), nullable=False) #: id of associated sample
submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id"), primary_key=True) #: id of associated submission
submission_id = Column(INTEGER, ForeignKey("_clientsubmission.id"), primary_key=True) #: id of associated submission
row = Column(INTEGER, primary_key=True) #: row on the 96 well plate
column = Column(INTEGER, primary_key=True) #: column on the 96 well plate
submission_rank = Column(INTEGER, nullable=False, default=0) #: Location in sample list
misc_info = Column(JSON)
# NOTE: reference to the Submission object
submission = relationship(BasicSubmission,
submission = relationship(ClientSubmission,
back_populates="submission_sample_associations") #: associated submission
# NOTE: reference to the Sample object
sample = relationship(BasicSample, back_populates="sample_submission_associations") #: associated sample
base_sub_type = Column(String) #: string of mode_sub_type name
# base_sub_type = Column(String) #: string of mode_sub_type name
# NOTE: Refers to the type of parent.
__mapper_args__ = {
"polymorphic_identity": "Basic Association",
"polymorphic_on": base_sub_type,
"with_polymorphic": "*",
}
# __mapper_args__ = {
# "polymorphic_identity": "Basic Association",
# "polymorphic_on": base_sub_type,
# "with_polymorphic": "*",
# }
def __init__(self, submission: BasicSubmission = None, sample: BasicSample = None, row: int = 1, column: int = 1,
id: int | None = None, submission_rank: int = 0, **kwargs):
@@ -3183,96 +3192,96 @@ class SubmissionSampleAssociation(BaseClass):
raise AttributeError(f"Delete not implemented for {self.__class__}")
class WastewaterAssociation(SubmissionSampleAssociation):
"""
table containing wastewater specific submission/sample associations
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
"""
id = Column(INTEGER, ForeignKey("_submissionsampleassociation.id"), primary_key=True)
ct_n1 = Column(FLOAT(2)) #: AKA ct for N1
ct_n2 = Column(FLOAT(2)) #: AKA ct for N2
n1_status = Column(String(32)) #: positive or negative for N1
n2_status = Column(String(32)) #: positive or negative for N2
pcr_results = Column(JSON) #: imported PCR status from QuantStudio
__mapper_args__ = dict(polymorphic_identity="Wastewater Association",
polymorphic_load="inline",
inherit_condition=(id == SubmissionSampleAssociation.id))
def to_sub_dict(self) -> dict:
"""
Returns a sample dictionary updated with instance information. Extends parent
Returns:
dict: Updated dictionary with row, column and well updated
"""
sample = super().to_sub_dict()
sample['ct'] = f"({self.ct_n1}, {self.ct_n2})"
try:
sample['source_row'] = row_keys[self.sample.sample_location[0]]
sample['source_column'] = int(self.sample.sample_location[1:])
except (TypeError, AttributeError) as e:
logger.error(f"Couldn't set sources for {self.sample.rsl_number}. Looks like there isn't data.")
try:
sample['positive'] = any(["positive" in item for item in [self.n1_status, self.n2_status]])
except (TypeError, AttributeError) as e:
logger.error(f"Couldn't check positives for {self.sample.rsl_number}. Looks like there isn't PCR data.")
return sample
@property
def hitpicked(self) -> dict | None:
"""
Outputs a dictionary usable for html plate maps. Extends parent
Returns:
dict: dictionary of sample id, row and column in elution plate
"""
sample = super().hitpicked
try:
scaler = max([self.ct_n1, self.ct_n2])
except TypeError:
scaler = 0.0
if scaler == 0.0:
scaler = 45
bg = (45 - scaler) * 17
red = min([64 + bg, 255])
grn = max([255 - bg, 0])
blu = 128
sample['background_color'] = f"rgb({red}, {grn}, {blu})"
try:
sample[
'tooltip'] += f"<br>- ct N1: {'{:.2f}'.format(self.ct_n1)}<br>- ct N2: {'{:.2f}'.format(self.ct_n2)}"
except (TypeError, AttributeError) as e:
logger.error(f"Couldn't set tooltip for {self.sample.rsl_number}. Looks like there isn't PCR data.")
return sample
class WastewaterArticAssociation(SubmissionSampleAssociation):
"""
table containing wastewater artic specific submission/sample associations
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
"""
id = Column(INTEGER, ForeignKey("_submissionsampleassociation.id"), primary_key=True)
source_plate = Column(String(32))
source_plate_number = Column(INTEGER)
source_well = Column(String(8))
ct = Column(String(8)) #: AKA ct for N1
__mapper_args__ = dict(polymorphic_identity="Wastewater Artic Association",
polymorphic_load="inline",
inherit_condition=(id == SubmissionSampleAssociation.id))
def to_sub_dict(self) -> dict:
"""
Returns a sample dictionary updated with instance information. Extends parent
Returns:
dict: Updated dictionary with row, column and well updated
"""
sample = super().to_sub_dict()
sample['ct'] = self.ct
sample['source_plate'] = self.source_plate
sample['source_plate_number'] = self.source_plate_number
sample['source_well'] = self.source_well
return sample
# class WastewaterAssociation(SubmissionSampleAssociation):
# """
# table containing wastewater specific submission/sample associations
# DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
# """
# id = Column(INTEGER, ForeignKey("_submissionsampleassociation.id"), primary_key=True)
# ct_n1 = Column(FLOAT(2)) #: AKA ct for N1
# ct_n2 = Column(FLOAT(2)) #: AKA ct for N2
# n1_status = Column(String(32)) #: positive or negative for N1
# n2_status = Column(String(32)) #: positive or negative for N2
# pcr_results = Column(JSON) #: imported PCR status from QuantStudio
#
# __mapper_args__ = dict(polymorphic_identity="Wastewater Association",
# polymorphic_load="inline",
# inherit_condition=(id == SubmissionSampleAssociation.id))
#
# def to_sub_dict(self) -> dict:
# """
# Returns a sample dictionary updated with instance information. Extends parent
#
# Returns:
# dict: Updated dictionary with row, column and well updated
# """
#
# sample = super().to_sub_dict()
# sample['ct'] = f"({self.ct_n1}, {self.ct_n2})"
# try:
# sample['source_row'] = row_keys[self.sample.sample_location[0]]
# sample['source_column'] = int(self.sample.sample_location[1:])
# except (TypeError, AttributeError) as e:
# logger.error(f"Couldn't set sources for {self.sample.rsl_number}. Looks like there isn't data.")
# try:
# sample['positive'] = any(["positive" in item for item in [self.n1_status, self.n2_status]])
# except (TypeError, AttributeError) as e:
# logger.error(f"Couldn't check positives for {self.sample.rsl_number}. Looks like there isn't PCR data.")
# return sample
#
# @property
# def hitpicked(self) -> dict | None:
# """
# Outputs a dictionary usable for html plate maps. Extends parent
#
# Returns:
# dict: dictionary of sample id, row and column in elution plate
# """
# sample = super().hitpicked
# try:
# scaler = max([self.ct_n1, self.ct_n2])
# except TypeError:
# scaler = 0.0
# if scaler == 0.0:
# scaler = 45
# bg = (45 - scaler) * 17
# red = min([64 + bg, 255])
# grn = max([255 - bg, 0])
# blu = 128
# sample['background_color'] = f"rgb({red}, {grn}, {blu})"
# try:
# sample[
# 'tooltip'] += f"<br>- ct N1: {'{:.2f}'.format(self.ct_n1)}<br>- ct N2: {'{:.2f}'.format(self.ct_n2)}"
# except (TypeError, AttributeError) as e:
# logger.error(f"Couldn't set tooltip for {self.sample.rsl_number}. Looks like there isn't PCR data.")
# return sample
#
#
# class WastewaterArticAssociation(SubmissionSampleAssociation):
# """
# table containing wastewater artic specific submission/sample associations
# DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
# """
# id = Column(INTEGER, ForeignKey("_submissionsampleassociation.id"), primary_key=True)
# source_plate = Column(String(32))
# source_plate_number = Column(INTEGER)
# source_well = Column(String(8))
# ct = Column(String(8)) #: AKA ct for N1
#
# __mapper_args__ = dict(polymorphic_identity="Wastewater Artic Association",
# polymorphic_load="inline",
# inherit_condition=(id == SubmissionSampleAssociation.id))
#
# def to_sub_dict(self) -> dict:
# """
# Returns a sample dictionary updated with instance information. Extends parent
#
# Returns:
# dict: Updated dictionary with row, column and well updated
# """
# sample = super().to_sub_dict()
# sample['ct'] = self.ct
# sample['source_plate'] = self.source_plate
# sample['source_plate_number'] = self.source_plate_number
# sample['source_well'] = self.source_well
# return sample

View File

@@ -47,7 +47,7 @@ class ReportMaker(object):
# NOTE: Set page size to zero to override limiting query size.
self.subs = BasicSubmission.query(start_date=start_date, end_date=end_date, page_size=0)
if organizations is not None:
self.subs = [sub for sub in self.subs if sub.submitting_lab.name in organizations]
self.subs = [sub for sub in self.subs if sub.client_submission.submitting_lab.name in organizations]
self.detailed_df, self.summary_df = self.make_report_xlsx()
self.html = self.make_report_html(df=self.summary_df)
@@ -183,7 +183,10 @@ class TurnaroundMaker(ReportArchetype):
except (AttributeError, KeyError):
tat = None
if not tat:
tat = ctx.TaT_threshold
try:
tat = ctx.TaT_threshold
except AttributeError:
tat = 3
try:
tat_ok = days <= tat
except TypeError:

View File

@@ -12,7 +12,7 @@ import logging, numpy as np
from pprint import pformat
from typing import Tuple, List
from pathlib import Path
from backend.db.models import WastewaterArtic
from backend.db.models import BasicSubmission
logger = logging.getLogger(f"submissions.{__name__}")
@@ -20,7 +20,7 @@ logger = logging.getLogger(f"submissions.{__name__}")
# Main window class
class GelBox(QDialog):
def __init__(self, parent, img_path: str | Path, submission: WastewaterArtic):
def __init__(self, parent, img_path: str | Path, submission: BasicSubmission):
super().__init__(parent)
# NOTE: setting title
self.setWindowTitle(f"Gel - {img_path}")

View File

@@ -1195,36 +1195,48 @@ class Settings(BaseSettings, extra="allow"):
func = function[1]
# NOTE: assign function based on its name being in config: startup/teardown
# NOTE: scripts must be registered using {name: Null} in the database
if name in self.startup_scripts.keys():
self.startup_scripts[name] = func
if name in self.teardown_scripts.keys():
self.teardown_scripts[name] = func
try:
if name in self.startup_scripts.keys():
self.startup_scripts[name] = func
except AttributeError:
pass
try:
if name in self.teardown_scripts.keys():
self.teardown_scripts[name] = func
except AttributeError:
pass
@timer
def run_startup(self):
"""
Runs startup scripts.
"""
for script in self.startup_scripts.values():
try:
logger.info(f"Running startup script: {script.__name__}")
thread = Thread(target=script, args=(ctx,))
thread.start()
except AttributeError:
logger.error(f"Couldn't run startup script: {script}")
try:
for script in self.startup_scripts.values():
try:
logger.info(f"Running startup script: {script.__name__}")
thread = Thread(target=script, args=(ctx,))
thread.start()
except AttributeError:
logger.error(f"Couldn't run startup script: {script}")
except AttributeError:
pass
@timer
def run_teardown(self):
"""
Runs teardown scripts.
"""
for script in self.teardown_scripts.values():
try:
logger.info(f"Running teardown script: {script.__name__}")
thread = Thread(target=script, args=(ctx,))
thread.start()
except AttributeError:
logger.error(f"Couldn't run teardown script: {script}")
try:
for script in self.teardown_scripts.values():
try:
logger.info(f"Running teardown script: {script.__name__}")
thread = Thread(target=script, args=(ctx,))
thread.start()
except AttributeError:
logger.error(f"Couldn't run teardown script: {script}")
except AttributeError:
pass
@classmethod
def get_alembic_db_path(cls, alembic_path, mode=Literal['path', 'schema', 'user', 'pass']) -> Path | str: