Moments before disaster

This commit is contained in:
lwark
2024-09-27 10:09:09 -05:00
parent 3c98d2a4ad
commit 9b08fc5186
6 changed files with 170 additions and 169 deletions

View File

@@ -168,7 +168,7 @@ class BaseClass(Base):
logger.error(f"Error message: {type(e)}")
self.__database_session__.rollback()
report.add_result(Result(msg=e, status="Critical"))
return report
return report
class ConfigItem(BaseClass):
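The one-line change in this hunk reads as a dedent of return report, moving it out of the except block so the report is returned on the success path as well as the failure path. A minimal sketch of that control flow, with stub Report/Result shapes (the real signatures may differ):

class Report:
    def __init__(self):
        self.results = []

    def add_result(self, result):
        self.results.append(result)

def save(fail: bool) -> Report:
    # sketch only; stands in for the database save in BaseClass
    report = Report()
    try:
        if fail:
            raise ValueError("boom")
    except Exception as e:
        report.add_result(dict(msg=str(e), status="Critical"))
    return report  # dedented: reached whether or not the except fired

assert save(False).results == []
assert save(True).results[0]["status"] == "Critical"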

View File

@@ -245,7 +245,6 @@ class BasicSubmission(BaseClass):
case _:
return SubmissionType.query(cls.__mapper_args__['polymorphic_identity'])
@classmethod
def construct_info_map(cls, submission_type: SubmissionType | None = None,
mode: Literal["read", "write"] = "read") -> dict:
@@ -424,8 +423,9 @@ class BasicSubmission(BaseClass):
except Exception as e:
logger.error(f"Column count error: {e}")
# NOTE: Get kit associated with this submission
assoc = next((item for item in self.extraction_kit.kit_submissiontype_associations if item.submission_type == self.submission_type),
None)
assoc = next((item for item in self.extraction_kit.kit_submissiontype_associations if
item.submission_type == self.submission_type),
None)
# logger.debug(f"Came up with association: {assoc}")
# NOTE: If every individual cost is 0 this is probably an old plate.
if all(item == 0.0 for item in [assoc.constant_cost, assoc.mutable_cost_column, assoc.mutable_cost_sample]):
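The reflowed lookup above leans on next() with a default, which yields None instead of raising StopIteration when no kit association matches. A self-contained sketch (Assoc is an invented stand-in for the kit_submissiontype association model):

from dataclasses import dataclass

@dataclass
class Assoc:  # sketch stand-in, not the real model
    submission_type: str
    constant_cost: float = 0.0

associations = [Assoc("Wastewater"), Assoc("Bacterial Culture", 12.5)]
# the second argument to next() is returned when the generator is empty
assoc = next((a for a in associations if a.submission_type == "Bacterial Culture"), None)
assert assoc is not None and assoc.constant_cost == 12.5
assert next((a for a in associations if a.submission_type == "Artic"), None) is None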
@@ -464,15 +464,23 @@ class BasicSubmission(BaseClass):
Returns:
str: html output string.
"""
output_samples = []
# output_samples = []
# logger.debug("Setting locations.")
for column in range(1, plate_columns + 1):
for row in range(1, plate_rows + 1):
try:
well = next((item for item in sample_list if item['row'] == row and item['column'] == column), dict(name="", row=row, column=column, background_color="#ffffff"))
except StopIteration:
well = dict(name="", row=row, column=column, background_color="#ffffff")
output_samples.append(well)
# for column in range(1, plate_columns + 1):
# for row in range(1, plate_rows + 1):
# try:
# well = next((item for item in sample_list if item['row'] == row and item['column'] == column), dict(name="", row=row, column=column, background_color="#ffffff"))
# except StopIteration:
# well = dict(name="", row=row, column=column, background_color="#ffffff")
# output_samples.append(well)
rows = range(1, plate_rows + 1)
columns = range(1, plate_columns + 1)
# NOTE: An overly complicated list comprehension creates a list of sample locations
# NOTE: next will return a blank cell if no value found for row/column
output_samples = [next((item for item in sample_list if item['row'] == row and item['column'] == column),
dict(name="", row=row, column=column, background_color="#ffffff"))
for row in rows
for column in columns]
env = jinja_template_loading()
template = env.get_template("plate_map.html")
html = template.render(samples=output_samples, PLATE_ROWS=plate_rows, PLATE_COLUMNS=plate_columns)
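A runnable sketch of the replacement comprehension, showing the row-major ordering and the blank-cell default it depends on (plate dimensions and sample_list are toy values):

plate_rows, plate_columns = 2, 3
sample_list = [dict(name="S1", row=1, column=2, background_color="#ff0000")]

def blank(row, column):  # sketch helper for the empty-well default
    return dict(name="", row=row, column=column, background_color="#ffffff")

output_samples = [
    next((s for s in sample_list if s['row'] == row and s['column'] == column),
         blank(row, column))
    for row in range(1, plate_rows + 1)        # outer loop: rows
    for column in range(1, plate_columns + 1)  # inner loop: columns
]
assert len(output_samples) == plate_rows * plate_columns
assert output_samples[1]['name'] == "S1"  # (row 1, column 2) in row-major order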
@@ -510,16 +518,17 @@ class BasicSubmission(BaseClass):
df = pd.DataFrame.from_records(subs)
# logger.debug(f"Column names: {df.columns}")
# NOTE: Exclude sub information
excluded = ['controls', 'extraction_info', 'pcr_info', 'comment', 'comments', 'samples', 'reagents',
exclude = ['controls', 'extraction_info', 'pcr_info', 'comment', 'comments', 'samples', 'reagents',
'equipment', 'gel_info', 'gel_image', 'dna_core_submission_number', 'gel_controls',
'source_plates', 'pcr_technician', 'ext_technician', 'artic_technician', 'cost_centre',
'signed_by', 'artic_date', 'gel_barcode', 'gel_date', 'ngs_date', 'contact_phone', 'contact',
'tips', 'gel_image_path', 'custom']
for item in excluded:
try:
df = df.drop(item, axis=1)
except:
logger.warning(f"Couldn't drop '{item}' column from submissionsheet df.")
df = df.loc[:, ~df.columns.isin(exclude)]
# for item in excluded:
# try:
# df = df.drop(item, axis=1)
# except:
# logger.warning(f"Couldn't drop '{item}' column from submissionsheet df.")
if chronologic:
try:
df.sort_values(by="id", axis=0, inplace=True, ascending=False)
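The drop-in-a-loop becomes a single boolean mask; unlike df.drop, a columns.isin mask silently ignores labels that are absent, which is why the try/except disappears. A toy demonstration:

import pandas as pd

df = pd.DataFrame({"id": [2, 1], "comment": ["a", "b"], "samples": [3, 4]})
exclude = ["comment", "samples", "not_a_column"]  # absent labels are simply ignored
df = df.loc[:, ~df.columns.isin(exclude)]
df = df.sort_values(by="id", axis=0, ascending=False)
assert df.columns.tolist() == ["id"]
assert df["id"].tolist() == [2, 1]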
@@ -599,7 +608,7 @@ class BasicSubmission(BaseClass):
field_value = value.strip()
except AttributeError:
field_value = value
# insert into field
# NOTE: insert into field
try:
self.__setattr__(key, field_value)
except AttributeError as e:
@@ -616,16 +625,17 @@ class BasicSubmission(BaseClass):
Returns:
Result: _description_
"""
# assoc = [item for item in self.submission_sample_associations if item.sample == sample][0]
try:
assoc = next(item for item in self.submission_sample_associations if item.sample == sample)
except StopIteration:
report = Report()
report.add_result(Result(msg=f"Couldn't find submission sample association for {sample.submitter_id}", status="Warning"))
report.add_result(
Result(msg=f"Couldn't find submission sample association for {sample.submitter_id}", status="Warning"))
return report
for k, v in input_dict.items():
try:
setattr(assoc, k, v)
# NOTE: for some reason assoc.__setattr__(k, v) doesn't seem to work here.
except AttributeError:
logger.error(f"Can't set {k} to {v}")
result = assoc.save()
@@ -702,15 +712,7 @@ class BasicSubmission(BaseClass):
Returns:
re.Pattern: Regular expression pattern to discriminate between submission types.
"""
# rstring = rf'{"|".join([item.get_regex() for item in cls.__subclasses__()])}'
# rstring = rf'{"|".join([item.defaults["regex"] for item in SubmissionType.query()])}'
res = []
for item in SubmissionType.query():
try:
res.append(item.defaults['regex'])
except TypeError:
logger.error(f"Problem: {item.__dict__}")
continue
res = [st.defaults['regex'] for st in SubmissionType.query() if st.defaults]
rstring = rf'{"|".join(res)}'
regex = re.compile(rstring, flags=re.IGNORECASE | re.VERBOSE)
return regex
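The loop-with-try/except collapses into a comprehension whose truthiness filter (if st.defaults) covers the old TypeError case of defaults being None. A sketch with literal dicts standing in for SubmissionType rows:

import re

defaults = [{"regex": r"RSL-WW-\d{8}"}, None, {"regex": r"RSL-BC-\d{8}"}]
res = [d["regex"] for d in defaults if d]  # None rows are filtered out, not caught
regex = re.compile("|".join(res), flags=re.IGNORECASE | re.VERBOSE)
assert regex.match("rsl-bc-20240927")
assert regex.match("plate-xyz") is None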
@@ -737,21 +739,21 @@ class BasicSubmission(BaseClass):
match polymorphic_identity:
case str():
try:
logger.info(f"Recruiting: {cls}")
model = cls.__mapper__.polymorphic_map[polymorphic_identity].class_
except Exception as e:
logger.error(
f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}, falling back to BasicSubmission")
case _:
pass
if attrs is None or len(attrs) == 0:
return model
if any([not hasattr(cls, attr) for attr in attrs.keys()]):
# if attrs is None or len(attrs) == 0:
# logger.info(f"Recruiting: {cls}")
# return model
if attrs and any([not hasattr(cls, attr) for attr in attrs.keys()]):
# looks for first model that has all included kwargs
try:
model = [subclass for subclass in cls.__subclasses__() if
all([hasattr(subclass, attr) for attr in attrs.keys()])][0]
except IndexError as e:
model = next(subclass for subclass in cls.__subclasses__() if
all([hasattr(subclass, attr) for attr in attrs.keys()]))
except StopIteration as e:
raise AttributeError(
f"Couldn't find existing class/subclass of {cls} with all attributes:\n{pformat(attrs.keys())}")
logger.info(f"Recruiting model: {model}")
@@ -785,15 +787,19 @@ class BasicSubmission(BaseClass):
input_dict['custom'][k] = ws.cell(row=v['read']['row'], column=v['read']['column']).value
case "range":
ws = xl[v['sheet']]
input_dict['custom'][k] = []
# input_dict['custom'][k] = []
if v['start_row'] != v['end_row']:
v['end_row'] = v['end_row'] + 1
rows = range(v['start_row'], v['end_row'])
if v['start_column'] != v['end_column']:
v['end_column'] = v['end_column'] + 1
for ii in range(v['start_row'], v['end_row']):
for jj in range(v['start_column'], v['end_column'] + 1):
input_dict['custom'][k].append(
dict(value=ws.cell(row=ii, column=jj).value, row=ii, column=jj))
columns = range(v['start_column'], v['end_column'])
input_dict['custom'][k] = [dict(value=ws.cell(row=row, column=column).value, row=row, column=column)
for row in rows for column in columns]
# for ii in range(v['start_row'], v['end_row']):
# for jj in range(v['start_column'], v['end_column'] + 1):
# input_dict['custom'][k].append(
# dict(value=ws.cell(row=ii, column=jj).value, row=ii, column=jj))
return input_dict
@classmethod
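One caution on the range hunk above: the new code only widens end_row/end_column when start differs from end, so a degenerate single-row or single-column range (start == end) produces an empty Python range and no cells. A hedged sketch of an always-inclusive conversion, using openpyxl directly:

from openpyxl import Workbook

def inclusive_range(start: int, end: int) -> range:
    # Python ranges are half-open, so end + 1 keeps 'end' even when start == end
    return range(start, end + 1)

wb = Workbook()
ws = wb.active
ws.cell(row=2, column=3, value="hello")
v = dict(start_row=2, end_row=2, start_column=3, end_column=3)  # single cell
cells = [dict(value=ws.cell(row=row, column=column).value, row=row, column=column)
         for row in inclusive_range(v['start_row'], v['end_row'])
         for column in inclusive_range(v['start_column'], v['end_column'])]
assert cells == [dict(value="hello", row=2, column=3)]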
@@ -907,38 +913,43 @@ class BasicSubmission(BaseClass):
else:
outstr = instr
if re.search(rf"{data['abbreviation']}", outstr, flags=re.IGNORECASE) is None:
# NOTE: replace RSL- with RSL-abbreviation-
outstr = re.sub(rf"RSL-?", rf"RSL-{data['abbreviation']}-", outstr, flags=re.IGNORECASE)
try:
# NOTE: remove dashes from date
outstr = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", outstr)
# NOTE: insert dash between abbreviation and date
outstr = re.sub(rf"{data['abbreviation']}(\d{6})", rf"{data['abbreviation']}-\1", outstr,
flags=re.IGNORECASE).upper()
except (AttributeError, TypeError) as e:
logger.error(f"Error making outstr: {e}, sending to RSLNamer to make new plate name.")
outstr = RSLNamer.construct_new_plate_name(data=data)
try:
# NOTE: Grab plate number
plate_number = re.search(r"(?:(-|_)\d)(?!\d)", outstr).group().strip("_").strip("-")
# logger.debug(f"Plate number is: {plate_number}")
except AttributeError as e:
plate_number = "1"
# NOTE: insert dash between date and plate number
outstr = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", outstr)
# logger.debug(f"After addition of plate number the plate name is: {outstr}")
try:
# NOTE: grab repeat number
repeat = re.search(r"-\dR(?P<repeat>\d)?", outstr).groupdict()['repeat']
if repeat is None:
repeat = "1"
except AttributeError as e:
repeat = ""
# NOTE: Insert repeat number?
outstr = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", outstr).replace(" ", "")
# abb = cls.get_default_info('abbreviation')
# outstr = re.sub(rf"RSL{abb}", rf"RSL-{abb}", outstr)
# return re.sub(rf"{abb}(\d)", rf"{abb}-\1", outstr)
# NOTE: This should already have been done. Do I dare remove it?
outstr = re.sub(rf"RSL{data['abbreviation']}", rf"RSL-{data['abbreviation']}", outstr)
return re.sub(rf"{data['abbreviation']}(\d)", rf"{data['abbreviation']}-\1", outstr)
@classmethod
def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> list:
"""
Perform custom parsing of pcr info.
Perform parsing of pcr info. Since most of our PCR outputs share the same format, this should work for most submissions.
Args:
xl (Workbook): pcr info form
@@ -959,7 +970,6 @@ class BasicSubmission(BaseClass):
for k, v in fields.items():
sheet = xl[v['sheet']]
sample[k] = sheet.cell(row=idx, column=v['column']).value
# yield sample
samples.append(sample)
return samples
@@ -974,19 +984,20 @@ class BasicSubmission(BaseClass):
"""
return "{{ rsl_plate_num }}"
@classmethod
def custom_sample_autofill_row(cls, sample, worksheet: Worksheet) -> int:
"""
Updates row information
Args:
sample (_type_): _description_
worksheet (Workbook): _description_
Returns:
int: New row number
"""
return None
# @classmethod
# def custom_sample_autofill_row(cls, sample, worksheet: Worksheet) -> int:
# """
# Updates row information
#
# Args:
# sample (_type_): _description_
# worksheet (Workbook): _description_
#
# Returns:
# int: New row number
# """
# logger.debug(f"Sample from args: {sample}")
# return None
@classmethod
def adjust_autofill_samples(cls, samples: List[Any]) -> List[Any]:
@@ -1232,7 +1243,10 @@ class BasicSubmission(BaseClass):
except (SQLIntegrityError, SQLOperationalError, AlcIntegrityError, AlcOperationalError) as e:
self.__database_session__.rollback()
raise e
obj.setData()
try:
obj.setData()
except AttributeError:
logger.debug("App will not refresh data at this time.")
def show_details(self, obj):
"""
@@ -1408,29 +1422,29 @@ class BacterialCulture(BasicSubmission):
pos_control_reg['missing'] = False
return input_dict
@classmethod
def custom_sample_autofill_row(cls, sample, worksheet: Worksheet) -> int:
"""
Extends parent
"""
# logger.debug(f"Checking {sample.well}")
# logger.debug(f"here's the worksheet: {worksheet}")
row = super().custom_sample_autofill_row(sample, worksheet)
df = pd.DataFrame(list(worksheet.values))
# logger.debug(f"Here's the dataframe: {df}")
idx = df[df[0] == sample.well]
if idx.empty:
new = f"{sample.well[0]}{sample.well[1:].zfill(2)}"
# logger.debug(f"Checking: {new}")
idx = df[df[0] == new]
# logger.debug(f"Here is the row: {idx}")
row = idx.index.to_list()[0]
return row + 1
# @classmethod
# def custom_sample_autofill_row(cls, sample, worksheet: Worksheet) -> int:
# """
# Extends parent
# """
# # logger.debug(f"Checking {sample.well}")
# # logger.debug(f"here's the worksheet: {worksheet}")
# row = super().custom_sample_autofill_row(sample, worksheet)
# df = pd.DataFrame(list(worksheet.values))
# # logger.debug(f"Here's the dataframe: {df}")
# idx = df[df[0] == sample.well]
# if idx.empty:
# new = f"{sample.well[0]}{sample.well[1:].zfill(2)}"
# # logger.debug(f"Checking: {new}")
# idx = df[df[0] == new]
# # logger.debug(f"Here is the row: {idx}")
# row = idx.index.to_list()[0]
# return row + 1
@classmethod
def custom_info_parser(cls, input_dict: dict, xl: Workbook | None = None, custom_fields: dict = {}) -> dict:
input_dict = super().custom_info_parser(input_dict=input_dict, xl=xl, custom_fields=custom_fields)
logger.debug(f"\n\nInfo dictionary:\n\n{pformat(input_dict)}\n\n")
# logger.debug(f"\n\nInfo dictionary:\n\n{pformat(input_dict)}\n\n")
return input_dict
@@ -1561,20 +1575,20 @@ class Wastewater(BasicSubmission):
samples = [item for item in samples if not item.submitter_id.startswith("EN")]
return samples
@classmethod
def custom_sample_autofill_row(cls, sample, worksheet: Worksheet) -> int:
"""
Extends parent
"""
# logger.debug(f"Checking {sample.well}")
# logger.debug(f"here's the worksheet: {worksheet}")
row = super().custom_sample_autofill_row(sample, worksheet)
df = pd.DataFrame(list(worksheet.values))
# logger.debug(f"Here's the dataframe: {df}")
idx = df[df[1] == sample.sample_location]
# logger.debug(f"Here is the row: {idx}")
row = idx.index.to_list()[0]
return row + 1
# @classmethod
# def custom_sample_autofill_row(cls, sample, worksheet: Worksheet) -> int:
# """
# Extends parent
# """
# # logger.debug(f"Checking {sample.well}")
# # logger.debug(f"here's the worksheet: {worksheet}")
# row = super().custom_sample_autofill_row(sample, worksheet)
# df = pd.DataFrame(list(worksheet.values))
# # logger.debug(f"Here's the dataframe: {df}")
# idx = df[df[1] == sample.sample_location]
# # logger.debug(f"Here is the row: {idx}")
# row = idx.index.to_list()[0]
# return row + 1
@classmethod
def get_details_template(cls, base_dict: dict) -> Tuple[dict, Template]:
@@ -1816,7 +1830,7 @@ class WastewaterArtic(BasicSubmission):
datum['values'].append(d)
data.append(datum)
input_dict['gel_info'] = data
logger.debug(f"Wastewater Artic custom info:\n\n{pformat(input_dict)}")
# logger.debug(f"Wastewater Artic custom info:\n\n{pformat(input_dict)}")
egel_image_section = custom_fields['image_range']
img: Image = scrape_image(wb=xl, info_dict=egel_image_section)
if img is not None:
@@ -2259,18 +2273,19 @@ class BasicSample(BaseClass):
def to_sub_dict(self, full_data: bool = False) -> dict:
"""
gui friendly dictionary, extends parent method.
gui friendly dictionary
Args:
full_data (bool): Whether to use full object or truncated. Defaults to False
Returns:
dict: well location and name (sample id, organism) NOTE: keys must sync with WWSample to_sub_dict above
dict: submitter id, sample type, and linked submissions when full_data is set
"""
# logger.debug(f"Converting {self} to dict.")
sample = {}
sample['submitter_id'] = self.submitter_id
sample['sample_type'] = self.sample_type
sample = dict(
submitter_id=self.submitter_id,
sample_type=self.sample_type
)
if full_data:
sample['submissions'] = sorted([item.to_sub_dict() for item in self.sample_submission_associations],
key=itemgetter('submitted_date'))
@@ -2307,22 +2322,22 @@ class BasicSample(BaseClass):
polymorphic_identity = polymorphic_identity['value']
if polymorphic_identity is not None:
try:
# return [item for item in cls.__subclasses__() if
# item.__mapper_args__['polymorphic_identity'] == polymorphic_identity][0]
model = cls.__mapper__.polymorphic_map[polymorphic_identity].class_
except Exception as e:
logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}")
model = cls
logger.info(f"Recruiting model: {model}")
return model
else:
model = cls
if attrs is None or len(attrs) == 0:
return model
if any([not hasattr(cls, attr) for attr in attrs.keys()]):
# looks for first model that has all included kwargs
# NOTE: looks for first model that has all included kwargs
try:
model = [subclass for subclass in cls.__subclasses__() if
all([hasattr(subclass, attr) for attr in attrs.keys()])][0]
except IndexError as e:
model = next(subclass for subclass in cls.__subclasses__() if
all([hasattr(subclass, attr) for attr in attrs.keys()]))
except StopIteration as e:
raise AttributeError(
f"Couldn't find existing class/subclass of {cls} with all attributes:\n{pformat(attrs.keys())}")
logger.info(f"Recruiting model: {model}")
@@ -2343,7 +2358,7 @@ class BasicSample(BaseClass):
return input_dict
@classmethod
def get_details_template(cls, base_dict: dict) -> Tuple[dict, Template]:
def get_details_template(cls) -> Template:
"""
Get the details jinja template for the correct class
@@ -2353,7 +2368,6 @@ class BasicSample(BaseClass):
Returns:
Template: Template to be rendered
"""
base_dict['excluded'] = ['submissions', 'excluded', 'colour', 'tooltip']
env = jinja_template_loading()
temp_name = f"{cls.__name__.lower()}_details.html"
# logger.debug(f"Returning template: {temp_name}")
@@ -2362,7 +2376,7 @@ class BasicSample(BaseClass):
except TemplateNotFound as e:
logger.error(f"Couldn't find template {e}")
template = env.get_template("basicsample_details.html")
return base_dict, template
return template
@classmethod
@setup_lookup
@@ -2459,6 +2473,7 @@ class BasicSample(BaseClass):
search = f"%{v}%"
try:
attr = getattr(model, k)
# NOTE: the secret sauce is in attr.like
query = query.filter(attr.like(search))
except (ArgumentError, AttributeError) as e:
logger.error(f"Attribute {k} unavailable due to:\n\t{e}\nSkipping.")
@@ -2488,8 +2503,6 @@ class BasicSample(BaseClass):
Returns:
pd.DataFrame: Dataframe all samples
"""
if not isinstance(sample_list, list):
sample_list = [sample_list]
try:
samples = [sample.to_sub_dict() for sample in sample_list]
except TypeError as e:
@@ -2497,12 +2510,9 @@ class BasicSample(BaseClass):
return None
df = pd.DataFrame.from_records(samples)
# NOTE: Exclude sub information
for item in ['concentration', 'organism', 'colour', 'tooltip', 'comments', 'samples', 'reagents',
'equipment', 'gel_info', 'gel_image', 'dna_core_submission_number', 'gel_controls']:
try:
df = df.drop(item, axis=1)
except KeyError as e:
logger.warning(f"Couldn't drop '{item}' column from submissionsheet df due to {e}.")
exclude = ['concentration', 'organism', 'colour', 'tooltip', 'comments', 'samples', 'reagents',
'equipment', 'gel_info', 'gel_image', 'dna_core_submission_number', 'gel_controls']
df = df.loc[:, ~df.columns.isin(exclude)]
return df
def show_details(self, obj):
@@ -2570,7 +2580,7 @@ class WastewaterSample(BasicSample):
gui friendly dictionary, extends parent method.
Returns:
dict: well location and name (sample id, organism) NOTE: keys must sync with WWSample to_sub_dict above
dict: sample id, type, received date, collection date
"""
sample = super().to_sub_dict(full_data=full_data)
sample['ww_processing_num'] = self.ww_processing_num
@@ -2713,7 +2723,7 @@ class SubmissionSampleAssociation(BaseClass):
Returns:
dict: Updated dictionary with row, column and well updated
"""
# Get sample info
# NOTE: Get associated sample info
# logger.debug(f"Running {self.__repr__()}")
sample = self.sample.to_sub_dict()
# logger.debug("Sample conversion complete.")
@@ -2791,8 +2801,6 @@ class SubmissionSampleAssociation(BaseClass):
model = cls
else:
try:
# output = [item for item in cls.__subclasses__() if
# item.__mapper_args__['polymorphic_identity'] == polymorphic_identity][0]
model = cls.__mapper__.polymorphic_map[polymorphic_identity].class_
except Exception as e:
logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}")
@@ -2974,11 +2982,10 @@ class WastewaterAssociation(SubmissionSampleAssociation):
if scaler == 0.0:
scaler = 45
bg = (45 - scaler) * 17
red = min([128 + bg, 255])
red = min([64 + bg, 255])
grn = max([255 - bg, 0])
blu = 128
rgb = f"rgb({red}, {grn}, {blu})"
sample['background_color'] = rgb
sample['background_color'] = f"rgb({red}, {grn}, {blu})"
try:
sample[
'tooltip'] += f"<br>- ct N1: {'{:.2f}'.format(self.ct_n1)} ({self.n1_status})<br>- ct N2: {'{:.2f}'.format(self.ct_n2)} ({self.n2_status})"
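A standalone version of the adjusted colour ramp; the only behavioural change in the hunk is the red floor dropping from 128 to 64, and int() is added here purely to keep the assertions exact:

def well_colour(scaler: float) -> str:  # sketch of WastewaterAssociation's gradient
    if scaler == 0.0:
        scaler = 45
    bg = int((45 - scaler) * 17)
    red = min([64 + bg, 255])  # floor lowered from 128 in this commit
    grn = max([255 - bg, 0])
    blu = 128
    return f"rgb({red}, {grn}, {blu})"

assert well_colour(0.0) == "rgb(64, 255, 128)"  # missing ct resets the scaler to 45
assert well_colour(30.0) == "rgb(255, 0, 128)"  # bg = 255, both channels clamp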
@@ -2995,13 +3002,13 @@ class WastewaterAssociation(SubmissionSampleAssociation):
int: incremented id
"""
try:
parent = next((base for base in cls.__bases__ if base.__name__=="SubmissionSampleAssociation"),
parent = next((base for base in cls.__bases__ if base.__name__ == "SubmissionSampleAssociation"),
SubmissionSampleAssociation)
return max([item.id for item in parent.query()]) + 1
except ValueError as e:
logger.error(f"Problem incrementing id: {e}")
return 1
@classmethod
def autoincrement_id(cls) -> int:
return super().autoincrement_id()
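With a default argument, the next() call above can no longer raise StopIteration; the realistic failure is max() over an empty query result, which raises ValueError. A table-free sketch of the fallback:

def autoincrement_id(existing_ids: list) -> int:
    # sketch; existing_ids stands in for [item.id for item in parent.query()]
    try:
        return max(existing_ids) + 1
    except ValueError:  # max() of an empty sequence
        return 1

assert autoincrement_id([3, 7, 2]) == 8
assert autoincrement_id([]) == 1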

View File

@@ -1,7 +1,6 @@
'''
contains parser objects for pulling values from client-generated submission sheets.
'''
import sys
from copy import copy
from getpass import getuser
from pprint import pformat
@@ -12,9 +11,7 @@ from backend.db.models import *
from backend.validators import PydSubmission, PydReagent, RSLNamer, PydSample, PydEquipment, PydTips
import logging, re
from collections import OrderedDict
from datetime import date
from dateutil.parser import parse, ParserError
from tools import check_not_nan, convert_nans_to_nones, is_missing, remove_key_from_list_of_dicts, check_key_or_attr
from tools import check_not_nan, convert_nans_to_nones, is_missing, check_key_or_attr
logger = logging.getLogger(f"submissions.{__name__}")
@@ -70,34 +67,24 @@ class SheetParser(object):
check = info['submission_type']['value'] not in [None, "None", "", " "]
except KeyError:
return
logger.debug(f"Checking old submission type: {self.submission_type.name} against new: {info['submission_type']['value']}")
logger.info(
f"Checking for updated submission type: {self.submission_type.name} against new: {info['submission_type']['value']}")
if self.submission_type.name != info['submission_type']['value']:
# logger.debug(f"info submission type: {info}")
if check:
self.submission_type = SubmissionType.query(name=info['submission_type']['value'])
logger.debug(f"Updated self.submission_type to {self.submission_type}. Rerunning parse.")
logger.info(f"Updated self.submission_type to {self.submission_type}. Rerunning parse.")
self.parse_info()
else:
self.submission_type = RSLNamer.retrieve_submission_type(filename=self.filepath)
self.parse_info()
for k, v in info.items():
match k:
# NOTE: exclude samples.
case "sample":
continue
# case "submission_type":
# self.sub[k] = v
# # NOTE: Rescue submission type using scraped values to be used in Sample, Reagents, etc.
# if v not in [None, "None", "", " "]:
# self.submission_type = SubmissionType.query(name=v)
# logger.debug(f"Updated self.submission_type to {self.submission_type}")
case _:
self.sub[k] = v
# print(f"\n\n {self.sub} \n\n")
def parse_reagents(self, extraction_kit: str | None = None):
"""
@@ -110,8 +97,8 @@ class SheetParser(object):
extraction_kit = self.sub['extraction_kit']
# logger.debug(f"Parsing reagents for {extraction_kit}")
parser = ReagentParser(xl=self.xl, submission_type=self.submission_type,
extraction_kit=extraction_kit)
self.sub['reagents'] = [item for item in parser.parse_reagents()]
extraction_kit=extraction_kit)
self.sub['reagents'] = [reagent for reagent in parser.parse_reagents()]
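Several parse_* methods in this file now yield instead of returning lists, so each call site materialises the generator exactly once, as here. A reminder sketch of the single-use behaviour that motivates this:

def parse_items():  # sketch generator, shaped like parse_reagents
    for name in ("wash buffer", "lysis buffer"):
        yield dict(role=name, missing=False)

gen = parse_items()
items = [item for item in gen]  # materialise once
assert len(items) == 2
assert list(gen) == []          # a generator is exhausted after one pass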
def parse_samples(self):
"""
@@ -125,14 +112,14 @@ class SheetParser(object):
Calls equipment parser to pull info from the excel sheet
"""
parser = EquipmentParser(xl=self.xl, submission_type=self.submission_type)
self.sub['equipment'] = parser.parse_equipment()
self.sub['equipment'] = [equipment for equipment in parser.parse_equipment()]
def parse_tips(self):
"""
Calls tips parser to pull info from the excel sheet
"""
parser = TipParser(xl=self.xl, submission_type=self.submission_type)
self.sub['tips'] = parser.parse_tips()
self.sub['tips'] = [tip for tip in parser.parse_tips()]
def import_kit_validation_check(self):
"""
@@ -297,16 +284,18 @@ class ReagentParser(object):
Object to pull reagents from excel sheet.
"""
def __init__(self, xl: Workbook, submission_type: str, extraction_kit: str,
def __init__(self, xl: Workbook, submission_type: str | SubmissionType, extraction_kit: str,
sub_object: BasicSubmission | None = None):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (str): Type of submission expected (Wastewater, Bacterial Culture, etc.)
submission_type (str|SubmissionType): Type of submission expected (Wastewater, Bacterial Culture, etc.)
extraction_kit (str): Extraction kit used.
sub_object (BasicSubmission | None, optional): Submission object holding methods. Defaults to None.
"""
# logger.debug("\n\nHello from ReagentParser!\n\n")
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
self.submission_type_obj = submission_type
self.sub_object = sub_object
if isinstance(extraction_kit, dict):
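The widened annotation is backed by a coercion at the top of __init__: a plain string is looked up once, and the rest of the parser always sees a SubmissionType object. A sketch with a stubbed query:

class SubmissionType:  # sketch stand-in for the real model
    def __init__(self, name: str):
        self.name = name

    @classmethod
    def query(cls, name: str) -> "SubmissionType":
        return cls(name)  # stands in for the database lookup

def coerce(submission_type) -> SubmissionType:
    if isinstance(submission_type, str):
        submission_type = SubmissionType.query(name=submission_type)
    return submission_type

assert coerce("Wastewater").name == "Wastewater"
assert coerce(SubmissionType("Wastewater")).name == "Wastewater"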
@@ -381,7 +370,7 @@ class ReagentParser(object):
check = True
if check:
yield dict(role=item.strip(), lot=lot, expiry=expiry, name=name, comment=comment,
missing=missing)
missing=missing)
# return listo
@@ -408,7 +397,8 @@ class SampleParser(object):
self.submission_type = submission_type.name
self.submission_type_obj = submission_type
if sub_object is None:
logger.warning(f"Sample parser attempting to fetch submission class with polymorphic identity: {self.submission_type}")
logger.warning(
f"Sample parser attempting to fetch submission class with polymorphic identity: {self.submission_type}")
sub_object = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
self.sub_object = sub_object
self.sample_info_map = self.fetch_sample_info_map(submission_type=submission_type, sample_map=sample_map)
@@ -521,18 +511,16 @@ class SampleParser(object):
new_samples.append(PydSample(**translated_dict))
return result, new_samples
def reconcile_samples(self) -> List[dict]:
def reconcile_samples(self) -> Generator[dict, None, None]:
"""
Merges sample info from lookup table and plate map.
Returns:
Generator[dict, None, None]: Reconciled samples
"""
# TODO: Move to pydantic validator?
if self.plate_map_samples is None or self.lookup_samples is None:
self.samples = self.lookup_samples or self.plate_map_samples
return
samples = []
merge_on_id = self.sample_info_map['lookup_table']['merge_on_id']
plate_map_samples = sorted(copy(self.plate_map_samples), key=lambda d: d['id'])
lookup_samples = sorted(copy(self.lookup_samples), key=lambda d: d[merge_on_id])
@@ -561,9 +549,11 @@ class SampleParser(object):
if not check_key_or_attr(key='submitter_id', interest=new, check_none=True):
new['submitter_id'] = psample['id']
new = self.sub_object.parse_samples(new)
samples.append(new)
samples = remove_key_from_list_of_dicts(samples, "id")
return sorted(samples, key=lambda k: (k['row'], k['column']))
del new['id']
yield new
# samples.append(new)
# samples = remove_key_from_list_of_dicts(samples, "id")
# return sorted(samples, key=lambda k: (k['row'], k['column']))
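A toy version of the list-to-generator conversion above: deleting the merge key per item replaces the old remove_key_from_list_of_dicts pass, and any sorting is now the caller's job:

from typing import Generator

def reconcile(pairs) -> Generator[dict, None, None]:
    # sketch; pairs stands in for zipped plate-map and lookup-table samples
    for psample, lsample in pairs:
        new = {**psample, **lsample}
        if not new.get('submitter_id'):
            new['submitter_id'] = psample['id']
        del new['id']  # drop the merge key as each sample is produced
        yield new

pairs = [(dict(id="A1", row=1, column=1), dict(submitter_id="S-001"))]
assert list(reconcile(pairs)) == [dict(row=1, column=1, submitter_id="S-001")]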
class EquipmentParser(object):
@@ -643,13 +633,13 @@ class EquipmentParser(object):
eq = Equipment.query(name=asset)
process = ws.cell(row=v['process']['row'], column=v['process']['column']).value
try:
output.append(
dict(name=eq.name, processes=[process], role=k, asset_number=eq.asset_number,
nickname=eq.nickname))
# output.append(
yield dict(name=eq.name, processes=[process], role=k, asset_number=eq.asset_number,
nickname=eq.nickname)
except AttributeError:
logger.error(f"Unable to add {eq} to list.")
# logger.debug(f"Here is the output so far: {pformat(output)}")
return output
# return output
class TipParser(object):
@@ -676,7 +666,7 @@ class TipParser(object):
Returns:
dict: Map of tip locations
"""
return {k:v for k,v in self.submission_type.construct_tips_map()}
return {k: v for k, v in self.submission_type.construct_tips_map()}
def parse_tips(self) -> Generator[dict, None, None]:
"""
@@ -710,12 +700,12 @@ class TipParser(object):
# logger.debug(f"asset: {asset}")
eq = Tips.query(lot=lot, name=asset, limit=1)
try:
output.append(
dict(name=eq.name, role=k, lot=lot))
# output.append(
yield dict(name=eq.name, role=k, lot=lot)
except AttributeError:
logger.error(f"Unable to add {eq} to PydTips list.")
# logger.debug(f"Here is the output so far: {pformat(output)}")
return output
# return output
class PCRParser(object):

View File

@@ -582,7 +582,7 @@ class PydSubmission(BaseModel, extra='allow'):
@field_validator("samples")
@classmethod
def assign_ids(cls, value, values):
def assign_ids(cls, value):
starting_id = SubmissionSampleAssociation.autoincrement_id()
output = []
for iii, sample in enumerate(value, start=starting_id):
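The validator seeds enumerate with the next free association id, so ids continue from the database's current maximum. A minimal sketch:

starting_id = 12  # stand-in for SubmissionSampleAssociation.autoincrement_id()
samples = ["S-001", "S-002", "S-003"]
assigned = list(enumerate(samples, start=starting_id))
assert assigned == [(12, "S-001"), (13, "S-002"), (14, "S-003")]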