Automated Control lot retrieval
This commit is contained in:
@@ -5,7 +5,7 @@ from __future__ import annotations
|
||||
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey
|
||||
from sqlalchemy.orm import relationship, Query
|
||||
from sqlalchemy_json import NestedMutableJson
|
||||
import logging
|
||||
import logging, re
|
||||
from operator import itemgetter
|
||||
from . import BaseClass
|
||||
from tools import setup_lookup
|
||||
@@ -25,6 +25,9 @@ class ControlType(BaseClass):
|
||||
targets = Column(JSON) #: organisms checked for
|
||||
instances = relationship("Control", back_populates="controltype") #: control samples created of this type.
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<ControlType({self.name})>"
|
||||
|
||||
@classmethod
|
||||
@setup_lookup
|
||||
def query(cls,
|
||||
@@ -74,6 +77,16 @@ class ControlType(BaseClass):
|
||||
subtypes = [item for item in jsoner[genera] if "_hashes" not in item and "_ratio" not in item]
|
||||
return subtypes
|
||||
|
||||
@classmethod
|
||||
def get_positive_control_types(cls):
|
||||
return [item for item in cls.query() if item.targets != []]
|
||||
|
||||
@classmethod
|
||||
def build_positive_regex(cls):
|
||||
strings = list(set([item.name.split("-")[0] for item in cls.get_positive_control_types()]))
|
||||
return re.compile(rf"(^{'|^'.join(strings)})-.*", flags=re.IGNORECASE)
|
||||
|
||||
|
||||
class Control(BaseClass):
|
||||
"""
|
||||
Base class of a control sample.
|
||||
|
||||
@@ -1117,6 +1117,39 @@ class BacterialCulture(BasicSubmission):
|
||||
# input_dict['submitted_date']['missing'] = True
|
||||
# return input_dict
|
||||
|
||||
@classmethod
|
||||
def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None, plate_map: dict | None = None) -> dict:
|
||||
"""
|
||||
Extends parent. Currently finds control sample and adds to reagents.
|
||||
|
||||
Args:
|
||||
input_dict (dict): _description_
|
||||
xl (pd.ExcelFile | None, optional): _description_. Defaults to None.
|
||||
info_map (dict | None, optional): _description_. Defaults to None.
|
||||
plate_map (dict | None, optional): _description_. Defaults to None.
|
||||
|
||||
Returns:
|
||||
dict: _description_
|
||||
"""
|
||||
from . import ControlType
|
||||
input_dict = super().finalize_parse(input_dict, xl, info_map, plate_map)
|
||||
# build regex for all control types that have targets
|
||||
regex = ControlType.build_positive_regex()
|
||||
# search samples for match
|
||||
for sample in input_dict['samples']:
|
||||
matched = regex.match(sample.submitter_id)
|
||||
if bool(matched):
|
||||
logger.debug(f"Control match found: {sample.submitter_id}")
|
||||
new_lot = matched.group()
|
||||
try:
|
||||
pos_control_reg = [reg for reg in input_dict['reagents'] if reg.type=="Bacterial-Positive Control"][0]
|
||||
except IndexError:
|
||||
logger.error(f"No positive control reagent listed")
|
||||
return input_dict
|
||||
pos_control_reg.lot = new_lot
|
||||
pos_control_reg.missing = False
|
||||
return input_dict
|
||||
|
||||
@classmethod
|
||||
def custom_sample_autofill_row(cls, sample, worksheet: Worksheet) -> int:
|
||||
"""
|
||||
@@ -1505,53 +1538,53 @@ class WastewaterArtic(BasicSubmission):
|
||||
# logger.debug(pformat(input_dict))
|
||||
# logger.debug(pformat(info_map))
|
||||
# logger.debug(pformat(plate_map))
|
||||
samples = []
|
||||
for sample in input_dict['samples']:
|
||||
logger.debug(f"Input sample: {pformat(sample.__dict__)}")
|
||||
if sample.submitter_id == "NTC1":
|
||||
samples.append(dict(sample=sample.submitter_id, destination_row=8, destination_column=2, source_row=0, source_column=0, plate_number='control', plate=None))
|
||||
continue
|
||||
elif sample.submitter_id == "NTC2":
|
||||
samples.append(dict(sample=sample.submitter_id, destination_row=8, destination_column=5, source_row=0, source_column=0, plate_number='control', plate=None))
|
||||
continue
|
||||
destination_row = sample.row[0]
|
||||
destination_column = sample.column[0]
|
||||
# logger.debug(f"Looking up: {sample.submitter_id} friend.")
|
||||
lookup_sample = BasicSample.query(submitter_id=sample.submitter_id)
|
||||
lookup_ssa = SubmissionSampleAssociation.query(sample=lookup_sample, exclude_submission_type=cls.__mapper_args__['polymorphic_identity'] , chronologic=True, reverse=True, limit=1)
|
||||
try:
|
||||
plate = lookup_ssa.submission.rsl_plate_num
|
||||
source_row = lookup_ssa.row
|
||||
source_column = lookup_ssa.column
|
||||
except AttributeError as e:
|
||||
logger.error(f"Problem with lookup: {e}")
|
||||
plate = "Error"
|
||||
source_row = 0
|
||||
source_column = 0
|
||||
# continue
|
||||
output_sample = dict(
|
||||
sample=sample.submitter_id,
|
||||
destination_column=destination_column,
|
||||
destination_row=destination_row,
|
||||
plate=plate,
|
||||
source_column=source_column,
|
||||
source_row = source_row
|
||||
)
|
||||
logger.debug(f"output sample: {pformat(output_sample)}")
|
||||
samples.append(output_sample)
|
||||
plates = sorted(list(set([sample['plate'] for sample in samples if sample['plate'] != None and sample['plate'] != "Error"])))
|
||||
logger.debug(f"Here's what I got for plates: {plates}")
|
||||
for iii, plate in enumerate(plates):
|
||||
for sample in samples:
|
||||
if sample['plate'] == plate:
|
||||
sample['plate_number'] = iii + 1
|
||||
df = pd.DataFrame.from_records(samples).fillna(value="")
|
||||
try:
|
||||
df.source_row = df.source_row.astype(int)
|
||||
df.source_column = df.source_column.astype(int)
|
||||
df.sort_values(by=['destination_column', 'destination_row'], inplace=True)
|
||||
except AttributeError as e:
|
||||
logger.error(f"Couldn't construct df due to {e}")
|
||||
# samples = []
|
||||
# for sample in input_dict['samples']:
|
||||
# logger.debug(f"Input sample: {pformat(sample.__dict__)}")
|
||||
# if sample.submitter_id == "NTC1":
|
||||
# samples.append(dict(sample=sample.submitter_id, destination_row=8, destination_column=2, source_row=0, source_column=0, plate_number='control', plate=None))
|
||||
# continue
|
||||
# elif sample.submitter_id == "NTC2":
|
||||
# samples.append(dict(sample=sample.submitter_id, destination_row=8, destination_column=5, source_row=0, source_column=0, plate_number='control', plate=None))
|
||||
# continue
|
||||
# destination_row = sample.row[0]
|
||||
# destination_column = sample.column[0]
|
||||
# # logger.debug(f"Looking up: {sample.submitter_id} friend.")
|
||||
# lookup_sample = BasicSample.query(submitter_id=sample.submitter_id)
|
||||
# lookup_ssa = SubmissionSampleAssociation.query(sample=lookup_sample, exclude_submission_type=cls.__mapper_args__['polymorphic_identity'] , chronologic=True, reverse=True, limit=1)
|
||||
# try:
|
||||
# plate = lookup_ssa.submission.rsl_plate_num
|
||||
# source_row = lookup_ssa.row
|
||||
# source_column = lookup_ssa.column
|
||||
# except AttributeError as e:
|
||||
# logger.error(f"Problem with lookup: {e}")
|
||||
# plate = "Error"
|
||||
# source_row = 0
|
||||
# source_column = 0
|
||||
# # continue
|
||||
# output_sample = dict(
|
||||
# sample=sample.submitter_id,
|
||||
# destination_column=destination_column,
|
||||
# destination_row=destination_row,
|
||||
# plate=plate,
|
||||
# source_column=source_column,
|
||||
# source_row = source_row
|
||||
# )
|
||||
# logger.debug(f"output sample: {pformat(output_sample)}")
|
||||
# samples.append(output_sample)
|
||||
# plates = sorted(list(set([sample['plate'] for sample in samples if sample['plate'] != None and sample['plate'] != "Error"])))
|
||||
# logger.debug(f"Here's what I got for plates: {plates}")
|
||||
# for iii, plate in enumerate(plates):
|
||||
# for sample in samples:
|
||||
# if sample['plate'] == plate:
|
||||
# sample['plate_number'] = iii + 1
|
||||
# df = pd.DataFrame.from_records(samples).fillna(value="")
|
||||
# try:
|
||||
# df.source_row = df.source_row.astype(int)
|
||||
# df.source_column = df.source_column.astype(int)
|
||||
# df.sort_values(by=['destination_column', 'destination_row'], inplace=True)
|
||||
# except AttributeError as e:
|
||||
# logger.error(f"Couldn't construct df due to {e}")
|
||||
# input_dict['csv'] = df
|
||||
input_dict['csv'] = xl.parse("hitpicks_csv_to_export")
|
||||
return input_dict
|
||||
|
||||
@@ -809,8 +809,8 @@ class PydSubmission(BaseModel, extra='allow'):
|
||||
# # what reagent types are in both lists?
|
||||
# missing = list(set(ext_kit_rtypes).difference(reagenttypes))
|
||||
missing = []
|
||||
# output_reagents = self.reagents
|
||||
output_reagents = ext_kit_rtypes
|
||||
output_reagents = self.reagents
|
||||
# output_reagents = ext_kit_rtypes
|
||||
logger.debug(f"Already have these reagent types: {reagenttypes}")
|
||||
for rt in ext_kit_rtypes:
|
||||
if rt.type not in reagenttypes:
|
||||
|
||||
Reference in New Issue
Block a user