Mid change in details templates

This commit is contained in:
lwark
2024-06-20 07:46:46 -05:00
parent 12e552800a
commit 337112a27d
20 changed files with 575 additions and 300 deletions

View File

@@ -147,10 +147,6 @@ class BaseClass(Base):
case _:
return query.limit(limit).all()
@classmethod
def default_info_return(cls, info, *args):
return info
def save(self):
"""
Add the object to the database and commit
@@ -191,7 +187,7 @@ class ConfigItem(BaseClass):
from .controls import *
# import order must go: orgs, kit, subs due to circular import issues
# NOTE: import order must go: orgs, kit, subs due to circular import issues
from .organizations import *
from .kits import *
from .submissions import *

View File

@@ -84,7 +84,7 @@ class ControlType(BaseClass):
Returns:
List[ControlType]: Control types that have targets
"""
return [item for item in cls.query() if item.targets != []]
return [item for item in cls.query() if item.targets]# != []]
@classmethod
def build_positive_regex(cls) -> Pattern:

View File

@@ -743,7 +743,7 @@ class SubmissionType(BaseClass):
item.equipment_role == equipment_role]
case _:
raise TypeError(f"Type {type(equipment_role)} is not allowed")
return list(set([item for items in relevant for item in items if item != None]))
return list(set([item for items in relevant for item in items if item is not None]))
def get_submission_class(self) -> "BasicSubmission":
"""
@@ -982,7 +982,7 @@ class KitTypeReagentRoleAssociation(BaseClass):
query = query.join(ReagentRole).filter(ReagentRole.name == reagent_role)
case _:
pass
if kit_type != None and reagent_role != None:
if kit_type is not None and reagent_role is not None:
limit = 1
return cls.execute_query(query=query, limit=limit)
@@ -1339,7 +1339,7 @@ class EquipmentRole(BaseClass):
if isinstance(submission_type, str):
# logger.debug(f"Checking if str {submission_type} exists")
submission_type = SubmissionType.query(name=submission_type)
if submission_type != None:
if submission_type is not None:
# logger.debug("Getting all processes for this EquipmentRole")
processes = [process for process in self.processes if submission_type in process.submission_types]
else:
@@ -1421,7 +1421,7 @@ class SubmissionTypeEquipmentRoleAssociation(BaseClass):
back_populates="equipmentrole_submissiontype_associations") #: associated equipment
@validates('static')
def validate_age(self, key, value):
def validate_static(self, key, value):
"""
Ensures only 1 & 0 used in 'static'
@@ -1451,7 +1451,7 @@ class SubmissionTypeEquipmentRoleAssociation(BaseClass):
"""
processes = [equipment.get_processes(self.submission_type) for equipment in self.equipment_role.instances]
# flatten list
processes = [item for items in processes for item in items if item != None]
processes = [item for items in processes for item in items if item is not None]
match extraction_kit:
case str():
# logger.debug(f"Filtering Processes by extraction_kit str {extraction_kit}")
@@ -1474,7 +1474,7 @@ class Process(BaseClass):
"""
id = Column(INTEGER, primary_key=True) #: Process id, primary key
name = Column(String(64)) #: Process name
name = Column(String(64), unique=True) #: Process name
submission_types = relationship("SubmissionType", back_populates='processes',
secondary=submissiontypes_processes) #: relation to SubmissionType
equipment = relationship("Equipment", back_populates='processes',
@@ -1497,7 +1497,10 @@ class Process(BaseClass):
@classmethod
@setup_lookup
def query(cls, name: str | None = None, limit: int = 0) -> Process | List[Process]:
def query(cls,
name: str | None = None,
id: int = 1,
limit: int = 0) -> Process | List[Process]:
"""
Lookup Processes
@@ -1516,9 +1519,19 @@ class Process(BaseClass):
limit = 1
case _:
pass
match id:
case int():
query = query.filter(cls.id == id)
limit = 1
case _:
pass
return cls.execute_query(query=query, limit=limit)
@check_authorization
def save(self):
super().save()
class TipRole(BaseClass):
"""
An abstract role that a tip fills during a process
@@ -1539,6 +1552,10 @@ class TipRole(BaseClass):
def __repr__(self):
return f"<TipRole({self.name})>"
@check_authorization
def save(self):
super().save()
class Tips(BaseClass):
@@ -1593,6 +1610,10 @@ class Tips(BaseClass):
case _:
pass
return cls.execute_query(query=query, limit=limit)
@check_authorization
def save(self):
super().save()
class SubmissionTypeTipRoleAssociation(BaseClass):
@@ -1609,6 +1630,10 @@ class SubmissionTypeTipRoleAssociation(BaseClass):
back_populates="submissiontype_tiprole_associations") #: associated submission
tip_role = relationship(TipRole,
back_populates="tiprole_submissiontype_associations") #: associated equipment
@check_authorization
def save(self):
super().save()
class SubmissionTipsAssociation(BaseClass):

View File

@@ -70,7 +70,6 @@ class Organization(BaseClass):
case _:
pass
return cls.execute_query(query=query, limit=limit)
# return query.first()
@check_authorization
def save(self):
@@ -117,7 +116,6 @@ class Contact(BaseClass):
Returns:
Contact|List[Contact]: Contact(s) of interest.
"""
# super().query(session)
query: Query = cls.__database_session__.query(cls)
match name:
case str():

View File

@@ -22,7 +22,7 @@ import pandas as pd
from openpyxl import Workbook
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.drawing.image import Image as OpenpyxlImage
from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys
from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr
from datetime import datetime, date
from typing import List, Any, Tuple, Literal
from dateutil.parser import parse
@@ -145,7 +145,6 @@ class BasicSubmission(BaseClass):
output += BasicSubmission.timestamps()
return output
# TODO: Beef up this to include info_map from DB
@classmethod
def get_default_info(cls, *args):
# NOTE: Create defaults for all submission_types
@@ -443,7 +442,7 @@ class BasicSubmission(BaseClass):
"""
# logger.debug(f"Querying Type: {submission_type}")
# logger.debug(f"Using limit: {limit}")
# use lookup function to create list of dicts
# NOTE: use lookup function to create list of dicts
subs = [item.to_dict() for item in
cls.query(submission_type=submission_type, limit=limit, chronologic=chronologic)]
# logger.debug(f"Got {len(subs)} submissions.")
@@ -498,7 +497,7 @@ class BasicSubmission(BaseClass):
case "submission_type":
field_value = SubmissionType.query(name=value)
case "sample_count":
if value == None:
if value is None:
field_value = len(self.samples)
else:
field_value = value
@@ -607,7 +606,7 @@ class BasicSubmission(BaseClass):
super().save()
@classmethod
def get_regex(cls):
def get_regex(cls) -> str:
return cls.construct_regex()
# Polymorphic functions
@@ -742,7 +741,6 @@ class BasicSubmission(BaseClass):
str: Updated name.
"""
# logger.info(f"Hello from {cls.__mapper_args__['polymorphic_identity']} Enforcer!")
# return instr
from backend.validators import RSLNamer
# logger.debug(f"instr coming into {cls}: {instr}")
# logger.debug(f"data coming into {cls}: {data}")
@@ -773,7 +771,7 @@ class BasicSubmission(BaseClass):
# logger.debug(f"After addition of plate number the plate name is: {outstr}")
try:
repeat = re.search(r"-\dR(?P<repeat>\d)?", outstr).groupdict()['repeat']
if repeat == None:
if repeat is None:
repeat = "1"
except AttributeError as e:
repeat = ""
@@ -835,6 +833,15 @@ class BasicSubmission(BaseClass):
@classmethod
def adjust_autofill_samples(cls, samples: List[Any]) -> List[Any]:
"""
Makes adjustments to samples before writing to excel.
Args:
samples (List[Any]): List of Samples
Returns:
List[Any]: Updated list of samples
"""
logger.info(f"Hello from {cls.__mapper_args__['polymorphic_identity']} sampler")
return samples
@@ -953,7 +960,7 @@ class BasicSubmission(BaseClass):
query = query.filter(model.submitted_date == start_date)
else:
query = query.filter(model.submitted_date.between(start_date, end_date))
# by reagent (for some reason)
# NOTE: by reagent (for some reason)
match reagent:
case str():
# logger.debug(f"Looking up BasicSubmission with reagent: {reagent}")
@@ -965,7 +972,7 @@ class BasicSubmission(BaseClass):
SubmissionSampleAssociation.reagent).filter(Reagent.lot == reagent)
case _:
pass
# by rsl number (returns only a single value)
# NOTE: by rsl number (returns only a single value)
match rsl_plate_num:
case str():
query = query.filter(model.rsl_plate_num == rsl_plate_num)
@@ -973,7 +980,7 @@ class BasicSubmission(BaseClass):
limit = 1
case _:
pass
# by id (returns only a single value)
# NOTE: by id (returns only a single value)
match id:
case int():
# logger.debug(f"Looking up BasicSubmission with id: {id}")
@@ -1051,7 +1058,7 @@ class BasicSubmission(BaseClass):
Performs backup and deletes this instance from database.
Args:
obj (_type_, optional): Parent Widget. Defaults to None.
obj (_type_, optional): Parent widget. Defaults to None.
Raises:
e: _description_
@@ -1075,7 +1082,7 @@ class BasicSubmission(BaseClass):
Creates Widget for showing submission details.
Args:
obj (_type_): parent widget
obj (_type_): Parent widget
"""
# logger.debug("Hello from details")
from frontend.widgets.submission_details import SubmissionDetails
@@ -1084,6 +1091,12 @@ class BasicSubmission(BaseClass):
pass
def edit(self, obj):
"""
Return submission to form widget for updating
Args:
obj (Widget): Parent widget
"""
from frontend.widgets.submission_widget import SubmissionFormWidget
for widg in obj.app.table_widget.formwidget.findChildren(SubmissionFormWidget):
# logger.debug(widg)
@@ -1224,9 +1237,9 @@ class BacterialCulture(BasicSubmission):
"""
from . import ControlType
input_dict = super().finalize_parse(input_dict, xl, info_map)
# build regex for all control types that have targets
# NOTE: build regex for all control types that have targets
regex = ControlType.build_positive_regex()
# search samples for match
# NOTE: search samples for match
for sample in input_dict['samples']:
matched = regex.match(sample['submitter_id'])
if bool(matched):
@@ -1311,7 +1324,7 @@ class Wastewater(BasicSubmission):
dict: Updated sample dictionary
"""
input_dict = super().custom_info_parser(input_dict)
if xl != None:
if xl is not None:
input_dict['csv'] = xl["Copy to import file"]
return input_dict
@@ -1355,7 +1368,7 @@ class Wastewater(BasicSubmission):
Extends parent
"""
try:
# Deal with PCR file.
# NOTE: Deal with PCR file.
instr = re.sub(r"PCR(-|_)", "", instr)
except (AttributeError, TypeError) as e:
logger.error(f"Problem using regex: {e}")
@@ -1413,6 +1426,15 @@ class Wastewater(BasicSubmission):
@classmethod
def finalize_details(cls, input_dict: dict) -> dict:
"""
Makes changes to information before display
Args:
input_dict (dict): Input information
Returns:
dict: Updated information
"""
input_dict = super().finalize_details(input_dict)
dummy_samples = []
for item in input_dict['samples']:
@@ -1430,11 +1452,23 @@ class Wastewater(BasicSubmission):
return input_dict
def custom_context_events(self) -> dict:
"""
Sets context events for main widget
Returns:
dict: Context menu items for this instance.
"""
events = super().custom_context_events()
events['Link PCR'] = self.link_pcr
return events
def link_pcr(self, obj):
"""
Adds PCR info to this submission
Args:
obj (_type_): Parent widget
"""
from backend.excel import PCRParser
from frontend.widgets import select_open_file
fname = select_open_file(obj=obj, file_extension="xlsx")
@@ -1562,7 +1596,7 @@ class WastewaterArtic(BasicSubmission):
"""
input_dict = super().parse_samples(input_dict)
input_dict['sample_type'] = "Wastewater Sample"
# Because generate_sample_object needs the submitter_id and the artic has the "({origin well})"
# NOTE: Because generate_sample_object needs the submitter_id and the artic has the "({origin well})"
# at the end, this has to be done here. No moving to sqlalchemy object :(
input_dict['submitter_id'] = re.sub(r"\s\(.+\)\s?$", "", str(input_dict['submitter_id'])).strip()
try:
@@ -1576,9 +1610,10 @@ class WastewaterArtic(BasicSubmission):
except KeyError:
logger.error(f"Unable to set ww_processing_num for sample {input_dict['submitter_id']}")
year = str(date.today().year)[-2:]
# if "ENC" in input_dict['submitter_id']:
# NOTE: Check for extraction negative control (Enterics)
if re.search(rf"^{year}-(ENC)", input_dict['submitter_id']):
input_dict['rsl_number'] = cls.en_adapter(input_str=input_dict['submitter_id'])
# NOTE: Check for extraction negative control (Robotics)
if re.search(rf"^{year}-(RSL)", input_dict['submitter_id']):
input_dict['rsl_number'] = cls.pbs_adapter(input_str=input_dict['submitter_id'])
return input_dict
@@ -1595,11 +1630,11 @@ class WastewaterArtic(BasicSubmission):
str: output name
"""
# logger.debug(f"input string raw: {input_str}")
# Remove letters.
# NOTE: Remove letters.
processed = input_str.replace("RSL", "")
processed = re.sub(r"\(.*\)$", "", processed).strip()
processed = re.sub(r"[A-QS-Z]+\d*", "", processed)
# Remove trailing '-' if any
# NOTE: Remove trailing '-' if any
processed = processed.strip("-")
# logger.debug(f"Processed after stripping letters: {processed}")
try:
@@ -1632,7 +1667,7 @@ class WastewaterArtic(BasicSubmission):
@classmethod
def pbs_adapter(cls, input_str):
"""
Stopgap solution because WW names their ENs different
Stopgap solution because WW names their controls different
Args:
input_str (str): input name
@@ -1641,20 +1676,13 @@ class WastewaterArtic(BasicSubmission):
str: output name
"""
# logger.debug(f"input string raw: {input_str}")
# Remove letters.
# NOTE: Remove letters.
processed = input_str.replace("RSL", "")
processed = re.sub(r"\(.*\)$", "", processed).strip()
processed = re.sub(r"[A-QS-Z]+\d*", "", processed)
# Remove trailing '-' if any
# NOTE: Remove trailing '-' if any
processed = processed.strip("-")
# logger.debug(f"Processed after stripping letters: {processed}")
# try:
# en_num = re.search(r"\-\d{1}$", processed).group()
# processed = rreplace(processed, en_num, "")
# except AttributeError:
# en_num = "1"
# en_num = en_num.strip("-")
# logger.debug(f"Processed after en_num: {processed}")
try:
plate_num = re.search(r"\-\d{1}R?\d?$", processed).group()
processed = rreplace(processed, plate_num, "")
@@ -1728,13 +1756,15 @@ class WastewaterArtic(BasicSubmission):
Workbook: Updated workbook
"""
input_excel = super().custom_info_writer(input_excel, info, backup)
logger.debug(f"Info:\n{pformat(info)}")
check = 'source_plates' in info.keys() and info['source_plates'] is not None
if check:
# logger.debug(f"Info:\n{pformat(info)}")
# NOTE: check for source plate information
# check = 'source_plates' in info.keys() and info['source_plates'] is not None
if check_key_or_attr(key='source_plates', interest=info, check_none=True):
worksheet = input_excel['First Strand List']
start_row = 8
# NOTE: write source plates to First strand list
for iii, plate in enumerate(info['source_plates']['value']):
logger.debug(f"Plate: {plate}")
# logger.debug(f"Plate: {plate}")
row = start_row + iii
try:
worksheet.cell(row=row, column=3, value=plate['plate'])
@@ -1744,41 +1774,45 @@ class WastewaterArtic(BasicSubmission):
worksheet.cell(row=row, column=4, value=plate['starting_sample'])
except TypeError:
pass
check = 'gel_info' in info.keys() and info['gel_info']['value'] is not None
if check:
# NOTE: check for gel information
# check = 'gel_info' in info.keys() and info['gel_info']['value'] is not None
if check_key_or_attr(key='gel_info', interest=info, check_none=True):
# logger.debug(f"Gel info check passed.")
if info['gel_info'] != None:
# logger.debug(f"Gel info not none.")
worksheet = input_excel['Egel results']
start_row = 21
start_column = 15
for row, ki in enumerate(info['gel_info']['value'], start=1):
# logger.debug(f"ki: {ki}")
# logger.debug(f"vi: {vi}")
row = start_row + row
worksheet.cell(row=row, column=start_column, value=ki['name'])
for jjj, kj in enumerate(ki['values'], start=1):
# logger.debug(f"kj: {kj}")
# logger.debug(f"vj: {vj}")
column = start_column + 2 + jjj
worksheet.cell(row=start_row, column=column, value=kj['name'])
# logger.debug(f"Writing {kj['name']} with value {kj['value']} to row {row}, column {column}")
try:
worksheet.cell(row=row, column=column, value=kj['value'])
except AttributeError:
logger.error(f"Failed {kj['name']} with value {kj['value']} to row {row}, column {column}")
check = 'gel_image' in info.keys() and info['gel_image']['value'] is not None
if check:
if info['gel_image'] != None:
worksheet = input_excel['Egel results']
# logger.debug(f"We got an image: {info['gel_image']}")
with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped:
z = zipped.extract(info['gel_image']['value'], Path(TemporaryDirectory().name))
img = OpenpyxlImage(z)
img.height = 400 # insert image height in pixels as float or int (e.g. 305.5)
img.width = 600
img.anchor = 'B9'
worksheet.add_image(img)
# if info['gel_info'] is not None:
# logger.debug(f"Gel info not none.")
# NOTE: print json field gel results to Egel results
worksheet = input_excel['Egel results']
# TODO: Move all this into a seperate function?
#
start_row = 21
start_column = 15
for row, ki in enumerate(info['gel_info']['value'], start=1):
# logger.debug(f"ki: {ki}")
# logger.debug(f"vi: {vi}")
row = start_row + row
worksheet.cell(row=row, column=start_column, value=ki['name'])
for jjj, kj in enumerate(ki['values'], start=1):
# logger.debug(f"kj: {kj}")
# logger.debug(f"vj: {vj}")
column = start_column + 2 + jjj
worksheet.cell(row=start_row, column=column, value=kj['name'])
# logger.debug(f"Writing {kj['name']} with value {kj['value']} to row {row}, column {column}")
try:
worksheet.cell(row=row, column=column, value=kj['value'])
except AttributeError:
logger.error(f"Failed {kj['name']} with value {kj['value']} to row {row}, column {column}")
# check = 'gel_image' in info.keys() and info['gel_image']['value'] is not None
if check_key_or_attr(key='gel_image', interest=info, check_none=True):
# if info['gel_image'] is not None:
worksheet = input_excel['Egel results']
# logger.debug(f"We got an image: {info['gel_image']}")
with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped:
z = zipped.extract(info['gel_image']['value'], Path(TemporaryDirectory().name))
img = OpenpyxlImage(z)
img.height = 400 # insert image height in pixels as float or int (e.g. 305.5)
img.width = 600
img.anchor = 'B9'
worksheet.add_image(img)
return input_excel
@classmethod
@@ -1796,55 +1830,35 @@ class WastewaterArtic(BasicSubmission):
base_dict['excluded'] += ['gel_info', 'gel_image', 'headers', "dna_core_submission_number", "source_plates",
"gel_controls"]
base_dict['DNA Core ID'] = base_dict['dna_core_submission_number']
check = 'gel_info' in base_dict.keys() and base_dict['gel_info'] != None
if check:
# check = 'gel_info' in base_dict.keys() and base_dict['gel_info'] is not None
if check_key_or_attr(key='gel_info', interest=base_dict, check_none=True):
headers = [item['name'] for item in base_dict['gel_info'][0]['values']]
base_dict['headers'] = [''] * (4 - len(headers))
base_dict['headers'] += headers
# logger.debug(f"Gel info: {pformat(base_dict['headers'])}")
check = 'gel_image' in base_dict.keys() and base_dict['gel_image'] != None
if check:
# check = 'gel_image' in base_dict.keys() and base_dict['gel_image'] is not None
if check_key_or_attr(key='gel_image', interest=base_dict, check_none=True):
with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped:
base_dict['gel_image'] = base64.b64encode(zipped.read(base_dict['gel_image'])).decode('utf-8')
return base_dict, template
def adjust_to_dict_samples(self, backup: bool = False) -> List[dict]:
"""
Updates sample dictionaries with custom values
Args:
backup (bool, optional): Whether to perform backup. Defaults to False.
Returns:
List[dict]: Updated dictionaries
"""
# logger.debug(f"Hello from {self.__class__.__name__} dictionary sample adjuster.")
output = []
# set_plate = None
for assoc in self.submission_sample_associations:
dicto = assoc.to_sub_dict()
# if self.source_plates is None:
# output.append(dicto)
# continue
# for item in self.source_plates:
# if assoc.sample.id is None:
# old_plate = None
# else:
# old_plate = WastewaterAssociation.query(submission=item['plate'], sample=assoc.sample, limit=1)
# if old_plate is not None:
# set_plate = old_plate.submission.rsl_plate_num
# # logger.debug(f"Dictionary: {pformat(dicto)}")
# if dicto['ww_processing_num'].startswith("NTC"):
# dicto['well'] = dicto['ww_processing_num']
# else:
# dicto['well'] = f"{row_map[old_plate.row]}{old_plate.column}"
# break
# elif dicto['ww_processing_num'].startswith("NTC"):
# dicto['well'] = dicto['ww_processing_num']
# dicto['plate_name'] = set_plate
# logger.debug(f"Here is our raw sample: {pformat(dicto)}")
output.append(dicto)
return output
# def adjust_to_dict_samples(self, backup: bool = False) -> List[dict]:
# """
# Updates sample dictionaries with custom values
#
# Args:
# backup (bool, optional): Whether to perform backup. Defaults to False.
#
# Returns:
# List[dict]: Updated dictionaries
# """
# # logger.debug(f"Hello from {self.__class__.__name__} dictionary sample adjuster.")
# output = []
#
# for assoc in self.submission_sample_associations:
# dicto = assoc.to_sub_dict()
# output.append(dicto)
# return output
def custom_context_events(self) -> dict:
"""
@@ -1880,7 +1894,7 @@ class WastewaterArtic(BasicSubmission):
self.gel_info = output
dt = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
com = dict(text=comment, name=getuser(), time=dt)
if com['text'] != None and com['text'] != "":
if com['text'] is not None and com['text'] != "":
if self.comment is not None:
self.comment.append(com)
else:
@@ -1938,7 +1952,7 @@ class BasicSample(BaseClass):
Returns:
str: new (or unchanged) submitter id
"""
if value == None:
if value is None:
return uuid.uuid4().hex.upper()
else:
return value
@@ -2334,7 +2348,7 @@ class BacterialCultureSample(BasicSample):
sample['name'] = self.submitter_id
sample['organism'] = self.organism
sample['concentration'] = self.concentration
if self.control != None:
if self.control is not None:
sample['colour'] = [0, 128, 0]
sample['tooltip'] = f"Control: {self.control.controltype.name} - {self.control.controltype.targets}"
# logger.debug(f"Done converting to {self} to dict after {time()-start}")
@@ -2480,7 +2494,7 @@ class SubmissionSampleAssociation(BaseClass):
"""
if isinstance(polymorphic_identity, dict):
polymorphic_identity = polymorphic_identity['value']
if polymorphic_identity == None:
if polymorphic_identity is None:
output = cls
else:
try: