Mid code cleanup

This commit is contained in:
lwark
2024-09-26 08:26:37 -05:00
parent 4c7b737326
commit f4e930c64e
9 changed files with 135 additions and 94 deletions

View File

@@ -6,11 +6,10 @@ import sys, logging
from sqlalchemy import Column, INTEGER, String, JSON
from sqlalchemy.orm import DeclarativeMeta, declarative_base, Query, Session
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.exc import ArgumentError, IntegrityError as sqlalcIntegrityError
from sqlalchemy.exc import ArgumentError
from typing import Any, List
from pathlib import Path
from tools import report_result
# from sqlite3 import IntegrityError as sqliteIntegrityError
# Load testing environment
if 'pytest' in sys.modules:
@@ -155,7 +154,7 @@ class BaseClass(Base):
return query.limit(limit).all()
@report_result
def save(self):
def save(self) -> Report | None:
"""
Add the object to the database and commit
"""
@@ -167,21 +166,8 @@ class BaseClass(Base):
except Exception as e:
logger.critical(f"Problem saving object: {e}")
logger.error(f"Error message: {type(e)}")
# match e:
# case sqlalcIntegrityError():
# origin = e.orig.__str__().lower()
# logger.error(f"Exception origin: {origin}")
# if "unique constraint failed:" in origin:
# field = " ".join(origin.split(".")[1:]).replace("_", " ").upper()
# # logger.debug(field)
# msg = f"{field} doesn't have a unique value.\nIt must be changed."
# else:
# msg = f"Got unknown integrity error: {e}"
# case _:
# msg = f"Got generic error: {e}"
self.__database_session__.rollback()
report.add_result(Result(msg=e, status="Critical"))
return report
class ConfigItem(BaseClass):
@@ -192,8 +178,8 @@ class ConfigItem(BaseClass):
key = Column(String(32)) #: Name of the configuration item.
value = Column(JSON) #: Value associated with the config item.
def __repr__(self):
return f"ConfigItem({self.key} : {self.value})"
def __repr__(self) -> str:
return f"<ConfigItem({self.key} : {self.value})>"
@classmethod
def get_config_items(cls, *args) -> ConfigItem | List[ConfigItem]:

View File

@@ -5,17 +5,14 @@ from __future__ import annotations
import datetime
import json
from pprint import pprint, pformat
from pprint import pformat
import yaml
from jinja2 import TemplateNotFound
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB
from sqlalchemy.orm import relationship, validates, Query
from sqlalchemy.ext.associationproxy import association_proxy
from datetime import date
import logging, re
from tools import check_authorization, setup_lookup, Report, Result, jinja_template_loading, check_regex_match, \
yaml_regex_creator
from tools import check_authorization, setup_lookup, Report, Result, check_regex_match, yaml_regex_creator
from typing import List, Literal, Generator, Any
from pandas import ExcelFile
from pathlib import Path
@@ -204,7 +201,6 @@ class KitType(BaseClass):
yield assoc.reagent_role.name, assoc.uses
except TypeError:
continue
# return info_map
@classmethod
@setup_lookup
@@ -267,17 +263,20 @@ class KitType(BaseClass):
for k, v in self.construct_xl_map_for_use(submission_type=submission_type):
# logger.debug(f"Value: {v}")
try:
assoc = [item for item in self.kit_reagentrole_associations if item.reagent_role.name == k][0]
except IndexError as e:
# assoc = [item for item in self.kit_reagentrole_associations if item.reagent_role.name == k][0]
assoc = next(item for item in self.kit_reagentrole_associations if item.reagent_role.name == k)
except StopIteration as e:
continue
for kk, vv in assoc.to_export_dict().items():
v[kk] = vv
base_dict['reagent roles'].append(v)
for k, v in submission_type.construct_equipment_map():
try:
assoc = [item for item in submission_type.submissiontype_equipmentrole_associations if
item.equipment_role.name == k][0]
except IndexError:
# assoc = [item for item in submission_type.submissiontype_equipmentrole_associations if
# item.equipment_role.name == k][0]
assoc = next(item for item in submission_type.submissiontype_equipmentrole_associations if
item.equipment_role.name == k)
except StopIteration:
continue
for kk, vv in assoc.to_export_dict(kit_type=self).items():
# logger.debug(f"{kk}:{vv}")
@@ -858,10 +857,7 @@ class SubmissionType(BaseClass):
base_dict = dict(name=self.name)
base_dict['info'] = self.construct_info_map(mode='export')
base_dict['defaults'] = self.defaults
# base_dict['excel location maps']['kits'] = [{k: v for k, v in item.kit_type.construct_xl_map_for_use(submission_type=self)} for item in
# self.submissiontype_kit_associations]
base_dict['samples'] = self.construct_sample_map()
# base_dict['excel location maps']['equipment_roles'] = {k: v for k, v in self.construct_equipment_map()}
base_dict['kits'] = [item.to_export_dict() for item in self.submissiontype_kit_associations]
return base_dict
@@ -878,7 +874,6 @@ class SubmissionType(BaseClass):
yaml.add_constructor("!regex", yaml_regex_creator)
if isinstance(filepath, str):
filepath = Path(filepath)
with open(filepath, "r") as f:
if filepath.suffix == ".json":
import_dict = json.load(fp=f)
@@ -949,7 +944,7 @@ class SubmissionType(BaseClass):
new_process.equipment_roles.append(new_role)
if 'orgs' in import_dict.keys():
logger.info("Found Organizations to be imported.")
Organization.import_from_json(filepath=filepath)
Organization.import_from_yml(filepath=filepath)
return submission_type
@@ -1031,6 +1026,12 @@ class SubmissionTypeKitTypeAssociation(BaseClass):
return cls.execute_query(query=query, limit=limit)
def to_export_dict(self):
"""
Creates a dictionary of relevant values in this object.
Returns:
dict: dictionary of Association and related kittype
"""
exclude = ['_sa_instance_state', 'submission_types_id', 'kits_id', 'submission_type', 'kit_type']
base_dict = {k: v for k, v in self.__dict__.items() if k not in exclude}
base_dict['kit_type'] = self.kit_type.to_export_dict(submission_type=self.submission_type)
@@ -1150,7 +1151,13 @@ class KitTypeReagentRoleAssociation(BaseClass):
limit = 1
return cls.execute_query(query=query, limit=limit)
def to_export_dict(self):
def to_export_dict(self) -> dict:
"""
Creates a dictionary of relevant values in this object.
Returns:
dict: dictionary of Association and related reagent role
"""
base_dict = {}
base_dict['required'] = self.required
for k, v in self.reagent_role.to_export_dict().items():
@@ -1418,6 +1425,15 @@ class Equipment(BaseClass):
@classmethod
def assign_equipment(cls, equipment_role: EquipmentRole|str) -> List[Equipment]:
"""
Creates a list of equipment from user input to be used in Submission Type creation
Args:
equipment_role (EquipmentRole): Equipment role to be added to.
Returns:
List[Equipment]: User selected equipment.
"""
if isinstance(equipment_role, str):
equipment_role = EquipmentRole.query(name=equipment_role)
equipment = cls.query()
@@ -1468,16 +1484,7 @@ class EquipmentRole(BaseClass):
Returns:
dict: This EquipmentRole dict
"""
# output = {}
return {key: value for key, value in self.__dict__.items() if key != "processes"}
# match key:
# case "processes":
# pass
# case _:
# value = value
# yield key, value
# # output[key] = value
# return output
def to_pydantic(self, submission_type: SubmissionType,
extraction_kit: str | KitType | None = None) -> "PydEquipmentRole":
@@ -1568,6 +1575,12 @@ class EquipmentRole(BaseClass):
return output
def to_export_dict(self, submission_type: SubmissionType, kit_type: KitType):
"""
Creates a dictionary of relevant values in this object.
Returns:
dict: dictionary of Association and related reagent role
"""
base_dict = {}
base_dict['role'] = self.name
base_dict['processes'] = self.get_processes(submission_type=submission_type, extraction_kit=kit_type)

View File

@@ -79,7 +79,16 @@ class Organization(BaseClass):
@classmethod
@check_authorization
def import_from_json(cls, filepath: Path|str):
def import_from_yml(cls, filepath: Path | str):
"""
An ambitious project to create a Organization from a yml file
Args:
filepath (Path): Filepath of the yml.
Returns:
"""
if isinstance(filepath, str):
filepath = Path(filepath)
if not filepath.exists():

View File

@@ -12,7 +12,7 @@ from zipfile import ZipFile
from tempfile import TemporaryDirectory, TemporaryFile
from operator import itemgetter
from pprint import pformat
from . import BaseClass, Reagent, SubmissionType, KitType, Organization, Contact, Tips
from . import BaseClass, Reagent, SubmissionType, KitType, Organization, Contact
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case
from sqlalchemy.orm import relationship, validates, Query
from sqlalchemy.orm.attributes import flag_modified
@@ -24,7 +24,8 @@ import pandas as pd
from openpyxl import Workbook
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.drawing.image import Image as OpenpyxlImage
from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr, Result, Report
from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr, Result, Report, \
report_result
from datetime import datetime, date
from typing import List, Any, Tuple, Literal
from dateutil.parser import parse
@@ -160,7 +161,18 @@ class BasicSubmission(BaseClass):
return output
@classmethod
def get_default_info(cls, *args, submission_type: SubmissionType | None = None):
def get_default_info(cls, *args, submission_type: SubmissionType | None = None) -> dict:
"""
Gets default info from the database for a given submission type.
Args:
*args (): List of fields to get
submission_type (SubmissionType): the submission type of interest. Necessary due to generic submission types.
Returns:
dict: Default info
"""
# NOTE: Create defaults for all submission_types
parent_defs = super().get_default_info()
recover = ['filepath', 'samples', 'csv', 'comment', 'equipment']
@@ -230,6 +242,7 @@ class BasicSubmission(BaseClass):
case _:
return SubmissionType.query(cls.__mapper_args__['polymorphic_identity'])
@classmethod
def construct_info_map(cls, submission_type: SubmissionType | None = None,
mode: Literal["read", "write"] = "read") -> dict:
@@ -408,8 +421,10 @@ class BasicSubmission(BaseClass):
except Exception as e:
logger.error(f"Column count error: {e}")
# NOTE: Get kit associated with this submission
assoc = [item for item in self.extraction_kit.kit_submissiontype_associations if
item.submission_type == self.submission_type][0]
# assoc = [item for item in self.extraction_kit.kit_submissiontype_associations if
# item.submission_type == self.submission_type][0]
assoc = next((item for item in self.extraction_kit.kit_submissiontype_associations if item.submission_type == self.submission_type),
None)
# logger.debug(f"Came up with association: {assoc}")
# NOTE: If every individual cost is 0 this is probably an old plate.
if all(item == 0.0 for item in [assoc.constant_cost, assoc.mutable_cost_column, assoc.mutable_cost_sample]):
@@ -453,8 +468,9 @@ class BasicSubmission(BaseClass):
for column in range(1, plate_columns + 1):
for row in range(1, plate_rows + 1):
try:
well = [item for item in sample_list if item['row'] == row and item['column'] == column][0]
except IndexError:
# well = [item for item in sample_list if item['row'] == row and item['column'] == column][0]
well = next(item for item in sample_list if item['row'] == row and item['column'] == column)
except StopIteration:
well = dict(name="", row=row, column=column, background_color="#ffffff")
output_samples.append(well)
env = jinja_template_loading()
@@ -498,7 +514,7 @@ class BasicSubmission(BaseClass):
'equipment', 'gel_info', 'gel_image', 'dna_core_submission_number', 'gel_controls',
'source_plates', 'pcr_technician', 'ext_technician', 'artic_technician', 'cost_centre',
'signed_by', 'artic_date', 'gel_barcode', 'gel_date', 'ngs_date', 'contact_phone', 'contact',
'tips', 'gel_image_path']
'tips', 'gel_image_path', 'custom']
for item in excluded:
try:
df = df.drop(item, axis=1)
@@ -553,7 +569,9 @@ class BasicSubmission(BaseClass):
return
case item if item in self.jsons():
match value:
case list():
case dict():
existing = value
case _:
# logger.debug(f"Setting JSON attribute.")
existing = self.__getattribute__(key)
if value is None or value in ['', 'null']:
@@ -573,8 +591,6 @@ class BasicSubmission(BaseClass):
existing = value
else:
existing.append(value)
case _:
existing = value
self.__setattr__(key, existing)
flag_modified(self, key)
return
@@ -600,7 +616,13 @@ class BasicSubmission(BaseClass):
Returns:
Result: _description_
"""
assoc = [item for item in self.submission_sample_associations if item.sample == sample][0]
# assoc = [item for item in self.submission_sample_associations if item.sample == sample][0]
try:
assoc = next(item for item in self.submission_sample_associations if item.sample == sample)
except StopIteration:
report = Report()
report.add_result(Result(msg=f"Couldn't find submission sample association for {sample.submitter_id}", status="Warning"))
return report
for k, v in input_dict.items():
try:
setattr(assoc, k, v)
@@ -716,8 +738,6 @@ class BasicSubmission(BaseClass):
case str():
try:
logger.info(f"Recruiting: {cls}")
# model = [item for item in cls.__subclasses__() if
# item.__mapper_args__['polymorphic_identity'] == polymorphic_identity][0]
model = cls.__mapper__.polymorphic_map[polymorphic_identity].class_
except Exception as e:
logger.error(
@@ -823,8 +843,8 @@ class BasicSubmission(BaseClass):
Workbook: Updated workbook
"""
logger.info(f"Hello from {cls.__mapper_args__['polymorphic_identity']} autofill")
logger.debug(f"Input dict: {info}")
logger.debug(f"Custom fields: {custom_fields}")
# logger.debug(f"Input dict: {info}")
# logger.debug(f"Custom fields: {custom_fields}")
for k, v in custom_fields.items():
try:
assert v['type'] in ['exempt', 'range', 'cell']
@@ -878,15 +898,9 @@ class BasicSubmission(BaseClass):
from backend.validators import RSLNamer
# logger.debug(f"instr coming into {cls}: {instr}")
logger.debug(f"data coming into {cls}: {data}")
# defaults = cls.get_default_info("abbreviation", "submission_type")
if "submission_type" not in data.keys():
data['submission_type'] = cls.__mapper_args__['polymorphic_identity']
data['abbreviation'] = cls.get_default_info("abbreviation", submission_type=data['submission_type'])
# logger.debug(f"Default info: {defaults}")
# data['abbreviation'] = defaults['abbreviation']
# if 'submission_type' not in data.keys() or data['submission_type'] in [None, ""]:
# data['submission_type'] = defaults['submission_type']
if instr in [None, ""]:
# logger.debug("Sending to RSLNamer to make new plate name.")
outstr = RSLNamer.construct_new_plate_name(data=data)
@@ -1386,8 +1400,8 @@ class BacterialCulture(BasicSubmission):
new_lot = matched.group()
try:
pos_control_reg = \
[reg for reg in input_dict['reagents'] if reg['role'] == "Bacterial-Positive Control"][0]
except IndexError:
next(reg for reg in input_dict['reagents'] if reg['role'] == "Bacterial-Positive Control")
except StopIteration:
logger.error(f"No positive control reagent listed")
return input_dict
pos_control_reg['lot'] = new_lot
@@ -1615,6 +1629,7 @@ class Wastewater(BasicSubmission):
events['Link PCR'] = self.link_pcr
return events
@report_result
def link_pcr(self, obj):
"""
Adds PCR info to this submission
@@ -1624,7 +1639,11 @@ class Wastewater(BasicSubmission):
"""
from backend.excel import PCRParser
from frontend.widgets import select_open_file
report = Report()
fname = select_open_file(obj=obj, file_extension="xlsx")
if not fname:
report.add_result(Result(msg="No file selected, cancelling.", status="Warning"))
return report
parser = PCRParser(filepath=fname)
self.set_attribute("pcr_info", parser.pcr)
self.save(original=False)
@@ -1633,8 +1652,9 @@ class Wastewater(BasicSubmission):
for sample in self.samples:
# logger.debug(f"Running update on: {sample}")
try:
sample_dict = [item for item in parser.samples if item['sample'] == sample.rsl_number][0]
except IndexError:
# sample_dict = [item for item in parser.samples if item['sample'] == sample.rsl_number][0]
sample_dict = next(item for item in parser.samples if item['sample'] == sample.rsl_number)
except StopIteration:
continue
self.update_subsampassoc(sample=sample, input_dict=sample_dict)
# self.report.add_result(Result(msg=f"We added PCR info to {sub.rsl_plate_num}.", status='Information'))
@@ -2119,7 +2139,11 @@ class WastewaterArtic(BasicSubmission):
"""
from frontend.widgets.gel_checker import GelBox
from frontend.widgets import select_open_file
report = Report()
fname = select_open_file(obj=obj, file_extension="jpg")
if not fname:
report.add_result(Result(msg="No file selected, cancelling.", status="Warning"))
return report
dlg = GelBox(parent=obj, img_path=fname, submission=self)
if dlg.exec():
self.dna_core_submission_number, self.gel_barcode, img_path, output, comment = dlg.parse_form()
@@ -2283,8 +2307,9 @@ class BasicSample(BaseClass):
polymorphic_identity = polymorphic_identity['value']
if polymorphic_identity is not None:
try:
return [item for item in cls.__subclasses__() if
item.__mapper_args__['polymorphic_identity'] == polymorphic_identity][0]
# return [item for item in cls.__subclasses__() if
# item.__mapper_args__['polymorphic_identity'] == polymorphic_identity][0]
model = cls.__mapper__.polymorphic_map[polymorphic_identity].class_
except Exception as e:
logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}")
model = cls

View File

@@ -541,28 +541,30 @@ class DocxWriter(object):
@classmethod
def create_plate_map(self, sample_list: List[dict], rows: int = 0, columns: int = 0) -> List[list]:
sample_list = sorted(sample_list, key=itemgetter('column', 'row'))
# NOTE if rows or samples is default, set to maximum value in sample list
if rows == 0:
rows = max([sample['row'] for sample in sample_list])
if columns == 0:
columns = max([sample['column'] for sample in sample_list])
# output = []
for row in range(0, rows):
contents = [''] * columns
for column in range(0, columns):
try:
ooi = [item for item in sample_list if item['row'] == row + 1 and item['column'] == column + 1][0]
except IndexError:
continue
contents[column] = ooi['submitter_id']
# contents = [sample['submitter_id'] for sample in sample_list if sample['row'] == row + 1]
# contents = [f"{sample['row']},{sample['column']}" for sample in sample_list if sample['row'] == row + 1]
if len(contents) < columns:
contents += [''] * (columns - len(contents))
if not contents:
contents = [''] * columns
# NOTE: Create a list with length equal to columns length
# contents = [''] * columns
contents = [next((item['submitter_id'] for item in sample_list if item['row'] == row + 1 and
item['column'] == column + 1), '') for column in range(0, columns)]
# for column in range(0, columns):
# contents[column] = next((item['submitter_id'] for item in sample_list if item['row'] == row + 1 and item['column'] == column), '')
# try:
# # ooi = [item for item in sample_list if item['row'] == row + 1 and item['column'] == column + 1][0]
# ooi = next(item for item in sample_list if item['row'] == row + 1 and item['column'] == column)
# except StopIteration:
# continue
# contents[column] = ooi['submitter_id']
# NOTE: Pad length of contents to reflect columns
# if len(contents) < columns:
# contents += [''] * (columns - len(contents))
# if not contents:
# contents = [''] * columns
yield contents
# output.append(contents)
# return output
def create_merged_template(self, *args) -> BytesIO:
"""

View File

@@ -180,7 +180,11 @@ class ControlsViewer(QWidget):
safe = ['name', 'submitted_date', 'genus', 'target']
for column in df.columns:
if "percent" in column:
count_col = [item for item in df.columns if "count" in item][0]
# count_col = [item for item in df.columns if "count" in item][0]
try:
count_col = next(item for item in df.columns if "count" in item)
except StopIteration:
continue
# NOTE: The actual percentage from kraken was off due to exclusion of NaN, recalculating.
df[column] = 100 * df[count_col] / df.groupby('name')[count_col].transform('sum')
if column not in safe:

View File

@@ -129,7 +129,8 @@ class RoleComboBox(QWidget):
"""
equip = self.box.currentText()
# logger.debug(f"Updating equipment: {equip}")
equip2 = [item for item in self.role.equipment if item.name == equip][0]
# equip2 = [item for item in self.role.equipment if item.name == equip][0]
equip2 = next((item for item in self.role.equipment if item.name == equip), self.role.equipment[0])
# logger.debug(f"Using: {equip2}")
self.process.clear()
self.process.addItems([item for item in equip2.processes if item in self.role.processes])

View File

@@ -1,10 +1,10 @@
'''
functions used by all windows in the application's frontend
NOTE: Depreciated. Moved to functions.__init__
'''
from pathlib import Path
import logging
from PyQt6.QtWidgets import QMainWindow, QFileDialog
from tools import Result
logger = logging.getLogger(f"submissions.{__name__}")

View File

@@ -146,8 +146,9 @@ class ControlsForm(QWidget):
for le in self.findChildren(QComboBox):
label = [item.strip() for item in le.objectName().split(" : ")]
try:
dicto = [item for item in output if item['name'] == label[0]][0]
except IndexError:
# dicto = [item for item in output if item['name'] == label[0]][0]
dicto = next(item for item in output if item['name'] == label[0])
except StopIteration:
dicto = dict(name=label[0], values=[])
dicto['values'].append(dict(name=label[1], value=le.currentText()))
if label[0] not in [item['name'] for item in output]: