Various bug fixes for new forms.

This commit is contained in:
Landon Wark
2024-04-25 08:55:32 -05:00
parent 5466c78c3a
commit 8cc161ec56
18 changed files with 520 additions and 218 deletions

View File

@@ -3,12 +3,12 @@ Models for the main submission types.
'''
from __future__ import annotations
from getpass import getuser
import logging, uuid, tempfile, re, yaml, base64
import logging, uuid, tempfile, re, yaml, base64, sys
from zipfile import ZipFile
from tempfile import TemporaryDirectory
from reportlab.graphics.barcode import createBarcodeImageInMemory
from reportlab.graphics.shapes import Drawing
from reportlab.lib.units import mm
# from reportlab.graphics.barcode import createBarcodeImageInMemory
# from reportlab.graphics.shapes import Drawing
# from reportlab.lib.units import mm
from operator import attrgetter, itemgetter
from pprint import pformat
from . import BaseClass, Reagent, SubmissionType, KitType, Organization
@@ -18,9 +18,6 @@ from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLO
from sqlalchemy.orm import relationship, validates, Query
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.ext.associationproxy import association_proxy
# from sqlalchemy.ext.declarative import declared_attr
# from sqlalchemy_json import NestedMutableJson
# from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError, StatementError
from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError
import pandas as pd
@@ -59,9 +56,10 @@ class BasicSubmission(BaseClass):
reagents_id = Column(String, ForeignKey("_reagent.id", ondelete="SET NULL", name="fk_BS_reagents_id")) #: id of used reagents
extraction_info = Column(JSON) #: unstructured output from the extraction table logger.
run_cost = Column(FLOAT(2)) #: total cost of running the plate. Set from constant and mutable kit costs at time of creation.
uploaded_by = Column(String(32)) #: user name of person who submitted the submission to the database.
signed_by = Column(String(32)) #: user name of person who submitted the submission to the database.
comment = Column(JSON) #: user notes
submission_category = Column(String(64)) #: ["Research", "Diagnostic", "Surveillance", "Validation"], else defaults to submission_type_name
cost_centre = Column(String(64)) #: Permanent storage of used cost centre in case organization field changed in the future.
submission_sample_associations = relationship(
"SubmissionSampleAssociation",
@@ -103,12 +101,59 @@ class BasicSubmission(BaseClass):
return f"{submission_type}Submission({self.rsl_plate_num})"
@classmethod
def jsons(cls):
def jsons(cls) -> List[str]:
output = [item.name for item in cls.__table__.columns if isinstance(item.type, JSON)]
if issubclass(cls, BasicSubmission) and not cls.__name__ == "BasicSubmission":
output += BasicSubmission.jsons()
return output
@classmethod
def get_default_info(cls, *args):
# Create defaults for all submission_types
# print(args)
recover = ['filepath', 'samples', 'csv', 'comment', 'equipment']
dicto = dict(
details_ignore = ['excluded', 'reagents', 'samples',
'extraction_info', 'comment', 'barcode',
'platemap', 'export_map', 'equipment'],
form_recover = recover,
form_ignore = ['reagents', 'ctx', 'id', 'cost', 'extraction_info', 'signed_by'] + recover,
parser_ignore = ['samples', 'signed_by'] + cls.jsons(),
excel_ignore = []
)
# Grab subtype specific info.
st = cls.get_submission_type()
if st is None:
logger.error("No default info for BasicSubmission.")
return dicto
else:
dicto['submission_type'] = st.name
output = {}
for k,v in dicto.items():
if len(args) > 0 and k not in args:
logger.debug(f"Don't want {k}")
continue
else:
output[k] = v
for k,v in st.defaults.items():
if len(args) > 0 and k not in args:
logger.debug(f"Don't want {k}")
continue
else:
match v:
case list():
output[k] += v
case _:
output[k] = v
if len(args) == 1:
return output[args[0]]
return output
@classmethod
def get_submission_type(cls):
name = cls.__mapper_args__['polymorphic_identity']
return SubmissionType.query(name=name)
def to_dict(self, full_data:bool=False, backup:bool=False, report:bool=False) -> dict:
"""
Constructs dictionary used in submissions summary
@@ -168,6 +213,11 @@ class BasicSubmission(BaseClass):
logger.debug(f"Attempting reagents.")
try:
reagents = [item.to_sub_dict(extraction_kit=self.extraction_kit) for item in self.submission_reagent_associations]
for k in self.extraction_kit.construct_xl_map_for_use(self.submission_type):
if k == 'info':
continue
if not any([item['type']==k for item in reagents]):
reagents.append(dict(type=k, name="Not Applicable", lot="NA", expiry=date(year=1970, month=1, day=1), missing=True))
except Exception as e:
logger.error(f"We got an error retrieving reagents: {e}")
reagents = None
@@ -181,10 +231,12 @@ class BasicSubmission(BaseClass):
except Exception as e:
logger.error(f"Error setting equipment: {e}")
equipment = None
cost_centre = self.cost_centre
else:
reagents = None
samples = None
equipment = None
cost_centre = None
# logger.debug("Getting comments")
try:
comments = self.comment
@@ -198,6 +250,8 @@ class BasicSubmission(BaseClass):
output["extraction_info"] = ext_info
output["comment"] = comments
output["equipment"] = equipment
output["Cost Centre"] = cost_centre
output["Signed By"] = self.signed_by
return output
def calculate_column_count(self) -> int:
@@ -293,18 +347,18 @@ class BasicSubmission(BaseClass):
"""
return [item.role for item in self.submission_equipment_associations]
def make_plate_barcode(self, width:int=100, height:int=25) -> Drawing:
"""
Creates a barcode image for this BasicSubmission.
# def make_plate_barcode(self, width:int=100, height:int=25) -> Drawing:
# """
# Creates a barcode image for this BasicSubmission.
Args:
width (int, optional): Width (pixels) of image. Defaults to 100.
height (int, optional): Height (pixels) of image. Defaults to 25.
# Args:
# width (int, optional): Width (pixels) of image. Defaults to 100.
# height (int, optional): Height (pixels) of image. Defaults to 25.
Returns:
Drawing: image object
"""
return createBarcodeImageInMemory('Code128', value=self.rsl_plate_num, width=width*mm, height=height*mm, humanReadable=True, format="png")
# Returns:
# Drawing: image object
# """
# return createBarcodeImageInMemory('Code128', value=self.rsl_plate_num, width=width*mm, height=height*mm, humanReadable=True, format="png")
@classmethod
def submissions_to_df(cls, submission_type:str|None=None, limit:int=0) -> pd.DataFrame:
@@ -384,13 +438,19 @@ class BasicSubmission(BaseClass):
case item if item in self.jsons():
logger.debug(f"Setting JSON attribute.")
existing = self.__getattribute__(key)
if value == "" or value is None or value == 'null':
logger.error(f"No value given, not setting.")
return
if existing is None:
existing = []
if value in existing:
logger.warning("Value already exists. Preventing duplicate addition.")
return
else:
existing.append(value)
if isinstance(value, list):
existing += value
else:
existing.append(value)
self.__setattr__(key, existing)
flag_modified(self, key)
return
@@ -634,7 +694,7 @@ class BasicSubmission(BaseClass):
from backend.validators import RSLNamer
logger.debug(f"instr coming into {cls}: {instr}")
logger.debug(f"data coming into {cls}: {data}")
defaults = cls.get_default_info()
defaults = cls.get_default_info("abbreviation", "submission_type")
data['abbreviation'] = defaults['abbreviation']
if 'submission_type' not in data.keys() or data['submission_type'] in [None, ""]:
data['submission_type'] = defaults['submission_type']
@@ -737,9 +797,7 @@ class BasicSubmission(BaseClass):
Returns:
Tuple(dict, Template): (Updated dictionary, Template to be rendered)
"""
base_dict['excluded'] = ['excluded', 'reagents', 'samples', 'controls',
'extraction_info', 'pcr_info', 'comment',
'barcode', 'platemap', 'export_map', 'equipment']
base_dict['excluded'] = cls.get_default_info('details_ignore')
env = jinja_template_loading()
temp_name = f"{cls.__name__.lower()}_details.html"
logger.debug(f"Returning template: {temp_name}")
@@ -1067,9 +1125,9 @@ class BacterialCulture(BasicSubmission):
output['controls'] = [item.to_sub_dict() for item in self.controls]
return output
@classmethod
def get_default_info(cls) -> dict:
return dict(abbreviation="BC", submission_type="Bacterial Culture")
# @classmethod
# def get_default_info(cls) -> dict:
# return dict(abbreviation="BC", submission_type="Bacterial Culture")
@classmethod
def custom_platemap(cls, xl: pd.ExcelFile, plate_map: pd.DataFrame) -> pd.DataFrame:
@@ -1214,13 +1272,19 @@ class Wastewater(BasicSubmission):
output['pcr_info'] = self.pcr_info
except TypeError as e:
pass
ext_tech = self.ext_technician or self.technician
pcr_tech = self.pcr_technician or self.technician
output['Technician'] = f"Enr: {self.technician}, Ext: {ext_tech}, PCR: {pcr_tech}"
if self.ext_technician is None or self.ext_technician == "None":
output['Ext Technician'] = self.technician
else:
output["Ext Technician"] = self.ext_technician
if self.pcr_technician is None or self.pcr_technician == "None":
output["PCR Technician"] = self.technician
else:
output['PCR Technician'] = self.pcr_technician
# output['Technician'] = self.technician}, Ext: {ext_tech}, PCR: {pcr_tech}"
return output
@classmethod
def get_default_info(cls) -> dict:
# @classmethod
# def get_default_info(cls) -> dict:
return dict(abbreviation="WW", submission_type="Wastewater")
@classmethod
@@ -1334,36 +1398,6 @@ class Wastewater(BasicSubmission):
from frontend.widgets import select_open_file
fname = select_open_file(obj=obj, file_extension="xlsx")
parser = PCRParser(filepath=fname)
# Check if PCR info already exists
# if hasattr(self, 'pcr_info') and self.pcr_info != None:
# # existing = json.loads(sub.pcr_info)
# existing = self.pcr_info
# logger.debug(f"Found existing pcr info: {pformat(self.pcr_info)}")
# else:
# existing = None
# if existing != None:
# # update pcr_info
# try:
# logger.debug(f"Updating {type(existing)}:\n {pformat(existing)} with {type(parser.pcr)}:\n {pformat(parser.pcr)}")
# # if json.dumps(parser.pcr) not in sub.pcr_info:
# if parser.pcr not in self.pcr_info:
# logger.debug(f"This is new pcr info, appending to existing")
# existing.append(parser.pcr)
# else:
# logger.debug("This info already exists, skipping.")
# # logger.debug(f"Setting {self.rsl_plate_num} PCR to:\n {pformat(existing)}")
# # sub.pcr_info = json.dumps(existing)
# self.pcr_info = existing
# except TypeError:
# logger.error(f"Error updating!")
# # sub.pcr_info = json.dumps([parser.pcr])
# self.pcr_info = [parser.pcr]
# logger.debug(f"Final pcr info for {self.rsl_plate_num}:\n {pformat(self.pcr_info)}")
# else:
# # sub.pcr_info = json.dumps([parser.pcr])
# self.pcr_info = [parser.pcr]
# # logger.debug(f"Existing {type(self.pcr_info)}: {self.pcr_info}")
# # logger.debug(f"Inserting {type(parser.pcr)}: {parser.pcr}")
self.set_attribute("pcr_info", parser.pcr)
self.save(original=False)
logger.debug(f"Got {len(parser.samples)} samples to update!")
@@ -1390,7 +1424,6 @@ class WastewaterArtic(BasicSubmission):
gel_controls = Column(JSON) #: locations of controls on the gel
source_plates = Column(JSON) #: wastewater plates that samples come from
__mapper_args__ = dict(polymorphic_identity="Wastewater Artic",
polymorphic_load="inline",
inherit_condition=(id == BasicSubmission.id))
@@ -1411,9 +1444,9 @@ class WastewaterArtic(BasicSubmission):
output['source_plates'] = self.source_plates
return output
@classmethod
def get_default_info(cls) -> str:
return dict(abbreviation="AR", submission_type="Wastewater Artic")
# @classmethod
# def get_default_info(cls) -> str:
# return dict(abbreviation="AR", submission_type="Wastewater Artic")
@classmethod
def parse_info(cls, input_dict:dict, xl:pd.ExcelFile|None=None) -> dict:
@@ -1429,12 +1462,16 @@ class WastewaterArtic(BasicSubmission):
"""
# from backend.validators import RSLNamer
input_dict = super().parse_info(input_dict)
ws = load_workbook(xl.io, data_only=True)['Egel results']
data = [ws.cell(row=jj,column=ii) for ii in range(15,27) for jj in range(10,18)]
workbook = load_workbook(xl.io, data_only=True)
ws = workbook['Egel results']
data = [ws.cell(row=ii,column=jj) for jj in range(15,27) for ii in range(10,18)]
data = [cell for cell in data if cell.value is not None and "NTC" in cell.value]
input_dict['gel_controls'] = [dict(sample_id=cell.value, location=f"{row_map[cell.row-9]}{str(cell.column-14).zfill(2)}") for cell in data]
# df = xl.parse("Egel results").iloc[7:16, 13:26]
# df = df.set_index(df.columns[0])
ws = workbook['First Strand List']
data = [dict(plate=ws.cell(row=ii, column=3).value, starting_sample=ws.cell(row=ii, column=4).value) for ii in range(8,11)]
input_dict['source_plates'] = data
return input_dict
@classmethod
@@ -1500,34 +1537,38 @@ class WastewaterArtic(BasicSubmission):
Returns:
str: output name
"""
logger.debug(f"input string raw: {input_str}")
# Remove letters.
processed = re.sub(r"[A-Z]", "", input_str)
processed = re.sub(r"[A-QS-Z]+\d*", "", input_str)
# Remove trailing '-' if any
processed = processed.strip("-")
logger.debug(f"Processed after stripping letters: {processed}")
try:
en_num = re.search(r"\-\d{1}$", processed).group()
processed = rreplace(processed, en_num, "")
except AttributeError:
en_num = "1"
en_num = en_num.strip("-")
# logger.debug(f"Processed after en-num: {processed}")
logger.debug(f"Processed after en-num: {processed}")
try:
plate_num = re.search(r"\-\d{1}$", processed).group()
plate_num = re.search(r"\-\d{1}R?\d?$", processed).group()
processed = rreplace(processed, plate_num, "")
except AttributeError:
plate_num = "1"
plate_num = plate_num.strip("-")
# logger.debug(f"Processed after plate-num: {processed}")
logger.debug(f"Processed after plate-num: {processed}")
day = re.search(r"\d{2}$", processed).group()
processed = rreplace(processed, day, "")
# logger.debug(f"Processed after day: {processed}")
logger.debug(f"Processed after day: {processed}")
month = re.search(r"\d{2}$", processed).group()
processed = rreplace(processed, month, "")
processed = processed.replace("--", "")
# logger.debug(f"Processed after month: {processed}")
logger.debug(f"Processed after month: {processed}")
year = re.search(r'^(?:\d{2})?\d{2}', processed).group()
year = f"20{year}"
return f"EN{year}{month}{day}-{en_num}"
final_en_name = f"EN{year}{month}{day}-{en_num}"
logger.debug(f"Final EN name: {final_en_name}")
return final_en_name
@classmethod
def get_regex(cls) -> str:
@@ -1599,7 +1640,23 @@ class WastewaterArtic(BasicSubmission):
# for jjj, value in enumerate(plate, start=3):
# worksheet.cell(row=iii, column=jjj, value=value)
logger.debug(f"Info:\n{pformat(info)}")
check = 'gel_info' in info.keys() and info['gel_info']['value'] != None
check = 'source_plates' in info.keys() and info['source_plates'] is not None
if check:
worksheet = input_excel['First Strand List']
start_row = 8
for iii, plate in enumerate(info['source_plates']['value']):
logger.debug(f"Plate: {plate}")
row = start_row + iii
try:
worksheet.cell(row=row, column=3, value=plate['plate'])
except TypeError:
pass
try:
worksheet.cell(row=row, column=4, value=plate['starting_sample'])
except TypeError:
pass
# sys.exit(f"Hardcoded stop: backend.models.submissions:1629")
check = 'gel_info' in info.keys() and info['gel_info']['value'] is not None
if check:
# logger.debug(f"Gel info check passed.")
if info['gel_info'] != None:
@@ -1618,7 +1675,7 @@ class WastewaterArtic(BasicSubmission):
column = start_column + 2 + jjj
worksheet.cell(row=start_row, column=column, value=kj['name'])
worksheet.cell(row=row, column=column, value=kj['value'])
check = 'gel_image' in info.keys() and info['gel_image']['value'] != None
check = 'gel_image' in info.keys() and info['gel_image']['value'] is not None
if check:
if info['gel_image'] != None:
worksheet = input_excel['Egel results']