mid code cleanup

This commit is contained in:
lwark
2024-11-25 15:11:32 -06:00
parent 7f0b7feb5d
commit b45a125c51
5 changed files with 55 additions and 210 deletions

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
import sys, logging
import pandas as pd
from sqlalchemy import Column, INTEGER, String, JSON, event, inspect
from sqlalchemy import Column, INTEGER, String, JSON, inspect
from sqlalchemy.orm import DeclarativeMeta, declarative_base, Query, Session
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.exc import ArgumentError
@@ -23,11 +23,9 @@ logger = logging.getLogger(f"submissions.{__name__}")
class LogMixin(Base):
__abstract__ = True
class BaseClass(Base):
"""
Abstract class to pass ctx values to all SQLAlchemy objects.
@@ -111,15 +109,27 @@ class BaseClass(Base):
return cls
if " " in name:
search = name.title().replace(" ", "")
else:
search = name
logger.debug(f"Searching for subclass: {search}")
return next((item for item in cls.__subclasses__() if item.__name__ == search), cls)
@classmethod
def fuzzy_search(cls, **kwargs):
def fuzzy_search(cls, **kwargs) -> List[Any]:
"""
Uses approximation of fields to get list of query results.
Args:
**kwargs ():
Returns:
List[Any]: Results of sqlalchemy query.
"""
query: Query = cls.__database_session__.query(cls)
# logger.debug(f"Queried model. Now running searches in {kwargs}")
for k, v in kwargs.items():
# logger.debug(f"Running fuzzy search for attribute: {k} with value {v}")
# NOTE: Not sure why this is necessary, but it is.
search = f"%{v}%"
try:
attr = getattr(cls, k)
@@ -130,8 +140,17 @@ class BaseClass(Base):
return query.limit(50).all()
@classmethod
def results_to_df(cls, objects: list, **kwargs):
records = [object.to_sub_dict(**kwargs) for object in objects]
def results_to_df(cls, objects: list, **kwargs) -> pd.DataFrame:
"""
Args:
objects (list): Objects to be converted to dataframe.
**kwargs (): Arguments necessary for the to_sub_dict method. eg extraction_kit=X
Returns:
pd.Dataframe
"""
records = [obj.to_sub_dict(**kwargs) for obj in objects]
return pd.DataFrame.from_records(records)
@classmethod
@@ -203,8 +222,6 @@ class BaseClass(Base):
return report
class ConfigItem(BaseClass):
"""
Key:JSON objects to store config settings in database.

View File

@@ -156,7 +156,8 @@ class Control(BaseClass):
Lookup control objects in the database based on a number of parameters.
Args:
submission_type (str | None, optional): Control archetype. Defaults to None.
submission_type (str | None, optional): Submission type associated with control. Defaults to None.
subtype (str | None, optional): Control subtype, eg IridaControl. Defaults to None.
start_date (date | str | int | None, optional): Beginning date to search by. Defaults to 2023-01-01 if end_date not None.
end_date (date | str | int | None, optional): End date to search by. Defaults to today if start_date not None.
name (str | None, optional): Name of control. Defaults to None.
@@ -198,7 +199,6 @@ class Control(BaseClass):
end_date = date.today()
if end_date is not None and start_date is None:
logger.warning(f"End date with no start date, using 90 days ago.")
# start_date = date(2023, 1, 1)
start_date = date.today() - timedelta(days=90)
if start_date is not None:
match start_date:
@@ -249,8 +249,6 @@ class Control(BaseClass):
"""
if isinstance(polymorphic_identity, dict):
polymorphic_identity = polymorphic_identity['value']
# if isinstance(polymorphic_identity, ControlType):
# polymorphic_identity = polymorphic_identity.name
model = cls
match polymorphic_identity:
case str():
@@ -304,10 +302,10 @@ class PCRControl(Control):
id = Column(INTEGER, ForeignKey('_control.id'), primary_key=True)
subtype = Column(String(16)) #: PC or NC
target = Column(String(16)) #: N1, N2, etc.
ct = Column(FLOAT)
ct = Column(FLOAT) #: PCR result
reagent_lot = Column(String(64), ForeignKey("_reagent.name", ondelete="SET NULL",
name="fk_reagent_lot"))
reagent = relationship("Reagent", foreign_keys=reagent_lot)
reagent = relationship("Reagent", foreign_keys=reagent_lot) #: reagent used for this control
__mapper_args__ = dict(polymorphic_identity="PCR Control",
polymorphic_load="inline",
@@ -323,83 +321,6 @@ class PCRControl(Control):
return dict(name=self.name, ct=self.ct, subtype=self.subtype, target=self.target, reagent_lot=self.reagent_lot,
submitted_date=self.submitted_date.date())
# @classmethod
# @setup_lookup
# def query(cls,
# submission_type: str | None = None,
# start_date: date | str | int | None = None,
# end_date: date | str | int | None = None,
# name: str | None = None,
# limit: int = 0
# ) -> Control | List[Control]:
# """
# Lookup control objects in the database based on a number of parameters.
#
# Args:
# submission_type (str | None, optional): Control archetype. Defaults to None.
# start_date (date | str | int | None, optional): Beginning date to search by. Defaults to 2023-01-01 if end_date not None.
# end_date (date | str | int | None, optional): End date to search by. Defaults to today if start_date not None.
# control_name (str | None, optional): Name of control. Defaults to None.
# limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
#
# Returns:
# PCRControl|List[PCRControl]: Control object of interest.
# """
# from backend.db import SubmissionType
# query: Query = cls.__database_session__.query(cls)
# # NOTE: by date range
# if start_date is not None and end_date is None:
# logger.warning(f"Start date with no end date, using today.")
# end_date = date.today()
# if end_date is not None and start_date is None:
# logger.warning(f"End date with no start date, using 90 days ago.")
# # start_date = date(2023, 1, 1)
# start_date = date.today() - timedelta(days=90)
# if start_date is not None:
# match start_date:
# case date():
# # logger.debug(f"Lookup control by start date({start_date})")
# start_date = start_date.strftime("%Y-%m-%d")
# case int():
# # logger.debug(f"Lookup control by ordinal start date {start_date}")
# start_date = datetime.fromordinal(
# datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d")
# case _:
# # logger.debug(f"Lookup control with parsed start date {start_date}")
# start_date = parse(start_date).strftime("%Y-%m-%d")
# match end_date:
# case date():
# # logger.debug(f"Lookup control by end date({end_date})")
# end_date = end_date.strftime("%Y-%m-%d")
# case int():
# # logger.debug(f"Lookup control by ordinal end date {end_date}")
# end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime(
# "%Y-%m-%d")
# case _:
# # logger.debug(f"Lookup control with parsed end date {end_date}")
# end_date = parse(end_date).strftime("%Y-%m-%d")
# # logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}")
# query = query.filter(cls.submitted_date.between(start_date, end_date))
# match submission_type:
# case str():
# from backend import BasicSubmission, SubmissionType
# # logger.debug(f"Lookup controls by SubmissionType str: {submission_type}")
# query = query.join(BasicSubmission).join(SubmissionType).filter(SubmissionType.name == submission_type)
# case SubmissionType():
# from backend import BasicSubmission
# # logger.debug(f"Lookup controls by SubmissionType: {submission_type}")
# query = query.join(BasicSubmission).filter(BasicSubmission.submission_type_name==submission_type.name)
# case _:
# pass
# match control_name:
# case str():
# # logger.debug(f"Lookup control by name {control_name}")
# query = query.filter(cls.name.startswith(control_name))
# limit = 1
# case _:
# pass
# return cls.execute_query(query=query, limit=limit)
@classmethod
@report_result
def make_chart(cls, parent, chart_settings: dict, ctx: Settings) -> Tuple[Report, "PCRFigure"]:
@@ -418,7 +339,7 @@ class PCRControl(Control):
parent.mode_typer.clear()
parent.mode_typer.setEnabled(False)
report = Report()
logger.debug(f"Chart settings: {pformat(chart_settings)}")
# logger.debug(f"Chart settings: {pformat(chart_settings)}")
controls = cls.query(submission_type=chart_settings['sub_type'], start_date=chart_settings['start_date'],
end_date=chart_settings['end_date'])
data = [control.to_sub_dict() for control in controls]
@@ -569,77 +490,6 @@ class IridaControl(Control):
cols = []
return cols
# @classmethod
# @setup_lookup
# def query(cls,
# sub_type: str | None = None,
# start_date: date | str | int | None = None,
# end_date: date | str | int | None = None,
# control_name: str | None = None,
# limit: int = 0
# ) -> Control | List[Control]:
# """
# Lookup control objects in the database based on a number of parameters.
#
# Args:
# sub_type (models.ControlType | str | None, optional): Control archetype. Defaults to None.
# start_date (date | str | int | None, optional): Beginning date to search by. Defaults to 2023-01-01 if end_date not None.
# end_date (date | str | int | None, optional): End date to search by. Defaults to today if start_date not None.
# control_name (str | None, optional): Name of control. Defaults to None.
# limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
#
# Returns:
# models.Control|List[models.Control]: Control object of interest.
# """
# query: Query = cls.__database_session__.query(cls)
# # NOTE: by control type
# match sub_type:
# case str():
# query = query.filter(cls.subtype == sub_type)
# case _:
# pass
# # NOTE: If one date exists, we need the other one to exist as well.
# if start_date is not None and end_date is None:
# logger.warning(f"Start date with no end date, using today.")
# end_date = date.today()
# if end_date is not None and start_date is None:
# logger.warning(f"End date with no start date, using 90 days ago.")
# # start_date = date(2023, 1, 1)
# start_date = date.today() - timedelta(days=90)
# if start_date is not None:
# match start_date:
# case date():
# # logger.debug(f"Lookup control by start date({start_date})")
# start_date = start_date.strftime("%Y-%m-%d")
# case int():
# # logger.debug(f"Lookup control by ordinal start date {start_date}")
# start_date = datetime.fromordinal(
# datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d")
# case _:
# # logger.debug(f"Lookup control with parsed start date {start_date}")
# start_date = parse(start_date).strftime("%Y-%m-%d")
# match end_date:
# case date():
# # logger.debug(f"Lookup control by end date({end_date})")
# end_date = end_date.strftime("%Y-%m-%d")
# case int():
# # logger.debug(f"Lookup control by ordinal end date {end_date}")
# end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime(
# "%Y-%m-%d")
# case _:
# # logger.debug(f"Lookup control with parsed end date {end_date}")
# end_date = parse(end_date).strftime("%Y-%m-%d")
# # logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}")
# query = query.filter(cls.submitted_date.between(start_date, end_date))
# match control_name:
# case str():
# # logger.debug(f"Lookup control by name {control_name}")
# query = query.filter(cls.name.startswith(control_name))
# limit = 1
# case _:
# pass
# return cls.execute_query(query=query, limit=limit)
@classmethod
def make_parent_buttons(cls, parent: QWidget) -> None:
"""
@@ -650,7 +500,7 @@ class IridaControl(Control):
"""
super().make_parent_buttons(parent=parent)
rows = parent.layout.rowCount()
logger.debug(f"Parent rows: {rows}")
# logger.debug(f"Parent rows: {rows}")
checker = QCheckBox(parent)
checker.setChecked(True)
checker.setObjectName("irida_check")

View File

@@ -274,7 +274,8 @@ class KitType(BaseClass):
for kk, vv in assoc.to_export_dict().items():
v[kk] = vv
base_dict['reagent roles'].append(v)
for k, v in submission_type.construct_equipment_map():
# for k, v in submission_type.construct_equipment_map():
for k, v in submission_type.contstruct_field_map("equipment"):
try:
assoc = next(item for item in submission_type.submissiontype_equipmentrole_associations if
item.equipment_role.name == k)
@@ -485,8 +486,6 @@ class Reagent(BaseClass):
output['editable'] = ['lot', 'expiry']
return output
def update_last_used(self, kit: KitType) -> Report:
"""
Updates last used reagent lot for ReagentType/KitType
@@ -591,7 +590,7 @@ class Reagent(BaseClass):
continue
case _:
field_value = value
logger.debug(f"Setting reagent {key} to {field_value}")
# logger.debug(f"Setting reagent {key} to {field_value}")
self.__setattr__(key, field_value)
self.save()
@@ -793,32 +792,12 @@ class SubmissionType(BaseClass):
"""
return self.sample_map
def construct_equipment_map(self) -> Generator[(str, dict), None, None]:
"""
Constructs map of equipment to excel cells.
Returns:
Generator[(str, dict), None, None]: Map equipment locations in excel sheet
"""
# logger.debug("Iterating through equipment roles")
for item in self.submissiontype_equipmentrole_associations:
emap = item.uses
if emap is None:
emap = {}
yield item.equipment_role.name, emap
def construct_tips_map(self) -> Generator[(str, dict), None, None]:
"""
Constructs map of tips to excel cells.
Returns:
Generator[(str, dict), None, None]: Tip locations in the excel sheet.
"""
for item in self.submissiontype_tiprole_associations:
tmap = item.uses
if tmap is None:
tmap = {}
yield item.tip_role.name, tmap
def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]:
for item in self.__getattribute__(f"submissiontype_{field}role_associations"):
fmap = item.uses
if fmap is None:
fmap = {}
yield getattr(item, f"{field}_role"), fmap
def get_default_kit(self) -> KitType | None:
if len(self.kit_types) == 1:
@@ -953,7 +932,7 @@ class SubmissionType(BaseClass):
import_dict = yaml.load(stream=f, Loader=yaml.Loader)
else:
raise Exception(f"Filetype {filepath.suffix} not supported.")
logger.debug(pformat(import_dict))
# logger.debug(pformat(import_dict))
try:
submission_type = cls.query(name=import_dict['name'])
except KeyError:
@@ -1009,7 +988,8 @@ class SubmissionType(BaseClass):
ster_assoc = SubmissionTypeEquipmentRoleAssociation(submission_type=submission_type,
equipment_role=new_role)
try:
uses = dict(name=role['name'], process=role['process'], sheet=role['sheet'], static=role['static'])
uses = dict(name=role['name'], process=role['process'], sheet=role['sheet'],
static=role['static'])
except KeyError:
uses = None
ster_assoc.uses = uses
@@ -1236,13 +1216,12 @@ class KitTypeReagentRoleAssociation(BaseClass):
Returns:
dict: dictionary of Association and related reagent role
"""
base_dict = {}
base_dict['required'] = self.required
base_dict = dict(required=self.required)
for k, v in self.reagent_role.to_export_dict().items():
base_dict[k] = v
return base_dict
def get_all_relevant_reagents(self, override:Reagent|None=None) -> Generator[Reagent, None, None]:
def get_all_relevant_reagents(self) -> Generator[Reagent, None, None]:
"""
Creates a generator that will resolve in to a list filling the role associated with this object.
@@ -1354,6 +1333,7 @@ class SubmissionReagentAssociation(BaseClass):
from backend.validators import PydReagent
return PydReagent(**self.to_sub_dict(extraction_kit=extraction_kit))
class Equipment(BaseClass):
"""
A concrete instance of equipment
@@ -1400,8 +1380,6 @@ class Equipment(BaseClass):
else:
return {k: v for k, v in self.__dict__.items()}
def get_processes(self, submission_type: SubmissionType, extraction_kit: str | KitType | None = None) -> List[str]:
"""
Get all processes associated with this Equipment for a given SubmissionType
@@ -1690,7 +1668,6 @@ class SubmissionEquipmentAssociation(BaseClass):
equipment = relationship(Equipment, back_populates="equipment_submission_associations") #: associated equipment
def __repr__(self) -> str:
return f"<SubmissionEquipmentAssociation({self.submission.rsl_plate_num} & {self.equipment.name})>"

View File

@@ -564,7 +564,8 @@ class EquipmentParser(object):
Returns:
List[dict]: List of locations
"""
return {k: v for k, v in self.submission_type.construct_equipment_map()}
# return {k: v for k, v in self.submission_type.construct_equipment_map()}
return {k: v for k, v in self.submission_type.construct_field_map("equipment")}
def get_asset_number(self, input: str) -> str:
"""

View File

@@ -342,7 +342,7 @@ class EquipmentWriter(object):
submission_type = SubmissionType.query(name=submission_type)
self.submission_type = submission_type
self.xl = xl
equipment_map = {k: v for k, v in self.submission_type.construct_equipment_map()}
equipment_map = {k: v for k, v in self.submission_type.construct_field_map("equipment")}
self.equipment = self.reconcile_map(equipment_list=equipment_list, equipment_map=equipment_map)
def reconcile_map(self, equipment_list: list, equipment_map: dict) -> Generator[dict, None, None]: