From b45a125c51a8371f2a8d7310de267222d82f4b48 Mon Sep 17 00:00:00 2001 From: lwark Date: Mon, 25 Nov 2024 15:11:32 -0600 Subject: [PATCH] mid code cleanup --- src/submissions/backend/db/models/__init__.py | 33 +++- src/submissions/backend/db/models/controls.py | 162 +----------------- src/submissions/backend/db/models/kits.py | 65 +++---- src/submissions/backend/excel/parser.py | 3 +- src/submissions/backend/excel/writer.py | 2 +- 5 files changed, 55 insertions(+), 210 deletions(-) diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py index 5e4c218..2967fe0 100644 --- a/src/submissions/backend/db/models/__init__.py +++ b/src/submissions/backend/db/models/__init__.py @@ -5,7 +5,7 @@ from __future__ import annotations import sys, logging import pandas as pd -from sqlalchemy import Column, INTEGER, String, JSON, event, inspect +from sqlalchemy import Column, INTEGER, String, JSON, inspect from sqlalchemy.orm import DeclarativeMeta, declarative_base, Query, Session from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.exc import ArgumentError @@ -23,11 +23,9 @@ logger = logging.getLogger(f"submissions.{__name__}") class LogMixin(Base): - __abstract__ = True - class BaseClass(Base): """ Abstract class to pass ctx values to all SQLAlchemy objects. @@ -111,15 +109,27 @@ class BaseClass(Base): return cls if " " in name: search = name.title().replace(" ", "") + else: + search = name logger.debug(f"Searching for subclass: {search}") return next((item for item in cls.__subclasses__() if item.__name__ == search), cls) @classmethod - def fuzzy_search(cls, **kwargs): + def fuzzy_search(cls, **kwargs) -> List[Any]: + """ + Uses approximation of fields to get list of query results. + + Args: + **kwargs (): + + Returns: + List[Any]: Results of sqlalchemy query. + """ query: Query = cls.__database_session__.query(cls) # logger.debug(f"Queried model. Now running searches in {kwargs}") for k, v in kwargs.items(): # logger.debug(f"Running fuzzy search for attribute: {k} with value {v}") + # NOTE: Not sure why this is necessary, but it is. search = f"%{v}%" try: attr = getattr(cls, k) @@ -130,8 +140,17 @@ class BaseClass(Base): return query.limit(50).all() @classmethod - def results_to_df(cls, objects: list, **kwargs): - records = [object.to_sub_dict(**kwargs) for object in objects] + def results_to_df(cls, objects: list, **kwargs) -> pd.DataFrame: + """ + + Args: + objects (list): Objects to be converted to dataframe. + **kwargs (): Arguments necessary for the to_sub_dict method. eg extraction_kit=X + + Returns: + pd.Dataframe + """ + records = [obj.to_sub_dict(**kwargs) for obj in objects] return pd.DataFrame.from_records(records) @classmethod @@ -203,8 +222,6 @@ class BaseClass(Base): return report - - class ConfigItem(BaseClass): """ Key:JSON objects to store config settings in database. diff --git a/src/submissions/backend/db/models/controls.py b/src/submissions/backend/db/models/controls.py index 972c19f..ce9c104 100644 --- a/src/submissions/backend/db/models/controls.py +++ b/src/submissions/backend/db/models/controls.py @@ -156,7 +156,8 @@ class Control(BaseClass): Lookup control objects in the database based on a number of parameters. Args: - submission_type (str | None, optional): Control archetype. Defaults to None. + submission_type (str | None, optional): Submission type associated with control. Defaults to None. + subtype (str | None, optional): Control subtype, eg IridaControl. Defaults to None. start_date (date | str | int | None, optional): Beginning date to search by. Defaults to 2023-01-01 if end_date not None. end_date (date | str | int | None, optional): End date to search by. Defaults to today if start_date not None. name (str | None, optional): Name of control. Defaults to None. @@ -198,7 +199,6 @@ class Control(BaseClass): end_date = date.today() if end_date is not None and start_date is None: logger.warning(f"End date with no start date, using 90 days ago.") - # start_date = date(2023, 1, 1) start_date = date.today() - timedelta(days=90) if start_date is not None: match start_date: @@ -249,8 +249,6 @@ class Control(BaseClass): """ if isinstance(polymorphic_identity, dict): polymorphic_identity = polymorphic_identity['value'] - # if isinstance(polymorphic_identity, ControlType): - # polymorphic_identity = polymorphic_identity.name model = cls match polymorphic_identity: case str(): @@ -304,10 +302,10 @@ class PCRControl(Control): id = Column(INTEGER, ForeignKey('_control.id'), primary_key=True) subtype = Column(String(16)) #: PC or NC target = Column(String(16)) #: N1, N2, etc. - ct = Column(FLOAT) + ct = Column(FLOAT) #: PCR result reagent_lot = Column(String(64), ForeignKey("_reagent.name", ondelete="SET NULL", name="fk_reagent_lot")) - reagent = relationship("Reagent", foreign_keys=reagent_lot) + reagent = relationship("Reagent", foreign_keys=reagent_lot) #: reagent used for this control __mapper_args__ = dict(polymorphic_identity="PCR Control", polymorphic_load="inline", @@ -323,83 +321,6 @@ class PCRControl(Control): return dict(name=self.name, ct=self.ct, subtype=self.subtype, target=self.target, reagent_lot=self.reagent_lot, submitted_date=self.submitted_date.date()) - # @classmethod - # @setup_lookup - # def query(cls, - # submission_type: str | None = None, - # start_date: date | str | int | None = None, - # end_date: date | str | int | None = None, - # name: str | None = None, - # limit: int = 0 - # ) -> Control | List[Control]: - # """ - # Lookup control objects in the database based on a number of parameters. - # - # Args: - # submission_type (str | None, optional): Control archetype. Defaults to None. - # start_date (date | str | int | None, optional): Beginning date to search by. Defaults to 2023-01-01 if end_date not None. - # end_date (date | str | int | None, optional): End date to search by. Defaults to today if start_date not None. - # control_name (str | None, optional): Name of control. Defaults to None. - # limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. - # - # Returns: - # PCRControl|List[PCRControl]: Control object of interest. - # """ - # from backend.db import SubmissionType - # query: Query = cls.__database_session__.query(cls) - # # NOTE: by date range - # if start_date is not None and end_date is None: - # logger.warning(f"Start date with no end date, using today.") - # end_date = date.today() - # if end_date is not None and start_date is None: - # logger.warning(f"End date with no start date, using 90 days ago.") - # # start_date = date(2023, 1, 1) - # start_date = date.today() - timedelta(days=90) - # if start_date is not None: - # match start_date: - # case date(): - # # logger.debug(f"Lookup control by start date({start_date})") - # start_date = start_date.strftime("%Y-%m-%d") - # case int(): - # # logger.debug(f"Lookup control by ordinal start date {start_date}") - # start_date = datetime.fromordinal( - # datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d") - # case _: - # # logger.debug(f"Lookup control with parsed start date {start_date}") - # start_date = parse(start_date).strftime("%Y-%m-%d") - # match end_date: - # case date(): - # # logger.debug(f"Lookup control by end date({end_date})") - # end_date = end_date.strftime("%Y-%m-%d") - # case int(): - # # logger.debug(f"Lookup control by ordinal end date {end_date}") - # end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime( - # "%Y-%m-%d") - # case _: - # # logger.debug(f"Lookup control with parsed end date {end_date}") - # end_date = parse(end_date).strftime("%Y-%m-%d") - # # logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}") - # query = query.filter(cls.submitted_date.between(start_date, end_date)) - # match submission_type: - # case str(): - # from backend import BasicSubmission, SubmissionType - # # logger.debug(f"Lookup controls by SubmissionType str: {submission_type}") - # query = query.join(BasicSubmission).join(SubmissionType).filter(SubmissionType.name == submission_type) - # case SubmissionType(): - # from backend import BasicSubmission - # # logger.debug(f"Lookup controls by SubmissionType: {submission_type}") - # query = query.join(BasicSubmission).filter(BasicSubmission.submission_type_name==submission_type.name) - # case _: - # pass - # match control_name: - # case str(): - # # logger.debug(f"Lookup control by name {control_name}") - # query = query.filter(cls.name.startswith(control_name)) - # limit = 1 - # case _: - # pass - # return cls.execute_query(query=query, limit=limit) - @classmethod @report_result def make_chart(cls, parent, chart_settings: dict, ctx: Settings) -> Tuple[Report, "PCRFigure"]: @@ -418,7 +339,7 @@ class PCRControl(Control): parent.mode_typer.clear() parent.mode_typer.setEnabled(False) report = Report() - logger.debug(f"Chart settings: {pformat(chart_settings)}") + # logger.debug(f"Chart settings: {pformat(chart_settings)}") controls = cls.query(submission_type=chart_settings['sub_type'], start_date=chart_settings['start_date'], end_date=chart_settings['end_date']) data = [control.to_sub_dict() for control in controls] @@ -569,77 +490,6 @@ class IridaControl(Control): cols = [] return cols - # @classmethod - # @setup_lookup - # def query(cls, - # sub_type: str | None = None, - # start_date: date | str | int | None = None, - # end_date: date | str | int | None = None, - # control_name: str | None = None, - # limit: int = 0 - # ) -> Control | List[Control]: - # """ - # Lookup control objects in the database based on a number of parameters. - # - # Args: - # sub_type (models.ControlType | str | None, optional): Control archetype. Defaults to None. - # start_date (date | str | int | None, optional): Beginning date to search by. Defaults to 2023-01-01 if end_date not None. - # end_date (date | str | int | None, optional): End date to search by. Defaults to today if start_date not None. - # control_name (str | None, optional): Name of control. Defaults to None. - # limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. - # - # Returns: - # models.Control|List[models.Control]: Control object of interest. - # """ - # query: Query = cls.__database_session__.query(cls) - # # NOTE: by control type - # match sub_type: - # case str(): - # query = query.filter(cls.subtype == sub_type) - # case _: - # pass - # # NOTE: If one date exists, we need the other one to exist as well. - # if start_date is not None and end_date is None: - # logger.warning(f"Start date with no end date, using today.") - # end_date = date.today() - # if end_date is not None and start_date is None: - # logger.warning(f"End date with no start date, using 90 days ago.") - # # start_date = date(2023, 1, 1) - # start_date = date.today() - timedelta(days=90) - # if start_date is not None: - # match start_date: - # case date(): - # # logger.debug(f"Lookup control by start date({start_date})") - # start_date = start_date.strftime("%Y-%m-%d") - # case int(): - # # logger.debug(f"Lookup control by ordinal start date {start_date}") - # start_date = datetime.fromordinal( - # datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d") - # case _: - # # logger.debug(f"Lookup control with parsed start date {start_date}") - # start_date = parse(start_date).strftime("%Y-%m-%d") - # match end_date: - # case date(): - # # logger.debug(f"Lookup control by end date({end_date})") - # end_date = end_date.strftime("%Y-%m-%d") - # case int(): - # # logger.debug(f"Lookup control by ordinal end date {end_date}") - # end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime( - # "%Y-%m-%d") - # case _: - # # logger.debug(f"Lookup control with parsed end date {end_date}") - # end_date = parse(end_date).strftime("%Y-%m-%d") - # # logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}") - # query = query.filter(cls.submitted_date.between(start_date, end_date)) - # match control_name: - # case str(): - # # logger.debug(f"Lookup control by name {control_name}") - # query = query.filter(cls.name.startswith(control_name)) - # limit = 1 - # case _: - # pass - # return cls.execute_query(query=query, limit=limit) - @classmethod def make_parent_buttons(cls, parent: QWidget) -> None: """ @@ -650,7 +500,7 @@ class IridaControl(Control): """ super().make_parent_buttons(parent=parent) rows = parent.layout.rowCount() - logger.debug(f"Parent rows: {rows}") + # logger.debug(f"Parent rows: {rows}") checker = QCheckBox(parent) checker.setChecked(True) checker.setObjectName("irida_check") diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index 5f649d8..90a0cd9 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -261,7 +261,7 @@ class KitType(BaseClass): Returns: dict: Dictionary containing relevant info for SubmissionType construction - """ + """ base_dict = dict(name=self.name) base_dict['reagent roles'] = [] base_dict['equipment roles'] = [] @@ -274,7 +274,8 @@ class KitType(BaseClass): for kk, vv in assoc.to_export_dict().items(): v[kk] = vv base_dict['reagent roles'].append(v) - for k, v in submission_type.construct_equipment_map(): + # for k, v in submission_type.construct_equipment_map(): + for k, v in submission_type.contstruct_field_map("equipment"): try: assoc = next(item for item in submission_type.submissiontype_equipmentrole_associations if item.equipment_role.name == k) @@ -392,7 +393,7 @@ class ReagentRole(BaseClass): Returns: dict: Dictionary containing relevant info for SubmissionType construction - """ + """ return dict(role=self.name, extension_of_life=self.eol_ext.days) @check_authorization @@ -485,8 +486,6 @@ class Reagent(BaseClass): output['editable'] = ['lot', 'expiry'] return output - - def update_last_used(self, kit: KitType) -> Report: """ Updates last used reagent lot for ReagentType/KitType @@ -591,7 +590,7 @@ class Reagent(BaseClass): continue case _: field_value = value - logger.debug(f"Setting reagent {key} to {field_value}") + # logger.debug(f"Setting reagent {key} to {field_value}") self.__setattr__(key, field_value) self.save() @@ -710,7 +709,7 @@ class SubmissionType(BaseClass): "SubmissionTypeTipRoleAssociation", back_populates="submission_type", cascade="all, delete-orphan" - ) #: Association of tiproles + ) #: Association of tiproles def __repr__(self) -> str: """ @@ -793,32 +792,12 @@ class SubmissionType(BaseClass): """ return self.sample_map - def construct_equipment_map(self) -> Generator[(str, dict), None, None]: - """ - Constructs map of equipment to excel cells. - - Returns: - Generator[(str, dict), None, None]: Map equipment locations in excel sheet - """ - # logger.debug("Iterating through equipment roles") - for item in self.submissiontype_equipmentrole_associations: - emap = item.uses - if emap is None: - emap = {} - yield item.equipment_role.name, emap - - def construct_tips_map(self) -> Generator[(str, dict), None, None]: - """ - Constructs map of tips to excel cells. - - Returns: - Generator[(str, dict), None, None]: Tip locations in the excel sheet. - """ - for item in self.submissiontype_tiprole_associations: - tmap = item.uses - if tmap is None: - tmap = {} - yield item.tip_role.name, tmap + def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]: + for item in self.__getattribute__(f"submissiontype_{field}role_associations"): + fmap = item.uses + if fmap is None: + fmap = {} + yield getattr(item, f"{field}_role"), fmap def get_default_kit(self) -> KitType | None: if len(self.kit_types) == 1: @@ -912,7 +891,7 @@ class SubmissionType(BaseClass): Returns: dict: Dictionary containing relevant info for SubmissionType construction - """ + """ base_dict = dict(name=self.name) base_dict['info'] = self.construct_info_map(mode='export') base_dict['defaults'] = self.defaults @@ -953,7 +932,7 @@ class SubmissionType(BaseClass): import_dict = yaml.load(stream=f, Loader=yaml.Loader) else: raise Exception(f"Filetype {filepath.suffix} not supported.") - logger.debug(pformat(import_dict)) + # logger.debug(pformat(import_dict)) try: submission_type = cls.query(name=import_dict['name']) except KeyError: @@ -1009,7 +988,8 @@ class SubmissionType(BaseClass): ster_assoc = SubmissionTypeEquipmentRoleAssociation(submission_type=submission_type, equipment_role=new_role) try: - uses = dict(name=role['name'], process=role['process'], sheet=role['sheet'], static=role['static']) + uses = dict(name=role['name'], process=role['process'], sheet=role['sheet'], + static=role['static']) except KeyError: uses = None ster_assoc.uses = uses @@ -1236,13 +1216,12 @@ class KitTypeReagentRoleAssociation(BaseClass): Returns: dict: dictionary of Association and related reagent role """ - base_dict = {} - base_dict['required'] = self.required + base_dict = dict(required=self.required) for k, v in self.reagent_role.to_export_dict().items(): base_dict[k] = v return base_dict - def get_all_relevant_reagents(self, override:Reagent|None=None) -> Generator[Reagent, None, None]: + def get_all_relevant_reagents(self) -> Generator[Reagent, None, None]: """ Creates a generator that will resolve in to a list filling the role associated with this object. @@ -1354,6 +1333,7 @@ class SubmissionReagentAssociation(BaseClass): from backend.validators import PydReagent return PydReagent(**self.to_sub_dict(extraction_kit=extraction_kit)) + class Equipment(BaseClass): """ A concrete instance of equipment @@ -1400,8 +1380,6 @@ class Equipment(BaseClass): else: return {k: v for k, v in self.__dict__.items()} - - def get_processes(self, submission_type: SubmissionType, extraction_kit: str | KitType | None = None) -> List[str]: """ Get all processes associated with this Equipment for a given SubmissionType @@ -1669,7 +1647,7 @@ class EquipmentRole(BaseClass): """ return dict(role=self.name, processes=self.get_processes(submission_type=submission_type, extraction_kit=kit_type)) - + class SubmissionEquipmentAssociation(BaseClass): """ @@ -1690,7 +1668,6 @@ class SubmissionEquipmentAssociation(BaseClass): equipment = relationship(Equipment, back_populates="equipment_submission_associations") #: associated equipment - def __repr__(self) -> str: return f"" @@ -1805,7 +1782,7 @@ class SubmissionTypeEquipmentRoleAssociation(BaseClass): Returns: dict: Dictionary containing relevant info for SubmissionType construction - """ + """ base_dict = {k: v for k, v in self.equipment_role.to_export_dict(submission_type=self.submission_type, kit_type=extraction_kit).items()} base_dict['static'] = self.static diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index 66c6b60..2568fdc 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -564,7 +564,8 @@ class EquipmentParser(object): Returns: List[dict]: List of locations """ - return {k: v for k, v in self.submission_type.construct_equipment_map()} + # return {k: v for k, v in self.submission_type.construct_equipment_map()} + return {k: v for k, v in self.submission_type.construct_field_map("equipment")} def get_asset_number(self, input: str) -> str: """ diff --git a/src/submissions/backend/excel/writer.py b/src/submissions/backend/excel/writer.py index 120d00c..3d744f2 100644 --- a/src/submissions/backend/excel/writer.py +++ b/src/submissions/backend/excel/writer.py @@ -342,7 +342,7 @@ class EquipmentWriter(object): submission_type = SubmissionType.query(name=submission_type) self.submission_type = submission_type self.xl = xl - equipment_map = {k: v for k, v in self.submission_type.construct_equipment_map()} + equipment_map = {k: v for k, v in self.submission_type.construct_field_map("equipment")} self.equipment = self.reconcile_map(equipment_list=equipment_list, equipment_map=equipment_map) def reconcile_map(self, equipment_list: list, equipment_map: dict) -> Generator[dict, None, None]: