Debugged upgrades.

This commit is contained in:
lwark
2024-05-13 07:44:06 -05:00
parent f30f6403d6
commit 84fac23890
15 changed files with 447 additions and 487 deletions

View File

@@ -21,7 +21,7 @@ from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.drawing.image import Image as OpenpyxlImage
from tools import check_not_nan, row_map, setup_lookup, jinja_template_loading, rreplace
from datetime import datetime, date
from typing import List, Any, Tuple
from typing import List, Any, Tuple, Literal
from dateutil.parser import parse
from dateutil.parser import ParserError
from pathlib import Path
@@ -418,11 +418,11 @@ class BasicSubmission(BaseClass):
case "samples":
for sample in value:
# logger.debug(f"Parsing {sample} to sql.")
sample, _ = sample.toSQL(submission=self)
sample, _ = sample.to_sql(submission=self)
return
case "reagents":
logger.debug(f"Reagents coming into SQL: {value}")
field_value = [reagent['value'].toSQL()[0] if isinstance(reagent, dict) else reagent.toSQL()[0] for
field_value = [reagent['value'].to_sql()[0] if isinstance(reagent, dict) else reagent.to_sql()[0] for
reagent in value]
logger.debug(f"Reagents coming out of SQL: {field_value}")
case "submission_type":
@@ -620,7 +620,7 @@ class BasicSubmission(BaseClass):
return plate_map
@classmethod
def parse_info(cls, input_dict: dict, xl: Workbook | None = None) -> dict:
def custom_info_parser(cls, input_dict: dict, xl: Workbook | None = None) -> dict:
"""
Update submission dictionary with type specific information
@@ -666,7 +666,7 @@ class BasicSubmission(BaseClass):
return input_dict
@classmethod
def custom_autofill(cls, input_excel: Workbook, info: dict | None = None, backup: bool = False) -> Workbook:
def custom_info_writer(cls, input_excel: Workbook, info: dict | None = None, backup: bool = False) -> Workbook:
"""
Adds custom autofill methods for submission
@@ -730,7 +730,9 @@ class BasicSubmission(BaseClass):
repeat = "1"
except AttributeError as e:
repeat = ""
return re.sub(r"(-\dR)\d?", rf"\1 {repeat}", outstr).replace(" ", "")
outstr = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", outstr).replace(" ", "")
abb = cls.get_default_info('abbreviation')
return re.sub(rf"{abb}(\d)", rf"{abb}-\1", outstr)
# return outstr
@classmethod
@@ -1045,7 +1047,7 @@ class BasicSubmission(BaseClass):
logger.debug(widg)
widg.setParent(None)
pyd = self.to_pydantic(backup=True)
form = pyd.toForm(parent=obj)
form = pyd.to_form(parent=obj)
obj.app.table_widget.formwidget.layout().addWidget(form)
def add_comment(self, obj):
@@ -1112,7 +1114,7 @@ class BasicSubmission(BaseClass):
# wb = pyd.autofill_excel()
# wb = pyd.autofill_samples(wb)
# wb = pyd.autofill_equipment(wb)
writer = pyd.toWriter()
writer = pyd.to_writer()
# wb.save(filename=fname.with_suffix(".xlsx"))
writer.xl.save(filename=fname.with_suffix(".xlsx"))
@@ -1166,25 +1168,25 @@ class BacterialCulture(BasicSubmission):
plate_map.iloc[6, 0] = num2
return plate_map
@classmethod
def custom_autofill(cls, input_excel: Workbook, info: dict | None = None, backup: bool = False) -> Workbook:
"""
Stupid stopgap solution to there being an issue with the Bacterial Culture plate map. Extends parent.
Args:
input_excel (Workbook): Input openpyxl workbook
Returns:
Workbook: Updated openpyxl workbook
"""
input_excel = super().custom_autofill(input_excel)
sheet = input_excel['Plate Map']
if sheet.cell(12, 2).value == None:
sheet.cell(row=12, column=2, value="=IF(ISBLANK('Sample List'!$B42),\"\",'Sample List'!$B42)")
if sheet.cell(13, 2).value == None:
sheet.cell(row=13, column=2, value="=IF(ISBLANK('Sample List'!$B43),\"\",'Sample List'!$B43)")
input_excel["Sample List"].cell(row=15, column=2, value=getuser())
return input_excel
# @classmethod
# def custom_writer(cls, input_excel: Workbook, info: dict | None = None, backup: bool = False) -> Workbook:
# """
# Stupid stopgap solution to there being an issue with the Bacterial Culture plate map. Extends parent.
#
# Args:
# input_excel (Workbook): Input openpyxl workbook
#
# Returns:
# Workbook: Updated openpyxl workbook
# """
# input_excel = super().custom_writer(input_excel)
# sheet = input_excel['Plate Map']
# if sheet.cell(12, 2).value == None:
# sheet.cell(row=12, column=2, value="=IF(ISBLANK('Sample List'!$B42),\"\",'Sample List'!$B42)")
# if sheet.cell(13, 2).value == None:
# sheet.cell(row=13, column=2, value="=IF(ISBLANK('Sample List'!$B43),\"\",'Sample List'!$B43)")
# input_excel["Sample List"].cell(row=15, column=2, value=getuser())
# return input_excel
@classmethod
def get_regex(cls) -> str:
@@ -1297,7 +1299,7 @@ class Wastewater(BasicSubmission):
return output
@classmethod
def parse_info(cls, input_dict: dict, xl: Workbook | None = None) -> dict:
def custom_info_parser(cls, input_dict: dict, xl: Workbook | None = None) -> dict:
"""
Update submission dictionary with type specific information. Extends parent
@@ -1307,7 +1309,7 @@ class Wastewater(BasicSubmission):
Returns:
dict: Updated sample dictionary
"""
input_dict = super().parse_info(input_dict)
input_dict = super().custom_info_parser(input_dict)
if xl != None:
input_dict['csv'] = xl["Copy to import file"]
return input_dict
@@ -1458,7 +1460,7 @@ class WastewaterArtic(BasicSubmission):
return output
@classmethod
def parse_info(cls, input_dict: dict, xl: pd.ExcelFile | None = None) -> dict:
def custom_info_parser(cls, input_dict: dict, xl: Workbook | None = None) -> dict:
"""
Update submission dictionary with type specific information
@@ -1469,17 +1471,23 @@ class WastewaterArtic(BasicSubmission):
Returns:
dict: Updated sample dictionary
"""
input_dict = super().parse_info(input_dict)
workbook = load_workbook(xl.io, data_only=True)
ws = workbook['Egel results']
input_dict = super().custom_info_parser(input_dict)
# workbook = load_workbook(xl.io, data_only=True)
ws = xl['Egel results']
data = [ws.cell(row=ii, column=jj) for jj in range(15, 27) for ii in range(10, 18)]
data = [cell for cell in data if cell.value is not None and "NTC" in cell.value]
input_dict['gel_controls'] = [
dict(sample_id=cell.value, location=f"{row_map[cell.row - 9]}{str(cell.column - 14).zfill(2)}") for cell in
data]
ws = workbook['First Strand List']
ws = xl['First Strand List']
data = [dict(plate=ws.cell(row=ii, column=3).value, starting_sample=ws.cell(row=ii, column=4).value) for ii in
range(8, 11)]
for datum in data:
if datum['plate'] in ["None", None, ""]:
continue
else:
from backend.validators import RSLNamer
datum['plate'] = RSLNamer(filename=datum['plate'], sub_type="Wastewater").parsed_name
input_dict['source_plates'] = data
return input_dict
@@ -1493,10 +1501,13 @@ class WastewaterArtic(BasicSubmission):
instr = re.sub(r"Artic", "", instr, flags=re.IGNORECASE)
except (AttributeError, TypeError) as e:
logger.error(f"Problem using regex: {e}")
# logger.debug(f"Before RSL addition: {instr}")
instr = instr.replace("-", "")
logger.debug(f"Before RSL addition: {instr}")
try:
instr = instr.replace("-", "")
except AttributeError:
instr = date.today().strftime("%Y%m%d")
instr = re.sub(r"^(\d{6})", f"RSL-AR-\\1", instr)
# logger.debug(f"name coming out of Artic namer: {instr}")
logger.debug(f"name coming out of Artic namer: {instr}")
outstr = super().enforce_name(instr=instr, data=data)
return outstr
@@ -1527,8 +1538,12 @@ class WastewaterArtic(BasicSubmission):
del input_dict['sample_name_(ww)']
except KeyError:
logger.error(f"Unable to set ww_processing_num for sample {input_dict['submitter_id']}")
if "ENC" in input_dict['submitter_id']:
input_dict['submitter_id'] = cls.en_adapter(input_str=input_dict['submitter_id'])
year = str(date.today().year)[-2:]
# if "ENC" in input_dict['submitter_id']:
if re.search(rf"^{year}-(ENC)", input_dict['submitter_id']):
input_dict['rsl_number'] = cls.en_adapter(input_str=input_dict['submitter_id'])
if re.search(rf"^{year}-(RSL)", input_dict['submitter_id']):
input_dict['rsl_number'] = cls.pbs_adapter(input_str=input_dict['submitter_id'])
return input_dict
@classmethod
@@ -1543,8 +1558,10 @@ class WastewaterArtic(BasicSubmission):
str: output name
"""
logger.debug(f"input string raw: {input_str}")
# Remove letters.
processed = re.sub(r"[A-QS-Z]+\d*", "", input_str)
# Remove letters.
processed = input_str.replace("RSL", "")
processed = re.sub(r"\(.*\)$", "", processed).strip()
processed = re.sub(r"[A-QS-Z]+\d*", "", processed)
# Remove trailing '-' if any
processed = processed.strip("-")
logger.debug(f"Processed after stripping letters: {processed}")
@@ -1554,7 +1571,7 @@ class WastewaterArtic(BasicSubmission):
except AttributeError:
en_num = "1"
en_num = en_num.strip("-")
logger.debug(f"Processed after en-num: {processed}")
logger.debug(f"Processed after en_num: {processed}")
try:
plate_num = re.search(r"\-\d{1}R?\d?$", processed).group()
processed = rreplace(processed, plate_num, "")
@@ -1571,7 +1588,58 @@ class WastewaterArtic(BasicSubmission):
logger.debug(f"Processed after month: {processed}")
year = re.search(r'^(?:\d{2})?\d{2}', processed).group()
year = f"20{year}"
final_en_name = f"EN{year}{month}{day}-{en_num}"
final_en_name = f"EN{en_num}-{year}{month}{day}"
logger.debug(f"Final EN name: {final_en_name}")
return final_en_name
@classmethod
def pbs_adapter(cls, input_str):
"""
Stopgap solution because WW names their ENs different
Args:
input_str (str): input name
Returns:
str: output name
"""
logger.debug(f"input string raw: {input_str}")
# Remove letters.
processed = input_str.replace("RSL", "")
processed = re.sub(r"\(.*\)$", "", processed).strip()
processed = re.sub(r"[A-QS-Z]+\d*", "", processed)
# Remove trailing '-' if any
processed = processed.strip("-")
logger.debug(f"Processed after stripping letters: {processed}")
# try:
# en_num = re.search(r"\-\d{1}$", processed).group()
# processed = rreplace(processed, en_num, "")
# except AttributeError:
# en_num = "1"
# en_num = en_num.strip("-")
# logger.debug(f"Processed after en_num: {processed}")
try:
plate_num = re.search(r"\-\d{1}R?\d?$", processed).group()
processed = rreplace(processed, plate_num, "")
except AttributeError:
plate_num = "1"
plate_num = plate_num.strip("-")
logger.debug(f"Plate num: {plate_num}")
repeat_num = re.search(r"R(?P<repeat>\d)?$", "PBS20240426-2R").groups()[0]
if repeat_num is None and "R" in plate_num:
repeat_num = "1"
plate_num = re.sub(r"R", rf"R{repeat_num}", plate_num)
logger.debug(f"Processed after plate-num: {processed}")
day = re.search(r"\d{2}$", processed).group()
processed = rreplace(processed, day, "")
logger.debug(f"Processed after day: {processed}")
month = re.search(r"\d{2}$", processed).group()
processed = rreplace(processed, month, "")
processed = processed.replace("--", "")
logger.debug(f"Processed after month: {processed}")
year = re.search(r'^(?:\d{2})?\d{2}', processed).group()
year = f"20{year}"
final_en_name = f"PBS{year}{month}{day}-{plate_num}"
logger.debug(f"Final EN name: {final_en_name}")
return final_en_name
@@ -1600,11 +1668,17 @@ class WastewaterArtic(BasicSubmission):
dict: Updated parser product.
"""
input_dict = super().finalize_parse(input_dict, xl, info_map)
input_dict['csv'] = xl.parse("hitpicks_csv_to_export")
logger.debug(f"Incoming input_dict: {pformat(input_dict)}")
# TODO: Move to validator?
for sample in input_dict['samples']:
logger.debug(f"Sample: {sample}")
if re.search(r"^NTC", sample['submitter_id']):
sample['submitter_id'] = sample['submitter_id'] + "-WWG-" + input_dict['rsl_plate_num']['value']
input_dict['csv'] = xl["hitpicks_csv_to_export"]
return input_dict
@classmethod
def custom_autofill(cls, input_excel: Workbook, info: dict | None = None, backup: bool = False) -> Workbook:
def custom_info_writer(cls, input_excel: Workbook, info: dict | None = None, backup: bool = False) -> Workbook:
"""
Adds custom autofill methods for submission. Extends Parent
@@ -1616,10 +1690,10 @@ class WastewaterArtic(BasicSubmission):
Returns:
Workbook: Updated workbook
"""
input_excel = super().custom_autofill(input_excel, info, backup)
worksheet = input_excel["First Strand List"]
samples = cls.query(rsl_number=info['rsl_plate_num']['value']).submission_sample_associations
samples = sorted(samples, key=attrgetter('column', 'row'))
input_excel = super().custom_info_writer(input_excel, info, backup)
# worksheet = input_excel["First Strand List"]
# samples = cls.query(rsl_number=info['rsl_plate_num']['value']).submission_sample_associations
# samples = sorted(samples, key=attrgetter('column', 'row'))
logger.debug(f"Info:\n{pformat(info)}")
check = 'source_plates' in info.keys() and info['source_plates'] is not None
if check:
@@ -1708,15 +1782,22 @@ class WastewaterArtic(BasicSubmission):
"""
logger.debug(f"Hello from {self.__class__.__name__} dictionary sample adjuster.")
output = []
set_plate = None
for assoc in self.submission_sample_associations:
dicto = assoc.to_sub_dict()
old_sub = assoc.sample.get_previous_ww_submission(current_artic_submission=self)
try:
dicto['plate_name'] = old_sub.rsl_plate_num
except AttributeError:
dicto['plate_name'] = ""
old_assoc = WastewaterAssociation.query(submission=old_sub, sample=assoc.sample, limit=1)
dicto['well'] = f"{row_map[old_assoc.row]}{old_assoc.column}"
# old_sub = assoc.sample.get_previous_ww_submission(current_artic_submission=self)
# try:
# dicto['plate_name'] = old_sub.rsl_plate_num
# except AttributeError:
# dicto['plate_name'] = ""
# old_assoc = WastewaterAssociation.query(submission=old_sub, sample=assoc.sample, limit=1)
# dicto['well'] = f"{row_map[old_assoc.row]}{old_assoc.column}"
for item in self.source_plates:
old_plate = WastewaterAssociation.query(submission=item['plate'], sample=assoc.sample, limit=1)
if old_plate is not None:
set_plate = old_plate.submission.rsl_plate_num
break
dicto['plate_name'] = set_plate
output.append(dicto)
return output
@@ -1892,6 +1973,10 @@ class BasicSample(BaseClass):
logger.info(f"Recruiting model: {model}")
return model
@classmethod
def sql_enforcer(cls, pyd_sample:"PydSample"):
return pyd_sample
@classmethod
def parse_sample(cls, input_dict: dict) -> dict:
f"""
@@ -1996,15 +2081,16 @@ class BasicSample(BaseClass):
disallowed = ["id"]
if kwargs == {}:
raise ValueError("Need to narrow down query or the first available instance will be returned.")
for key in kwargs.keys():
if key in disallowed:
raise ValueError(
f"{key} is not allowed as a query argument as it could lead to creation of duplicate objects.")
# for key in kwargs.keys():
# if key in disallowed:
# raise ValueError(
# f"{key} is not allowed as a query argument as it could lead to creation of duplicate objects.")
sanitized_kwargs = {k:v for k,v in kwargs.items() if k not in disallowed}
instance = cls.query(sample_type=sample_type, limit=1, **kwargs)
logger.debug(f"Retrieved instance: {instance}")
if instance == None:
used_class = cls.find_polymorphic_subclass(attrs=kwargs, polymorphic_identity=sample_type)
instance = used_class(**kwargs)
if instance is None:
used_class = cls.find_polymorphic_subclass(attrs=sanitized_kwargs, polymorphic_identity=sample_type)
instance = used_class(**sanitized_kwargs)
instance.sample_type = sample_type
logger.debug(f"Creating instance: {instance}")
return instance
@@ -2061,7 +2147,7 @@ class WastewaterSample(BasicSample):
dict: well location and name (sample id, organism) NOTE: keys must sync with WWSample to_sub_dict above
"""
sample = super().to_sub_dict(full_data=full_data)
sample['WW Processing Number'] = self.ww_processing_num
sample['WW Processing Num'] = self.ww_processing_num
sample['Sample Location'] = self.sample_location
sample['Received Date'] = self.received_date
sample['Collection Date'] = self.collection_date
@@ -2080,14 +2166,17 @@ class WastewaterSample(BasicSample):
"""
output_dict = super().parse_sample(input_dict)
logger.debug(f"Initial sample dict: {pformat(output_dict)}")
disallowed = ["", None, "None"]
try:
check = output_dict['rsl_number'] in [None, "None"]
except KeyError:
check = True
if check:
output_dict['rsl_number'] = "RSL-WW-" + output_dict['ww_processing_number']
if output_dict['ww_full_sample_id'] is not None:
output_dict['rsl_number'] = "RSL-WW-" + output_dict['ww_processing_num']
if output_dict['ww_full_sample_id'] is not None and output_dict["submitter_id"] in disallowed:
output_dict["submitter_id"] = output_dict['ww_full_sample_id']
# if re.search(r"^NTC", output_dict['submitter_id']):
# output_dict['submitter_id'] = "Artic-" + output_dict['submitter_id']
# Ad hoc repair method for WW (or possibly upstream) not formatting some dates properly.
# NOTE: Should be handled by validator.
# match output_dict['collection_date']:
@@ -2166,7 +2255,7 @@ class SubmissionSampleAssociation(BaseClass):
submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id"), primary_key=True) #: id of associated submission
row = Column(INTEGER, primary_key=True) #: row on the 96 well plate
column = Column(INTEGER, primary_key=True) #: column on the 96 well plate
submission_rank = Column(INTEGER, nullable=False, default=1) #: Location in sample list
submission_rank = Column(INTEGER, nullable=False, default=0) #: Location in sample list
# reference to the Submission object
submission = relationship(BasicSubmission,
@@ -2186,12 +2275,13 @@ class SubmissionSampleAssociation(BaseClass):
}
def __init__(self, submission: BasicSubmission = None, sample: BasicSample = None, row: int = 1, column: int = 1,
id: int | None = None):
id: int | None = None, submission_rank: int = 0):
self.submission = submission
self.sample = sample
self.row = row
self.column = column
if id != None:
self.submission_rank = submission_rank
if id is not None:
self.id = id
else:
self.id = self.__class__.autoincrement_id()
@@ -2257,6 +2347,7 @@ class SubmissionSampleAssociation(BaseClass):
Returns:
int: incremented id
"""
try:
return max([item.id for item in cls.query()]) + 1
except ValueError as e:
@@ -2360,7 +2451,7 @@ class SubmissionSampleAssociation(BaseClass):
association_type: str = "Basic Association",
submission: BasicSubmission | str | None = None,
sample: BasicSample | str | None = None,
# id:int|None=None,
id:int|None=None,
**kwargs) -> SubmissionSampleAssociation:
"""
Queries for an association, if none exists creates a new one.
@@ -2401,10 +2492,11 @@ class SubmissionSampleAssociation(BaseClass):
instance = cls.query(submission=submission, sample=sample, row=row, column=column, limit=1)
except StatementError:
instance = None
if instance == None:
if instance is None:
# sanitized_kwargs = {k:v for k,v in kwargs.items() if k not in ['id']}
used_cls = cls.find_polymorphic_subclass(polymorphic_identity=association_type)
# instance = used_cls(submission=submission, sample=sample, id=id, **kwargs)
instance = used_cls(submission=submission, sample=sample, **kwargs)
instance = used_cls(submission=submission, sample=sample, id=id, **kwargs)
return instance
def delete(self):