From fcda0d873c4fb9f2494a37d4325198930cd5af5e Mon Sep 17 00:00:00 2001 From: lwark Date: Wed, 3 Sep 2025 14:04:26 -0500 Subject: [PATCH] Code cleanup for db.models complete. --- src/submissions/backend/db/__init__.py | 4 - src/submissions/backend/db/models/__init__.py | 61 +- src/submissions/backend/db/models/audit.py | 8 +- src/submissions/backend/db/models/controls.py | 469 +----- .../backend/db/models/organizations.py | 23 +- .../backend/db/models/procedures.py | 1431 ++--------------- .../backend/db/models/submissions.py | 401 +---- src/submissions/backend/validators/pydant.py | 6 +- .../frontend/widgets/procedure_creation.py | 5 + 9 files changed, 208 insertions(+), 2200 deletions(-) diff --git a/src/submissions/backend/db/__init__.py b/src/submissions/backend/db/__init__.py index e362802..8d1cd16 100644 --- a/src/submissions/backend/db/__init__.py +++ b/src/submissions/backend/db/__init__.py @@ -22,7 +22,6 @@ def set_sqlite_pragma(dbapi_connection, connection_record): execution_phrase = "PRAGMA foreign_keys=ON" print(f"Executing '{execution_phrase}' in sql.") else: - # print("Nothing to execute, returning") cursor.close() return cursor.execute(execution_phrase) @@ -55,9 +54,6 @@ def update_log(mapper, connection, target): continue added = [str(item) for item in hist.added] # NOTE: Attributes left out to save space - # if attr.key in ['artic_technician', 'clientsubmissionsampleassociation', 'submission_reagent_associations', - # 'submission_equipment_associations', 'submission_tips_associations', 'contact_id', 'gel_info', - # 'gel_controls', 'source_plates']: if attr.key in LogMixin.tracking_exclusion: continue deleted = [str(item) for item in hist.deleted] diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py index 1bd44f4..ac69862 100644 --- a/src/submissions/backend/db/models/__init__.py +++ b/src/submissions/backend/db/models/__init__.py @@ -164,8 +164,7 @@ class BaseClass(Base): dict | list | str: Output of key:value dict or single (list, str) desired variable """ # NOTE: singles is a list of fields that need to be limited to 1 result. - singles = list(set(cls.singles + BaseClass.singles)) - return dict(singles=singles) + return dict(singles=list(set(cls.singles + BaseClass.singles))) @classmethod def find_regular_subclass(cls, name: str | None = None) -> Any: @@ -237,10 +236,8 @@ class BaseClass(Base): new = False allowed = [k for k, v in cls.__dict__.items() if isinstance(v, InstrumentedAttribute) or isinstance(v, hybrid_property)] - # and not isinstance(v.property, _RelationshipDeclared)] sanitized_kwargs = {k: v for k, v in kwargs.items() if k in allowed} outside_kwargs = {k: v for k, v in kwargs.items() if k not in allowed} - logger.debug(f"Sanitized kwargs: {sanitized_kwargs}") instance = cls.query(limit=1, **sanitized_kwargs) if not instance or isinstance(instance, list): instance = cls() @@ -254,10 +251,8 @@ class BaseClass(Base): from backend.validators.pydant import PydBaseClass if issubclass(v.__class__, PydBaseClass): setattr(instance, k, v.to_sql()) - # else: - # logger.error(f"Could not set {k} due to {e}") instance._misc_info.update(outside_kwargs) - # logger.info(f"Instance from query or create: {instance}, new: {new}") + logger.info(f"Instance from query or create: {instance}, new: {new}") return instance, new @classmethod @@ -286,17 +281,10 @@ class BaseClass(Base): Returns: Any | List[Any]: Single result if limit = 1 or List if other. """ - # logger.debug(f"Kwargs: {kwargs}") - # if model is None: - # model = cls - # logger.debug(f"Model: {model}") if query is None: query: Query = cls.__database_session__.query(cls) - # else: - # logger.debug(f"Incoming query: {query}") singles = cls.get_default_info('singles') for k, v in kwargs.items(): - # logger.info(f"Using key: {k} with value: {v} against {cls}") try: attr = getattr(cls, k) except (ArgumentError, AttributeError) as e: @@ -314,7 +302,6 @@ class BaseClass(Base): except ArgumentError: continue else: - # logger.debug("Single item.") try: query = query.filter(attr == v) except ArgumentError: @@ -354,9 +341,6 @@ class BaseClass(Base): try: self.__database_session__.add(self) self.__database_session__.commit() - # except sqlalchemy.exc.IntegrityError as i: - # logger.error(f"Integrity error saving {self} due to: {i}") - # logger.error(pformat(self.__dict__)) except Exception as e: logger.critical(f"Problem saving {self} due to: {e}") logger.error(f"Error message: {type(e)}") @@ -434,7 +418,7 @@ class BaseClass(Base): try: template = env.get_template(temp_name) except TemplateNotFound as e: - # logger.error(f"Couldn't find template {e}") + logger.error(f"Couldn't find template {e}") template = env.get_template("details.html") return template @@ -448,14 +432,11 @@ class BaseClass(Base): Returns: bool: If a single unequivocal value is found will be false, else true. """ - # logger.debug(f"Incoming attributes: {attributes}") for key, value in attributes.items(): if value.lower() == "none": value = None - # logger.debug(f"Attempting to grab attribute: {key}") self_value = getattr(self, key) class_attr = getattr(self.__class__, key) - # logger.debug(f"Self value: {self_value}, class attr: {class_attr} of type: {type(class_attr)}") if isinstance(class_attr, property): filter = "property" else: @@ -475,7 +456,6 @@ class BaseClass(Base): case "property": pass case _RelationshipDeclared(): - # logger.debug(f"Checking {self_value}") try: self_value = self_value.name except AttributeError: @@ -483,18 +463,14 @@ class BaseClass(Base): if class_attr.property.uselist: self_value = self_value.__str__() try: - # logger.debug(f"Check if {self_value.__class__} is subclass of {self.__class__}") check = issubclass(self_value.__class__, self.__class__) except TypeError as e: logger.error(f"Couldn't check if {self_value.__class__} is subclass of {self.__class__} due to {e}") check = False if check: - # logger.debug(f"Checking for subclass name.") self_value = self_value.name - # logger.debug(f"Checking self_value {self_value} of type {type(self_value)} against attribute {value} of type {type(value)}") if self_value != value: output = False - # logger.debug(f"Value {key} is False, returning.") return output return True @@ -502,13 +478,9 @@ class BaseClass(Base): """ Custom dunder method to handle potential list relationship issues. """ - # logger.debug(f"Attempting to set: {key} to {value}") if key.startswith("_"): return super().__setattr__(key, value) - # try: check = not hasattr(self, key) - # except: - # return if check: try: value = json.dumps(value) @@ -524,27 +496,21 @@ class BaseClass(Base): except AttributeError: return super().__setattr__(key, value) if isinstance(field_type, InstrumentedAttribute): - # logger.debug(f"{key} is an InstrumentedAttribute.") match field_type.property: case ColumnProperty(): - # logger.debug(f"Setting ColumnProperty to {value}") return super().__setattr__(key, value) case _RelationshipDeclared(): - # logger.debug(f"{self.__class__.__name__} Setting _RelationshipDeclared for {key} to {value}") if field_type.property.uselist: - # logger.debug(f"Setting with uselist") existing = self.__getattribute__(key) # NOTE: This is causing problems with removal of items from lists. Have to overhaul it. if existing is not None: logger.debug(f"{key} Existing: {existing}, incoming: {value}") if isinstance(value, list): - # value = existing + value value = value else: value = existing + [value] else: if isinstance(value, list): - # value = value pass else: value = [value] @@ -552,7 +518,6 @@ class BaseClass(Base): value = list(set(value)) except TypeError: pass - # logger.debug(f"Final value for {key}: {value}") return super().__setattr__(key, value) else: if isinstance(value, list): @@ -608,7 +573,6 @@ class BaseClass(Base): relevant = {k: v for k, v in self.__class__.__dict__.items() if isinstance(v, InstrumentedAttribute) or isinstance(v, AssociationProxy)} - # output = OrderedDict() output = dict(excluded=["excluded", "misc_info", "_misc_info", "id"]) for k, v in relevant.items(): try: @@ -621,15 +585,9 @@ class BaseClass(Base): value = getattr(self, k) except AttributeError: continue - # try: - # logger.debug(f"Setting {k} to {value} for details dict.") - # except AttributeError as e: - # logger.error(f"Can't log {k} value due to {type(e)}") - # continue output[k.strip("_")] = value if self._misc_info: for key, value in self._misc_info.items(): - # logger.debug(f"Misc info key {key}") output[key] = value return output @@ -669,28 +627,15 @@ class BaseClass(Base): pyd = getattr(pydant, pyd_model_name) except AttributeError: raise AttributeError(f"Could not get pydantic class {pyd_model_name}") - # logger.debug(f"Kwargs: {kwargs}") - # logger.debug(f"Dict: {pformat(self.details_dict())}") return pyd(**self.details_dict(**kwargs)) def show_details(self, obj): - logger.debug("Show Details") from frontend.widgets.submission_details import SubmissionDetails dlg = SubmissionDetails(parent=obj, sub=self) if dlg.exec(): pass def export(self, obj, output_filepath: str | Path | None = None): - # if not hasattr(self, "template_file"): - # logger.error(f"Export not implemented for {self.__class__.__name__}") - # return - # pyd = self.to_pydantic() - # if not output_filepath: - # from frontend import select_save_file - # output_filepath = select_save_file(obj=obj, default_name=pyd.construct_filename(), extension="xlsx") - # Writer = getattr(writers, f"{self.__class__.__name__}Writer") - # writer = Writer(output_filepath=output_filepath, pydant_obj=pyd, range_dict=self.range_dict) - # workbook = writer from backend import managers Manager = getattr(managers, f"Default{self.__class__.__name__}") manager = Manager(parent=obj, input_object=self) diff --git a/src/submissions/backend/db/models/audit.py b/src/submissions/backend/db/models/audit.py index d31ccca..e24bdf1 100644 --- a/src/submissions/backend/db/models/audit.py +++ b/src/submissions/backend/db/models/audit.py @@ -18,10 +18,10 @@ class AuditLog(Base): __tablename__ = "_auditlog" id = Column(INTEGER, primary_key=True, autoincrement=True) #: primary key - user = Column(String(64)) - time = Column(TIMESTAMP) - object = Column(String(64)) - changes = Column(JSON) + user = Column(String(64)) #: The user who made the change + time = Column(TIMESTAMP) #: When the change was made + object = Column(String(64)) #: What was changed + changes = Column(JSON) #: List of changes that were made def __repr__(self): return f"<{self.object}: {self.user} @ {self.time}>" diff --git a/src/submissions/backend/db/models/controls.py b/src/submissions/backend/db/models/controls.py index 923b6ee..919a2c1 100644 --- a/src/submissions/backend/db/models/controls.py +++ b/src/submissions/backend/db/models/controls.py @@ -2,17 +2,13 @@ All control related models. """ from __future__ import annotations -import itertools from pprint import pformat -from PyQt6.QtWidgets import QWidget, QCheckBox, QLabel -from pandas import DataFrame -from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, case, FLOAT -from sqlalchemy.orm import relationship, Query, validates +from PyQt6.QtWidgets import QWidget +from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, case +from sqlalchemy.orm import relationship, Query import logging, re -from operator import itemgetter from . import BaseClass -from tools import setup_lookup, report_result, Result, Report, Settings, get_unique_values_in_df_column, super_splitter, \ - flatten_list, timer +from tools import setup_lookup, Report, Settings, super_splitter from datetime import date, datetime, timedelta from typing import List, Literal, Tuple, Generator from re import Pattern @@ -131,16 +127,6 @@ class Control(BaseClass): procedure = relationship("Procedure", back_populates="control", foreign_keys=[procedure_id]) #: parent procedure - # __mapper_args__ = { - # "polymorphic_identity": "Basic Control", - # "polymorphic_on": case( - # (controltype_name == "PCR Control", "PCR Control"), - # (controltype_name == "Irida Control", "Irida Control"), - # else_="Basic Control" - # ), - # "with_polymorphic": "*", - # } - def __repr__(self) -> str: return f"<{self.controltype_name}({self.name})>" @@ -282,450 +268,3 @@ class Control(BaseClass): def delete(self): self.__database_session__.delete(self) self.__database_session__.commit() - - -# class PCRControl(Control): -# """ -# Class made to hold info from Design & Analysis software. -# """ -# -# id = Column(INTEGER, ForeignKey('_control.id'), primary_key=True) -# subtype = Column(String(16)) #: PC or NC -# target = Column(String(16)) #: N1, N2, etc. -# ct = Column(FLOAT) #: PCR result -# reagent_lot = Column(String(64), ForeignKey("_reagent.lot", ondelete="SET NULL", -# name="fk_reagent_lot")) -# reagent = relationship("Reagent", foreign_keys=reagent_lot) #: reagent used for this control -# -# __mapper_args__ = dict(polymorphic_identity="PCR Control", -# polymorphic_load="inline", -# inherit_condition=(id == Control.id)) -# -# def to_sub_dict(self) -> dict: -# """ -# Creates dictionary of fields for this object. -# -# Returns: -# dict: Output dict of name, ct, subtype, target, reagent_lot and submitted_date -# """ -# return dict( -# name=self.name, -# ct=self.ct, -# subtype=self.subtype, -# target=self.target, -# reagent_lot=self.reagent_lot, -# submitted_date=self.submitted_date.date() -# ) -# -# @classmethod -# @report_result -# def make_chart(cls, parent, chart_settings: dict, ctx: Settings) -> Tuple[Report, "PCRFigure"]: -# """ -# Creates a PCRFigure. Overrides parent -# -# Args: -# parent (__type__): Widget to contain the chart. -# chart_settings (dict): settings passed down from chart widget -# ctx (Settings): settings passed down from gui. Not used here. -# -# Returns: -# Tuple[Report, "PCRFigure"]: Report of status and resulting figure. -# """ -# from frontend.visualizations.pcr_charts import PCRFigure -# parent.mode_typer.clear() -# parent.mode_typer.setEnabled(False) -# report = Report() -# control = cls.query(proceduretype=chart_settings['submissiontype'], start_date=chart_settings['start_date'], -# end_date=chart_settings['end_date']) -# data = [control.to_sub_dict() for control in control] -# df = DataFrame.from_records(data) -# # NOTE: Get all PCR control with ct over 0 -# try: -# df = df[df.ct > 0.0] -# except AttributeError: -# df = df -# fig = PCRFigure(df=df, modes=[], settings=chart_settings) -# return report, fig -# -# def to_pydantic(self): -# from backend.validators import PydPCRControl -# return PydPCRControl(**self.to_sub_dict(), -# controltype_name=self.controltype_name, -# clientsubmission_id=self.clientsubmission_id) -# -# -# class IridaControl(Control): -# subtyping_allowed = ['kraken'] -# -# id = Column(INTEGER, ForeignKey('_control.id'), primary_key=True) -# contains = Column(JSON) #: unstructured hashes in contains.tsv for each organism -# matches = Column(JSON) #: unstructured hashes in matches.tsv for each organism -# kraken = Column(JSON) #: unstructured output from kraken_report -# subtype = Column(String(16), nullable=False) #: EN-NOS, MCS-NOS, etc -# refseq_version = Column(String(16)) #: version of refseq used in fastq parsing -# kraken2_version = Column(String(16)) #: version of kraken2 used in fastq parsing -# kraken2_db_version = Column(String(32)) #: folder name of kraken2 db -# sample_id = Column(INTEGER, -# ForeignKey("_basicsample.id", ondelete="SET NULL", name="cont_BCS_id")) #: sample id key -# -# __mapper_args__ = dict(polymorphic_identity="Irida Control", -# polymorphic_load="inline", -# inherit_condition=(id == Control.id)) -# -# @property -# def targets(self): -# if self.controltype.targets: -# return list(itertools.chain.from_iterable([value for key, value in self.controltype.targets.items() -# if key == self.subtype])) -# else: -# return ["None"] -# -# @validates("subtype") -# def enforce_subtype_literals(self, key: str, value: str) -> str: -# """ -# Validates submissiontype field with acceptable values -# -# Args: -# key (str): Field name -# value (str): Field Value -# -# Raises: -# KeyError: Raised if value is not in the acceptable list. -# -# Returns: -# str: Validated string. -# """ -# acceptables = ['ATCC49226', 'ATCC49619', 'EN-NOS', "EN-SSTI", "MCS-NOS", "MCS-SSTI", "SN-NOS", "SN-SSTI"] -# if value.upper() not in acceptables: -# raise KeyError(f"Sub-type must be in {acceptables}") -# return value -# -# def to_sub_dict(self) -> dict: -# """ -# Converts object into convenient dictionary for use in procedure summary -# -# Returns: -# dict: output dictionary containing: Name, Type, Targets, Top Kraken results -# """ -# try: -# kraken = self.kraken -# except TypeError: -# kraken = {} -# try: -# kraken_cnt_total = sum([item['kraken_count'] for item in kraken.values()]) -# except AttributeError: -# kraken_cnt_total = 0 -# try: -# new_kraken = [dict(name=key, kraken_count=value['kraken_count'], -# kraken_percent=f"{value['kraken_count'] / kraken_cnt_total:0.2%}", -# target=key in self.controltype.targets) -# for key, value in kraken.items()] -# new_kraken = sorted(new_kraken, key=itemgetter('kraken_count'), reverse=True)[0:10] -# except (AttributeError, ZeroDivisionError): -# new_kraken = [] -# output = dict( -# name=self.name, -# type=self.controltype.name, -# targets=", ".join(self.targets), -# kraken=new_kraken -# ) -# return output -# -# def convert_by_mode(self, control_sub_type: str, mode: Literal['kraken', 'matches', 'contains'], -# consolidate: bool = False) -> Generator[dict, None, None]: -# """ -# split this instance into analysis types ('kraken', 'matches', 'contains') for control graphs -# -# Args: -# consolidate (bool): whether to merge all off-target genera. Defaults to False -# control_sub_type (str): control subtype, 'MCS-NOS', etc. -# mode (Literal['kraken', 'matches', 'contains']): analysis type, 'contains', etc. -# -# Returns: -# List[dict]: list of records -# """ -# try: -# data = self.__getattribute__(mode) -# except TypeError: -# data = {} -# if data is None: -# data = {} -# # NOTE: Data truncation and consolidation. -# if "kraken" in mode: -# data = {k: v for k, v in sorted(data.items(), key=lambda d: d[1][f"{mode}_count"], reverse=True)[:50]} -# else: -# if consolidate: -# on_tar = {k: v for k, v in data.items() if k.strip("*") in self.controltype.targets[control_sub_type]} -# off_tar = sum(v[f'{mode}_ratio'] for k, v in data.items() if -# k.strip("*") not in self.controltype.targets[control_sub_type]) -# on_tar['Off-target'] = {f"{mode}_ratio": off_tar} -# data = on_tar -# for genus in data: -# _dict = dict( -# name=self.name, -# submitted_date=self.submitted_date, -# genus=genus, -# target='Target' if genus.strip("*") in self.controltype.targets[control_sub_type] else "Off-target" -# ) -# for key in data[genus]: -# _dict[key] = data[genus][key] -# yield _dict -# -# @classproperty -# def modes(cls) -> List[str]: -# """ -# Get all control modes from database -# -# Returns: -# List[str]: List of control mode names. -# """ -# try: -# cols = [item.name for item in list(cls.__table__.columns) if isinstance(item.type, JSON)] -# except AttributeError as e: -# logger.error(f"Failed to get available modes from db: {e}") -# cols = [] -# return cols -# -# @classmethod -# def make_parent_buttons(cls, parent: QWidget) -> None: -# """ -# Creates buttons for controlling -# -# Args: -# parent (QWidget): chart holding widget to add buttons to. -# -# """ -# super().make_parent_buttons(parent=parent) -# rows = parent.layout.rowCount() - 2 -# # NOTE: check box for consolidating off-target items -# checker = QCheckBox(parent) -# checker.setChecked(True) -# checker.setObjectName("irida_check") -# checker.setToolTip("Pools off-target genera to save time.") -# parent.layout.addWidget(QLabel("Consolidate Off-targets"), rows, 0, 1, 1) -# parent.layout.addWidget(checker, rows, 1, 1, 2) -# checker.checkStateChanged.connect(parent.update_data) -# -# @classmethod -# @report_result -# def make_chart(cls, chart_settings: dict, parent, ctx) -> Tuple[Report, "IridaFigure" | None]: -# """ -# Creates a IridaFigure. Overrides parent -# -# Args: -# parent (__type__): Widget to contain the chart. -# chart_settings (dict): settings passed down from chart widget -# ctx (Settings): settings passed down from gui. -# -# Returns: -# Tuple[Report, "IridaFigure"]: Report of status and resulting figure. -# """ -# from frontend.visualizations import IridaFigure -# try: -# checker = parent.findChild(QCheckBox, name="irida_check") -# if chart_settings['mode'] == "kraken": -# checker.setEnabled(False) -# checker.setChecked(False) -# else: -# checker.setEnabled(True) -# consolidate = checker.isChecked() -# except AttributeError: -# consolidate = False -# report = Report() -# control = cls.query(subtype=chart_settings['submissiontype'], start_date=chart_settings['start_date'], -# end_date=chart_settings['end_date']) -# if not control: -# report.add_result(Result(status="Critical", msg="No control found in given date range.")) -# return report, None -# # NOTE: change each control to list of dictionaries -# data = [control.convert_by_mode(control_sub_type=chart_settings['submissiontype'], mode=chart_settings['mode'], -# consolidate=consolidate) for -# control in control] -# # NOTE: flatten data to one dimensional list -# # data = [item for sublist in data for item in sublist] -# data = flatten_list(data) -# if not data: -# report.add_result(Result(status="Critical", msg="No data found for control in given date range.")) -# return report, None -# df = cls.convert_data_list_to_df(input_df=data, sub_mode=chart_settings['sub_mode']) -# if chart_settings['sub_mode'] is None: -# title = chart_settings['sub_mode'] -# else: -# title = f"{chart_settings['mode']} - {chart_settings['sub_mode']}" -# # NOTE: send dataframe to chart maker -# df, modes = cls.prep_df(ctx=ctx, df=df) -# fig = IridaFigure(df=df, ytitle=title, modes=modes, parent=parent, -# settings=chart_settings) -# return report, fig -# -# @classmethod -# def convert_data_list_to_df(cls, input_df: list[dict], sub_mode) -> DataFrame: -# """ -# Convert list of control records to dataframe -# -# Args: -# input_df (list[dict]): list of dictionaries containing records -# sub_mode (str | None, optional): submissiontype of procedure type. Defaults to None. -# -# Returns: -# DataFrame: dataframe of control -# """ -# df = DataFrame.from_records(input_df) -# safe = ['name', 'submitted_date', 'genus', 'target'] -# for column in df.columns: -# if column not in safe: -# if sub_mode is not None and column != sub_mode: -# continue -# else: -# safe.append(column) -# if "percent" in column: -# try: -# count_col = next(item for item in df.columns if "count" in item) -# except StopIteration: -# continue -# # NOTE: The actual percentage from kraken was off due to exclusion of NaN, recalculating. -# df[column] = 100 * df[count_col] / df.groupby('name')[count_col].transform('sum') -# df = df[[c for c in df.columns if c in safe]] -# # NOTE: move date of sample submitted on same date as previous ahead one. -# df = cls.displace_date(df=df) -# # NOTE: ad hoc method to make data labels more accurate. -# df = cls.df_column_renamer(df=df) -# return df -# -# @classmethod -# def df_column_renamer(cls, df: DataFrame) -> DataFrame: -# """ -# Ad hoc function I created to clarify some fields -# -# Args: -# df (DataFrame): input dataframe -# -# Returns: -# DataFrame: dataframe with 'clarified' column names -# """ -# df = df[df.columns.drop(list(df.filter(regex='_hashes')))] -# return df.rename(columns={ -# "contains_ratio": "contains_shared_hashes_ratio", -# "matches_ratio": "matches_shared_hashes_ratio", -# "kraken_count": "kraken2_read_count_(top_50)", -# "kraken_percent": "kraken2_read_percent_(top_50)" -# }) -# -# @classmethod -# def displace_date(cls, df: DataFrame) -> DataFrame: -# """ -# This function serves to split sample that were submitted on the same date by incrementing dates. -# It will shift the date forward by one day if it is the same day as an existing date in a list. -# -# Args: -# df (DataFrame): input dataframe composed of control records -# -# Returns: -# DataFrame: output dataframe with dates incremented. -# """ -# # NOTE: get submitted dates for each control -# dict_list = [dict(name=item, date=df[df.name == item].iloc[0]['submitted_date']) for item in -# sorted(df['name'].unique())] -# previous_dates = set() -# for item in dict_list: -# df, previous_dates = cls.check_date(df=df, item=item, previous_dates=previous_dates) -# return df -# -# @classmethod -# def check_date(cls, df: DataFrame, item: dict, previous_dates: set) -> Tuple[DataFrame, list]: -# """ -# Checks if an items date is already present in df and adjusts df accordingly -# -# Args: -# df (DataFrame): input dataframe -# item (dict): control for checking -# previous_dates (list): list of dates found in previous control -# -# Returns: -# Tuple[DataFrame, list]: Output dataframe and appended list of previous dates -# """ -# try: -# check = item['date'] in previous_dates -# except IndexError: -# check = False -# previous_dates.add(item['date']) -# if check: -# # NOTE: get df locations where name == item name -# mask = df['name'] == item['name'] -# # NOTE: increment date in dataframe -# df.loc[mask, 'submitted_date'] = df.loc[mask, 'submitted_date'].apply(lambda x: x + timedelta(days=1)) -# item['date'] += timedelta(days=1) -# passed = False -# else: -# passed = True -# # NOTE: if procedure didn't lead to changed date, return values -# if passed: -# return df, previous_dates -# # NOTE: if date was changed, rerun with new date -# else: -# logger.warning(f"Date check failed, running recursion.") -# df, previous_dates = cls.check_date(df, item, previous_dates) -# return df, previous_dates -# -# @classmethod -# def prep_df(cls, ctx: Settings, df: DataFrame) -> Tuple[DataFrame | None, list]: -# """ -# Constructs figures based on parsed pandas dataframe. -# -# Args: -# ctx (Settings): settings passed down from gui -# df (pd.DataFrame): input dataframe -# ytitle (str | None, optional): title for the y-axis. Defaults to None. -# -# Returns: -# Figure: Plotly figure -# """ -# # NOTE: converts starred genera to normal and splits off list of starred -# if df.empty: -# return None, [] -# df['genus'] = df['genus'].replace({'\*': ''}, regex=True).replace({"NaN": "Unknown"}) -# df['genera'] = [item[-1] if item and item[-1] == "*" else "" for item in df['genus'].to_list()] -# # NOTE: remove original run, using reruns if applicable -# df = cls.drop_reruns_from_df(ctx=ctx, df=df) -# # NOTE: sort by and exclude from -# sorts = ['submitted_date', "target", "genus"] -# exclude = ['name', 'genera'] -# modes = [item for item in df.columns if item not in sorts and item not in exclude] -# # NOTE: Set descending for any columns that have "{mode}" in the header. -# ascending = [False if item == "target" else True for item in sorts] -# df = df.sort_values(by=sorts, ascending=ascending) -# # NOTE: actual chart construction is done by -# return df, modes -# -# @classmethod -# def drop_reruns_from_df(cls, ctx: Settings, df: DataFrame) -> DataFrame: -# """ -# Removes semi-duplicates from dataframe after finding sequencing repeats. -# -# Args: -# ctx (Settings): settings passed from gui -# df (DataFrame): initial dataframe -# -# Returns: -# DataFrame: dataframe with originals removed in favour of repeats. -# """ -# if 'rerun_regex' in ctx.model_extra: -# sample_names = get_unique_values_in_df_column(df, column_name="name") -# rerun_regex = re.compile(fr"{ctx.rerun_regex}") -# exclude = [re.sub(rerun_regex, "", sample) for sample in sample_names if rerun_regex.search(sample)] -# df = df[~df.name.isin(exclude)] -# return df -# -# def to_pydantic(self) -> "PydIridaControl": -# """ -# Constructs a pydantic version of this object. -# -# Returns: -# PydIridaControl: This object as a pydantic model. -# """ -# from backend.validators import PydIridaControl -# return PydIridaControl(**self.__dict__) -# -# @property -# def is_positive_control(self): -# return not self.subtype.lower().startswith("en") diff --git a/src/submissions/backend/db/models/organizations.py b/src/submissions/backend/db/models/organizations.py index e72dae8..032d8b4 100644 --- a/src/submissions/backend/db/models/organizations.py +++ b/src/submissions/backend/db/models/organizations.py @@ -3,14 +3,11 @@ All client organization related models. ''' from __future__ import annotations import logging -from pathlib import Path -from pprint import pformat from sqlalchemy import Column, String, INTEGER, ForeignKey, Table -from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import relationship, Query from . import Base, BaseClass from tools import check_authorization, setup_lookup -from typing import List, Tuple +from typing import List logger = logging.getLogger(f"submissions.{__name__}") @@ -31,7 +28,7 @@ class ClientLab(BaseClass): id = Column(INTEGER, primary_key=True) #: primary key name = Column(String(64)) #: clientlab name - clientsubmission = relationship("ClientSubmission", back_populates="clientlab") #: procedure this clientlab has submitted + clientsubmission = relationship("ClientSubmission", back_populates="clientlab") #: submission this clientlab has submitted cost_centre = Column(String()) #: cost centre used by org for payment contact = relationship("Contact", back_populates="clientlab", secondary=clientlab_contact) #: contact involved with this org @@ -47,6 +44,7 @@ class ClientLab(BaseClass): Lookup clientlabs in the database by a number of parameters. Args: + id (int | None, optional): id integer of the clientlab. Defaults to None. name (str | None, optional): Name of the clientlab. Defaults to None. limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. @@ -104,20 +102,6 @@ class Contact(BaseClass): def searchables(cls): return [] - # @classmethod - # def query_or_create(cls, **kwargs) -> Tuple[Contact, bool]: - # new = False - # disallowed = [] - # sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed} - # instance = cls.query(**sanitized_kwargs) - # if not instance or isinstance(instance, list): - # instance = cls() - # new = True - # for k, v in sanitized_kwargs.items(): - # setattr(instance, k, v) - # logger.info(f"Instance from contact query or create: {instance}") - # return instance, new - @classmethod @setup_lookup def query(cls, @@ -131,6 +115,7 @@ class Contact(BaseClass): Lookup contact in the database by a number of parameters. Args: + id (int | None, optional): id integer of the contact. Defaults to None. name (str | None, optional): Name of the contact. Defaults to None. email (str | None, optional): Email of the contact. Defaults to None. phone (str | None, optional): Phone number of the contact. Defaults to None. diff --git a/src/submissions/backend/db/models/procedures.py b/src/submissions/backend/db/models/procedures.py index edb3611..40332f2 100644 --- a/src/submissions/backend/db/models/procedures.py +++ b/src/submissions/backend/db/models/procedures.py @@ -2,13 +2,10 @@ All kittype and reagent related models """ from __future__ import annotations - -import sys -import zipfile, logging, re +import zipfile, logging, re, numpy as np from operator import itemgetter from pprint import pformat -import numpy as np -from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB, func +from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, func from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import relationship, validates, Query from sqlalchemy.ext.associationproxy import association_proxy @@ -16,17 +13,13 @@ from datetime import date, datetime, timedelta from tools import check_authorization, setup_lookup, Report, Result, check_regex_match, timezone, \ jinja_template_loading, flatten_list from typing import List, Literal, Generator, Any, Tuple, TYPE_CHECKING -from pandas import ExcelFile -from pathlib import Path from . import Base, BaseClass, ClientLab, LogMixin -from io import BytesIO -from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError, StatementError, \ - ArgumentError +from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError if TYPE_CHECKING: from backend.db.models.submissions import Run, ProcedureSampleAssociation - from backend.validators.pydant import PydSample, PydResults + from backend.validators.pydant import PydSample logger = logging.getLogger(f'submissions.{__name__}') @@ -38,14 +31,6 @@ reagentrole_reagent = Table( extend_existing=True ) -# equipmentrole_equipment = Table( -# "_equipmentrole_equipment", -# Base.metadata, -# Column("equipment_id", INTEGER, ForeignKey("_equipment.id")), -# Column("equipmentrole_id", INTEGER, ForeignKey("_equipmentrole.id")), -# extend_existing=True -# ) - equipment_process = Table( "_equipment_process", Base.metadata, @@ -54,30 +39,6 @@ equipment_process = Table( extend_existing=True ) -# equipmentrole_process = Table( -# "_equipmentrole_process", -# Base.metadata, -# Column("process_id", INTEGER, ForeignKey("_process.id")), -# Column("equipmentrole_id", INTEGER, ForeignKey("_equipmentrole.id")), -# extend_existing=True -# ) - -# kittype_process = Table( -# "_kittype_process", -# Base.metadata, -# Column("process_id", INTEGER, ForeignKey("_process.id")), -# Column("kittype_id", INTEGER, ForeignKey("_kittype.id")), -# extend_existing=True -# ) - -# tiprole_tips = Table( -# "_tiprole_tips", -# Base.metadata, -# Column("tiprole_id", INTEGER, ForeignKey("_tiprole.id")), -# Column("tips_id", INTEGER, ForeignKey("_tips.id")), -# extend_existing=True -# ) - process_tips = Table( "_process_tips", Base.metadata, @@ -86,30 +47,6 @@ process_tips = Table( extend_existing=True ) -# equipment_tips = Table( -# "_equipment_tips", -# Base.metadata, -# Column("equipment_id", INTEGER, ForeignKey("_equipment.id")), -# Column("tips_id", INTEGER, ForeignKey("_tips.id")), -# extend_existing=True -# ) - -# kittype_procedure = Table( -# "_kittype_procedure", -# Base.metadata, -# Column("procedure_id", INTEGER, ForeignKey("_procedure.id")), -# Column("kittype_id", INTEGER, ForeignKey("_kittype.id")), -# extend_existing=True -# ) - -# proceduretype_process = Table( -# "_proceduretype_process", -# Base.metadata, -# Column("process_id", INTEGER, ForeignKey("_process.id")), -# Column("proceduretype_id", INTEGER, ForeignKey("_proceduretype.id")), -# extend_existing=True -# ) - submissiontype_proceduretype = Table( "_submissiontype_proceduretype", Base.metadata, @@ -119,335 +56,6 @@ submissiontype_proceduretype = Table( ) -# class KitType(BaseClass): -# """ -# Base of kits used in procedure processing -# """ -# -# omni_sort = BaseClass.omni_sort + ["kittypesubmissiontypeassociations", "kittypereagentroleassociation", -# "process"] -# -# id = Column(INTEGER, primary_key=True) #: primary key -# name = Column(String(64), unique=True) #: name of kittype -# procedure = relationship("Procedure", back_populates="kittype", -# secondary=kittype_procedure) #: run this kittype was used for -# process = relationship("Process", back_populates="kittype", -# secondary=kittype_process) #: equipment process used by this kittype -# -# proceduretypeequipmentroleassociation = relationship("ProcedureTypeEquipmentRoleAssociation", back_populates="kittype", -# cascade="all, delete-orphan",) -# -# equipmentrole = association_proxy("proceduretypeequipmentroleassociation", "equipmentrole") -# -# kittypereagentroleassociation = relationship( -# "KitTypeReagentRoleAssociation", -# back_populates="kittype", -# cascade="all, delete-orphan", -# ) -# -# # NOTE: creator function: https://stackoverflow.com/questions/11091491/keyerror-when-adding-objects-to-sqlalchemy-association-object/11116291#11116291 -# reagentrole = association_proxy("kittypereagentroleassociation", "reagentrole", -# creator=lambda RT: KitTypeReagentRoleAssociation( -# reagentrole=RT)) #: Association proxy to KitTypeReagentRoleAssociation -# -# kittypeproceduretypeassociation = relationship( -# "ProcedureTypeKitTypeAssociation", -# back_populates="kittype", -# cascade="all, delete-orphan", -# ) #: Relation to SubmissionType -# -# proceduretype = association_proxy("kittypeproceduretypeassociation", "proceduretype", -# creator=lambda PT: ProcedureTypeKitTypeAssociation( -# proceduretype=PT)) #: Association proxy to SubmissionTypeKitTypeAssociation -# -# -# -# @classproperty -# def aliases(cls) -> List[str]: -# """ -# Gets other names the sql object of this class might go by. -# -# Returns: -# List[str]: List of names -# """ -# return super().aliases + [cls.query_alias, "kittype", "kittype"] -# -# def get_reagents(self, -# required_only: bool = False, -# proceduretype: str | ProcedureType | None = None -# ) -> Generator[ReagentRole, None, None]: -# """ -# Return ReagentTypes linked to kittype through KitTypeReagentTypeAssociation. -# -# Args: -# required_only (bool, optional): If true only return required types. Defaults to False. -# proceduretype (str | Submissiontype | None, optional): Submission type to narrow results. Defaults to None. -# -# Returns: -# Generator[ReagentRole, None, None]: List of reagent roles linked to this kittype. -# """ -# match proceduretype: -# case ProcedureType(): -# relevant_associations = [assoc for assoc in self.kittypereagentroleassociation if -# assoc.proceduretype == proceduretype] -# case str(): -# relevant_associations = [assoc for assoc in self.kittypereagentroleassociation if -# assoc.proceduretype.name == proceduretype] -# case _: -# relevant_associations = [assoc for assoc in self.kittypereagentroleassociation] -# if required_only: -# return (assoc.reagentrole for assoc in relevant_associations if assoc.required == 1) -# else: -# return (assoc.reagentrole for assoc in relevant_associations) -# -# def get_equipmentroles(self, proceduretype: str| ProcedureType | None = None) -> Generator[ReagentRole, None, None]: -# match proceduretype: -# case ProcedureType(): -# relevant_associations = [item for item in self.proceduretypeequipmentroleassociation if -# item.proceduretype == proceduretype] -# case str(): -# relevant_associations = [item for item in self.proceduretypeequipmentroleassociation if -# item.proceduretype.name == proceduretype] -# case _: -# relevant_associations = [item for item in self.proceduretypeequipmentroleassociation] -# return (assoc.equipmentrole for assoc in relevant_associations) -# -# -# def construct_xl_map_for_use(self, proceduretype: str | SubmissionType) -> Tuple[dict | None, KitType]: -# """ -# Creates map of locations in Excel workbook for a SubmissionType -# -# Args: -# proceduretype (str | SubmissionType): Submissiontype.name -# -# Returns: -# Generator[(str, str), None, None]: Tuple containing information locations. -# """ -# new_kit = self -# # NOTE: Account for proceduretype variable type. -# match proceduretype: -# case str(): -# # logger.debug(f"Query for {proceduretype}") -# proceduretype = ProcedureType.query(name=proceduretype) -# case SubmissionType(): -# pass -# case _: -# raise ValueError(f"Wrong variable type: {type(proceduretype)} used!") -# # logger.debug(f"Submission type: {proceduretype}, Kit: {self}") -# assocs = [item for item in self.kittypereagentroleassociation if item.proceduretype == proceduretype] -# # logger.debug(f"Associations: {assocs}") -# # NOTE: rescue with procedure type's default kittype. -# if not assocs: -# logger.error( -# f"No associations found with {self}. Attempting rescue with default kittype: {proceduretype.default_kit}") -# new_kit = proceduretype.default_kit -# if not new_kit: -# from frontend.widgets.pop_ups import ObjectSelector -# dlg = ObjectSelector( -# title="Select Kit", -# message="Could not find reagents for this procedure type/kittype type combo.\nSelect new kittype.", -# obj_type=self.__class__, -# values=[kit.name for kit in proceduretype.kittype] -# ) -# if dlg.exec(): -# dlg_result = dlg.parse_form() -# # logger.debug(f"Dialog result: {dlg_result}") -# new_kit = self.__class__.query(name=dlg_result) -# # logger.debug(f"Query result: {new_kit}") -# else: -# return None, new_kit -# assocs = [item for item in new_kit.kittypereagentroleassociation if item.proceduretype == proceduretype] -# output = {assoc.reagentrole.name: assoc.uses for assoc in assocs} -# # logger.debug(f"Output: {output}") -# return output, new_kit -# -# @classmethod -# def query_or_create(cls, **kwargs) -> Tuple[KitType, bool]: -# from backend.validators.pydant import PydKitType -# new = False -# disallowed = ['expiry'] -# sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed} -# instance = cls.query(**sanitized_kwargs) -# if not instance or isinstance(instance, list): -# instance = PydKitType(**kwargs) -# new = True -# instance = instance.to_sql() -# logger.info(f"Instance from query or create: {instance}") -# return instance, new -# -# @classmethod -# @setup_lookup -# def query(cls, -# name: str = None, -# proceduretype: str | ProcedureType | None = None, -# id: int | None = None, -# limit: int = 0, -# **kwargs -# ) -> KitType | List[KitType]: -# """ -# Lookup a list of or single KitType. -# -# Args: -# name (str, optional): Name of desired kittype (returns single instance). Defaults to None. -# proceduretype (str | ProcedureType | None, optional): Submission type the kittype is used for. Defaults to None. -# id (int | None, optional): Kit id in the database. Defaults to None. -# limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. -# -# Returns: -# KitType|List[KitType]: KitType(s) of interest. -# """ -# query: Query = cls.__database_session__.query(cls) -# match proceduretype: -# case str(): -# query = query.filter(cls.proceduretype.any(name=proceduretype)) -# case ProcedureType(): -# query = query.filter(cls.proceduretype.contains(proceduretype)) -# case _: -# pass -# match name: -# case str(): -# query = query.filter(cls.name == name) -# limit = 1 -# case _: -# pass -# match id: -# case int(): -# query = query.filter(cls.id == id) -# limit = 1 -# case str(): -# query = query.filter(cls.id == int(id)) -# limit = 1 -# case _: -# pass -# return cls.execute_query(query=query, limit=limit, **kwargs) -# -# @check_authorization -# def save(self): -# super().save() - -# def to_export_dict(self, proceduretype: SubmissionType) -> dict: -# """ -# Creates dictionary for exporting to yml used in new SubmissionType Construction -# -# Args: -# proceduretype (SubmissionType): SubmissionType of interest. -# -# Returns: -# dict: Dictionary containing relevant info for SubmissionType construction -# """ -# base_dict = dict(name=self.name, reagent_roles=[], equipmentrole=[]) -# for key, value in self.construct_xl_map_for_use(proceduretype=proceduretype): -# try: -# assoc = next(item for item in self.kit_reagentrole_associations if item.reagentrole.name == key) -# except StopIteration as e: -# continue -# for kk, vv in assoc.to_export_dict().items(): -# value[kk] = vv -# base_dict['reagent_roles'].append(value) -# for key, value in proceduretype.construct_field_map("equipment"): -# try: -# assoc = next(item for item in proceduretype.proceduretypeequipmentroleassociation if -# item.equipmentrole.name == key) -# except StopIteration: -# continue -# for kk, vv in assoc.to_export_dict(kittype=self).items(): -# value[kk] = vv -# base_dict['equipmentrole'].append(value) -# return base_dict - -# @classmethod -# def import_from_yml(cls, proceduretype: str | SubmissionType, filepath: Path | str | None = None, -# import_dict: dict | None = None) -> KitType: -# if isinstance(proceduretype, str): -# proceduretype = SubmissionType.query(name=proceduretype) -# if filepath: -# yaml.add_constructor("!regex", yaml_regex_creator) -# if isinstance(filepath, str): -# filepath = Path(filepath) -# if not filepath.exists(): -# logging.critical(f"Given file could not be found.") -# return None -# with open(filepath, "r") as f: -# if filepath.suffix == ".json": -# import_dict = json.load(fp=f) -# elif filepath.suffix == ".yml": -# import_dict = yaml.load(stream=f, Loader=yaml.Loader) -# else: -# raise Exception(f"Filetype {filepath.suffix} not supported.") -# new_kit = KitType.query(name=import_dict['kittype']['name']) -# if not new_kit: -# new_kit = KitType(name=import_dict['kittype']['name']) -# for reagentrole in import_dict['kittype']['reagent_roles']: -# new_role = ReagentRole.query(name=reagentrole['reagentrole']) -# if new_role: -# check = input(f"Found existing reagentrole: {new_role.name}. Use this? [Y/n]: ") -# if check.lower() == "n": -# new_role = None -# else: -# pass -# if not new_role: -# eol = timedelta(reagentrole['extension_of_life']) -# new_role = ReagentRole(name=reagentrole['reagentrole'], eol_ext=eol) -# uses = dict(expiry=reagentrole['expiry'], lot=reagentrole['lot'], name=reagentrole['name'], sheet=reagentrole['sheet']) -# ktrr_assoc = KitTypeReagentRoleAssociation(kittype=new_kit, reagentrole=new_role, uses=uses) -# ktrr_assoc.proceduretype = proceduretype -# ktrr_assoc.required = reagentrole['required'] -# ktst_assoc = SubmissionTypeKitTypeAssociation( -# kittype=new_kit, -# proceduretype=proceduretype, -# mutable_cost_sample=import_dict['mutable_cost_sample'], -# mutable_cost_column=import_dict['mutable_cost_column'], -# constant_cost=import_dict['constant_cost'] -# ) -# for reagentrole in import_dict['kittype']['equipmentrole']: -# new_role = EquipmentRole.query(name=reagentrole['reagentrole']) -# if new_role: -# check = input(f"Found existing reagentrole: {new_role.name}. Use this? [Y/n]: ") -# if check.lower() == "n": -# new_role = None -# else: -# pass -# if not new_role: -# new_role = EquipmentRole(name=reagentrole['reagentrole']) -# for equipment in Equipment.assign_equipment(equipmentrole=new_role): -# new_role.control.append(equipment) -# ster_assoc = ProcedureTypeEquipmentRoleAssociation(proceduretype=proceduretype, -# equipmentrole=new_role) -# try: -# uses = dict(name=reagentrole['name'], process=reagentrole['process'], sheet=reagentrole['sheet'], -# static=reagentrole['static']) -# except KeyError: -# uses = None -# ster_assoc.uses = uses -# for process in reagentrole['process']: -# new_process = Process.query(name=process) -# if not new_process: -# new_process = Process(name=process) -# new_process.proceduretype.append(proceduretype) -# new_process.kittype.append(new_kit) -# new_process.equipmentrole.append(new_role) -# return new_kit - -# def to_omni(self, expand: bool = False) -> "OmniKitType": -# from backend.validators.omni_gui_objects import OmniKitType -# if expand: -# processes = [item.to_omni() for item in self.process] -# kittypereagentroleassociation = [item.to_omni() for item in self.kittypereagentroleassociation] -# kittypeproceduretypeassociation = [item.to_omni() for item in self.kittypeproceduretypeassociation] -# else: -# processes = [item.name for item in self.processes] -# kittypereagentroleassociation = [item.name for item in self.kittypereagentroleassociation] -# kittypeproceduretypeassociation = [item.name for item in self.kittypeproceduretypeassociation] -# data = dict( -# name=self.name, -# processes=processes, -# kit_reagentrole_associations=kittypereagentroleassociation, -# kit_submissiontype_associations=kittypeproceduretypeassociation -# ) -# # logger.debug(f"Creating omni for {pformat(data)}") -# return OmniKitType(instance_object=self, **data) - - class ReagentRole(BaseClass): """ Base of reagent type abstract @@ -458,13 +66,11 @@ class ReagentRole(BaseClass): name = Column(String(64)) #: name of reagentrole reagent plays reagent = relationship("Reagent", back_populates="reagentrole", secondary=reagentrole_reagent) #: concrete control of this reagent type - reagentroleproceduretypeassociation = relationship( "ProcedureTypeReagentRoleAssociation", back_populates="reagentrole", cascade="all, delete-orphan", ) #: Relation to KitTypeReagentTypeAssociation - # creator function: https://stackoverflow.com/questions/11091491/keyerror-when-adding-objects-to-sqlalchemy-association-object/11116291#11116291 proceduretype = association_proxy("reagentroleproceduretypeassociation", "proceduretype", creator=lambda proceduretype: ProcedureTypeReagentRoleAssociation( @@ -500,7 +106,7 @@ class ReagentRole(BaseClass): Args: id (id | None, optional): Id of the object. Defaults to None. name (str | None, optional): Reagent type name. Defaults to None. - kittype (KitType | str | None, optional): Kit the type of interest belongs to. Defaults to None. + proceduretype (ProcedureType | None, optional): Procedure the type of interest belongs to. Defaults to None. reagent (Reagent | str | None, optional): Concrete instance of the type of interest. Defaults to None. limit (int, optional): maxmimum number of results to return (0 = all). Defaults to 0. @@ -560,12 +166,10 @@ class ReagentRole(BaseClass): def to_omni(self, expand: bool = False): from backend.validators.omni_gui_objects import OmniReagentRole - logger.debug(f"Constructing OmniReagentRole with name {self.name}") return OmniReagentRole(instance_object=self, name=self.name, eol_ext=self.eol_ext) def get_reagents(self, proceduretype: str | ProcedureType | None = None): if not proceduretype: - # return [f"{reagent.name} - {reagent.lot} - {reagent.expiry}" for reagent in self.reagent] return [reagent.to_pydantic() for reagent in self.reagent] if isinstance(proceduretype, str): proceduretype = ProcedureType.query(name=proceduretype) @@ -578,12 +182,11 @@ class ReagentRole(BaseClass): last_used = None if last_used: reagents.insert(0, reagents.pop(reagents.index(last_used))) - # return [f"{reagent.name} - {reagent.lot} - {reagent.expiry}" for reagent in reagents] return [reagent.to_pydantic(reagentrole=self.name) for reagent in reagents] - def details_dict(self, **kwargs): - output = super().details_dict(**kwargs) - return output + # def details_dict(self, **kwargs): + # output = super().details_dict(**kwargs) + # return output class Reagent(BaseClass, LogMixin): @@ -593,12 +196,12 @@ class Reagent(BaseClass, LogMixin): id = Column(INTEGER, primary_key=True) #: primary key reagentrole = relationship("ReagentRole", back_populates="reagent", - secondary=reagentrole_reagent) #: joined parent reagent type + secondary=reagentrole_reagent) #: joined parent ReagentRole reagentrole_id = Column(INTEGER, ForeignKey("_reagentrole.id", ondelete='SET NULL', - name="fk_REG_reagent_role_id")) #: id of parent reagent type + name="fk_REG_reagent_role_id")) #: id of parent ReagentRole eol_ext = Column(Interval()) #: extension of life interval name = Column(String(64)) #: reagent name - cost_per_ml = Column(FLOAT) + cost_per_ml = Column(FLOAT(2)) #: amount a millilitre of reagent costs reagentlot = relationship("ReagentLot", back_populates="reagent") def __repr__(self): @@ -612,67 +215,17 @@ class Reagent(BaseClass, LogMixin): super().__init__(*args, **kwargs) self.name = name self.eol_ext = eol_ext - # for key, value in kwargs.items(): - # setattr(self, key, value) @classproperty def searchables(cls): return [dict(label="Lot", field="lot")] - # def to_sub_dict(self, kittype: KitType = None, full_data: bool = False, **kwargs) -> dict: - # """ - # dictionary containing values necessary for gui - # - # Args: - # kittype (KitType, optional): KitType to use to get reagent type. Defaults to None. - # full_data (bool, optional): Whether to include procedure in data for details. Defaults to False. - # - # Returns: - # dict: representation of the reagent's attributes - # """ - # if kittype is not None: - # # NOTE: Get the intersection of this reagent's ReagentType and all ReagentTypes in KitType - # reagent_role = next((item for item in set(self.reagentrole).intersection(kittype.reagentrole)), - # self.reagentrole[0]) - # else: - # try: - # reagent_role = self.reagentrole[0] - # except IndexError: - # reagent_role = None - # try: - # rtype = reagent_role.name.replace("_", " ") - # except AttributeError: - # rtype = "Unknown" - # # NOTE: Calculate expiry with EOL from ReagentType - # try: - # place_holder = self.expiry + reagent_role.eol_ext - # except (TypeError, AttributeError) as e: - # place_holder = date.today() - # logger.error(f"We got a type error setting {self.lot} expiry: {e}. setting to today for testing") - # # NOTE: The notation for not having an expiry is 1970.01.01 - # if self.expiry.year == 1970: - # place_holder = "NA" - # else: - # place_holder = place_holder.strftime("%Y-%m-%d") - # output = dict( - # name=self.name, - # reagentrole=rtype, - # lot=self.lot, - # expiry=place_holder, - # missing=False - # ) - # if full_data: - # output['procedure'] = [sub.rsl_plate_number for sub in self.procedures] - # output['excluded'] = ['missing', 'procedure', 'excluded', 'editable'] - # output['editable'] = ['lot', 'expiry'] - # return output - def update_last_used(self, proceduretype: ProcedureType) -> Report: """ Updates last used reagent lot for ReagentType/KitType Args: - kit (KitType): Kit this instance is used in. + proceduretype (ProcedureType): ProcedureType this instance is used in. Returns: Report: Result of operation @@ -690,25 +243,6 @@ class Reagent(BaseClass, LogMixin): report.add_result(Result(msg=f"Updating last used {rt} was not performed.", status="Information")) return report - # @classmethod - # def query_or_create(cls, **kwargs) -> Reagent: - # from backend.validators.pydant import PydReagent - # new = False - # disallowed = ['expiry'] - # sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed} - # instance = cls.query(**sanitized_kwargs) - # if not instance or isinstance(instance, list): - # if "reagentrole" not in kwargs: - # try: - # kwargs['reagentrole'] = kwargs['name'] - # except KeyError: - # pass - # instance = PydReagent(**kwargs) - # new = True - # instance = instance.to_sql() - # logger.info(f"Instance from query or create: {instance}") - # return instance, new - @classmethod @setup_lookup def query(cls, @@ -724,8 +258,8 @@ class Reagent(BaseClass, LogMixin): Args: id (int | None, optional): reagent id number - reagent_role (str | models.ReagentType | None, optional): Reagent type. Defaults to None. - lot_number (str | None, optional): Reagent lot number. Defaults to None. + reagentrole (str | models.ReagentType | None, optional): Reagent type. Defaults to None. + lot (str | None, optional): Reagent lot number. Defaults to None. name (str | None, optional): Reagent name. Defaults to None. limit (int, optional): limit of results returned. Defaults to 0. @@ -778,16 +312,8 @@ class Reagent(BaseClass, LogMixin): return case "comment": return - # case "expiry": - # if isinstance(value, str): - # value = date(year=1970, month=1, day=1) - # # NOTE: if min time is used, any reagent set to expire today (Bac postive control, eg) will have expired at midnight and therefore be flagged. - # # NOTE: Make expiry at date given, plus maximum time = end of day - # value = datetime.combine(value, datetime.max.time()) - # value = value.replace(tzinfo=timezone) case _: pass - logger.debug(f"Role to be set to: {value}") try: self.__setattr__(key, value) except AttributeError as e: @@ -796,7 +322,6 @@ class Reagent(BaseClass, LogMixin): @check_authorization def edit_from_search(self, obj, **kwargs): from frontend.widgets.omni_add_edit import AddEdit - # logger.debug(f"Calling edit_from_search for {self.name}") dlg = AddEdit(parent=None, instance=self) if dlg.exec(): pyd = dlg.parse_form() @@ -852,6 +377,18 @@ class ReagentLot(BaseClass): name: str | None = None, limit: int = 1, **kwargs) -> ReagentLot | List[ReagentLot]: + """ + + Args: + lot ( str | None, optional): Lot number of this reagent instance. Defaults to None. + name ( str | None, optional): Name of this reagent instance. Defaults to None. + limit ( int ): Limit of number of query results. + **kwargs (): + + Returns: + ReagentLot | List[ReagentLot] + + """ query: Query = cls.__database_session__.query(cls) match lot: case str(): @@ -890,9 +427,9 @@ class Discount(BaseClass): skip_on_edit = True id = Column(INTEGER, primary_key=True) #: primary key - proceduretype = relationship("ProcedureType") #: joined parent reagent type + proceduretype = relationship("ProcedureType") #: joined parent proceduretype proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id", ondelete='SET NULL', - name="fk_DIS_procedure_type_id")) #: id of joined kittype + name="fk_DIS_procedure_type_id")) #: id of joined proceduretype clientlab = relationship("ClientLab") #: joined client lab clientlab_id = Column(INTEGER, ForeignKey("_clientlab.id", ondelete='SET NULL', @@ -918,7 +455,7 @@ class Discount(BaseClass): Args: clientlab (models.ClientLab | str | int): ClientLab receiving discount. - kittype (models.KitType | str | int): Kit discount received on. + proceduretype (models.ProcedureType | str | int): Kit discount received on. Returns: models.Discount|List[models.Discount]: Discount(s) of interest. @@ -956,14 +493,11 @@ class SubmissionType(BaseClass): id = Column(INTEGER, primary_key=True) #: primary key name = Column(String(128), unique=True) #: name of procedure type - # info_map = Column(JSON) #: Where parsable information is found in the excel workbook corresponding to this type. defaults = Column(JSON) #: Basic information about this procedure type clientsubmission = relationship("ClientSubmission", - back_populates="submissiontype") #: Concrete control of this type. - # template_file = Column(BLOB) #: Blank form for this type stored as binary. - # sample_map = Column(JSON) #: Where sample information is found in the excel sheet corresponding to this type. + back_populates="submissiontype") #: Instances of this submission type proceduretype = relationship("ProcedureType", back_populates="submissiontype", - secondary=submissiontype_proceduretype) #: run this kittype was used for + secondary=submissiontype_proceduretype) #: Procedures associated with this submission type def __repr__(self) -> str: """ @@ -982,95 +516,21 @@ class SubmissionType(BaseClass): """ return super().aliases + ["submissiontypes"] - # @classproperty - # def omni_removes(cls): - # return super().omni_removes + ["defaults"] - # - # @classproperty - # def basic_template(cls) -> bytes: + # def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]: # """ - # Grabs the default excel template file. - # - # Returns: - # bytes: The Excel sheet. - # """ - # submission_type = cls.query(name="Bacterial Culture") - # return submission_type.template_file - # - # @property - # def template_file_sheets(self) -> List[str]: - # """ - # Gets names of sheet in the stored blank form. - # - # Returns: - # List[str]: List of sheet names - # """ - # try: - # return ExcelFile(BytesIO(self.template_file), engine="openpyxl").sheet_names - # except zipfile.BadZipfile: - # return [] - - # def set_template_file(self, filepath: Path | str): - # """ - # - # Sets the binary store to an Excel file. + # Make a map of all locations for tips or equipment. # # Args: - # filepath (Path | str): Path to the template file. - # - # Raises: - # ValueError: Raised if file is not Excel file. - # """ - # if isinstance(filepath, str): - # filepath = Path(filepath) - # try: - # ExcelFile(filepath) - # except ValueError: - # raise ValueError(f"File {filepath} is not of appropriate type.") - # with open(filepath, "rb") as f: - # data = f.read() - # self.template_file = data - # self.save() - # - # def construct_info_map(self, mode: Literal['read', 'write', 'export']) -> dict: - # """ - # Make of map of where all fields are located in Excel sheet - # - # Args: - # mode (Literal["read", "write"]): Which mode to get locations for + # field (Literal['equipment', 'tip']): the field to construct a map for # # Returns: - # dict: Map of locations + # Generator[(str, dict), None, None]: Generator composing key, locations for each item in the map # """ - # info = {k: v for k, v in self.info_map.items() if k != "custom"} - # match mode: - # case "read": - # output = {k: v[mode] for k, v in info.items() if v[mode]} - # case "write": - # output = {k: v[mode] + v['read'] for k, v in info.items() if v[mode] or v['read']} - # output = {k: v for k, v in output.items() if all([isinstance(item, dict) for item in v])} - # case "export": - # return self.info_map - # case _: - # output = {} - # output['custom'] = self.info_map['custom'] - # return output - - def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]: - """ - Make a map of all locations for tips or equipment. - - Args: - field (Literal['equipment', 'tip']): the field to construct a map for - - Returns: - Generator[(str, dict), None, None]: Generator composing key, locations for each item in the map - """ - for item in self.__getattribute__(f"submissiontype_{field}role_associations"): - fmap = item.uses - if fmap is None: - fmap = {} - yield getattr(item, f"{field}_role").name, fmap + # for item in self.__getattribute__(f"submissiontype_{field}role_associations"): + # fmap = item.uses + # if fmap is None: + # fmap = {} + # yield getattr(item, f"{field}_role").name, fmap @classmethod def query_or_create(cls, **kwargs) -> Tuple[SubmissionType, bool]: @@ -1171,8 +631,6 @@ class SubmissionType(BaseClass): Returns: str: String from which regex will be compiled. """ - # logger.debug(f"Class for regex: {cls}") - # logger.debug(f"Looking for {submission_type}") if not isinstance(submission_type, SubmissionType): submission_type = cls.query(name=submission_type['name']) if isinstance(submission_type, list): @@ -1190,42 +648,23 @@ class SubmissionType(BaseClass): regex = re.compile(rf"{regex}", flags=re.IGNORECASE | re.VERBOSE) except re.error as e: regex = None - # logger.debug(f"Returning regex: {regex}") return regex class ProcedureType(BaseClass): id = Column(INTEGER, primary_key=True) name = Column(String(64)) - # reagent_map = Column(JSON) - # info_map = Column(JSON) - # sample_map = Column(JSON) - # equipment_map = Column(JSON) plate_columns = Column(INTEGER, default=0) plate_rows = Column(INTEGER, default=0) allowed_result_methods = Column(JSON) - # template_file = Column(BLOB) - plate_cost = Column(FLOAT) + plate_cost = Column(FLOAT(2)) procedure = relationship("Procedure", back_populates="proceduretype") #: Concrete control of this type. - # process = relationship("Process", back_populates="proceduretype", - # secondary=proceduretype_process) #: Relation to equipment process used for this type. - submissiontype = relationship("SubmissionType", back_populates="proceduretype", secondary=submissiontype_proceduretype) #: run this kittype was used for - # proceduretypekittypeassociation = relationship( - # "ProcedureTypeKitTypeAssociation", - # back_populates="proceduretype", - # cascade="all, delete-orphan", - # ) #: Association of kittypes - # - # kittype = association_proxy("proceduretypekittypeassociation", "kittype", - # creator=lambda kit: ProcedureTypeKitTypeAssociation( - # kittype=kit)) #: Proxy of kittype association - proceduretypeequipmentroleassociation = relationship( "ProcedureTypeEquipmentRoleAssociation", back_populates="proceduretype", @@ -1246,51 +685,10 @@ class ProcedureType(BaseClass): creator=lambda reagentrole: ProcedureTypeReagentRoleAssociation( reagentrole=reagentrole)) #: Proxy of equipmentrole associations - # proceduretypetiproleassociation = relationship( - # "ProcedureTypeTipRoleAssociation", - # back_populates="proceduretype", - # cascade="all, delete-orphan" - # ) #: Association of tiproles - def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.allowed_result_methods = dict() - # @property - # def template_file_sheets(self) -> List[str]: - # """ - # Gets names of sheet in the stored blank form. - # - # Returns: - # List[str]: List of sheet names - # """ - # try: - # return ExcelFile(BytesIO(self.template_file), engine="openpyxl").sheet_names - # except zipfile.BadZipfile: - # return [] - # - # def set_template_file(self, filepath: Path | str): - # """ - # - # Sets the binary store to an Excel file. - # - # Args: - # filepath (Path | str): Path to the template file. - # - # Raises: - # ValueError: Raised if file is not Excel file. - # """ - # if isinstance(filepath, str): - # filepath = Path(filepath) - # try: - # ExcelFile(filepath) - # except ValueError: - # raise ValueError(f"File {filepath} is not of appropriate type.") - # with open(filepath, "rb") as f: - # data = f.read() - # self.template_file = data - # self.save() - def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]: """ Make a map of all locations for tips or equipment. @@ -1307,19 +705,6 @@ class ProcedureType(BaseClass): fmap = {} yield getattr(item, f"{field}_role").name, fmap - # @property - # def default_kit(self) -> KitType | None: - # """ - # If only one kits exists for this Submission Type, return it. - # - # Returns: - # KitType | None: - # """ - # if len(self.kittype) == 1: - # return self.kittype[0] - # else: - # return None - def get_equipment(self) -> Generator['PydEquipmentRole', None, None]: """ Returns PydEquipmentRole of all equipment associated with this SubmissionType @@ -1354,15 +739,6 @@ class ProcedureType(BaseClass): raise TypeError(f"Type {type(equipmentrole)} is not allowed") return list(set([item for items in relevant for item in items if item is not None])) - # @property - # def as_dict(self): - # return dict( - # name=self.name, - # kittype=[item.name for item in self.kittype], - # plate_rows=self.plate_rows, - # plate_columns=self.plate_columns - # ) - def details_dict(self, **kwargs): output = super().details_dict(**kwargs) output['reagentrole'] = [item.details_dict() for item in output['reagentrole']] @@ -1398,7 +774,6 @@ class ProcedureType(BaseClass): if self.plate_rows == 0 or self.plate_columns == 0: return "
" sample_dicts = self.pad_sample_dicts(sample_dicts=sample_dicts) - # logger.debug(f"Sample dicts: {pformat(sample_dicts)}") vw = round((-0.07 * len(sample_dicts)) + 12.2, 1) # NOTE: An overly complicated list comprehension create a list of sample locations # NOTE: next will return a blank cell if no value found for row/column @@ -1411,14 +786,11 @@ class ProcedureType(BaseClass): def pad_sample_dicts(self, sample_dicts: List["PydSample"]): from backend.validators.pydant import PydSample output = [] - # logger.debug(f"Rows: {self.plate_rows}") - # logger.debug(f"Columns: {self.plate_columns}") for row, column in self.ranked_plate.values(): sample = next((sample for sample in sample_dicts if sample.row == row and sample.column == column), PydSample(**dict(sample_id="", row=row, column=column, enabled=False))) sample.background_color = "#6ffe1d" if sample.enabled else "#ffffff" output.append(sample) - # logger.debug(f"Appending {sample} at row {row}, column {column}") return output @property @@ -1432,13 +804,12 @@ class ProcedureType(BaseClass): class Procedure(BaseClass): - id = Column(INTEGER, primary_key=True) - name = Column(String, unique=True) - repeat = Column(INTEGER, nullable=False) - repeat_of = Column(String) + id = Column(INTEGER, primary_key=True) #: Primary key + name = Column(String, unique=True) #: Name of the procedure (RSL number) + repeat_of_id = Column(INTEGER, ForeignKey("_procedure.id", name="fk_repeat_id")) + repeat_of = relationship("Procedure", remote_side=[id]) started_date = Column(TIMESTAMP) completed_date = Column(TIMESTAMP) - technician = Column(String(64)) #: name of processing tech(s) results = relationship("Results", back_populates="procedure", uselist=True) proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id", ondelete="SET NULL", @@ -1447,9 +818,6 @@ class Procedure(BaseClass): run_id = Column(INTEGER, ForeignKey("_run.id", ondelete="SET NULL", name="fk_PRO_basicrun_id")) #: client lab id from _organizations)) run = relationship("Run", back_populates="procedure") - # kittype_id = Column(INTEGER, ForeignKey("_kittype.id", ondelete="SET NULL", - # name="fk_PRO_kittype_id")) #: client lab id from _organizations)) - # kittype = relationship("KitType", back_populates="procedure") control = relationship("Control", back_populates="procedure", uselist=True) #: A control sample added to procedure proceduresampleassociation = relationship( @@ -1481,21 +849,9 @@ class Procedure(BaseClass): equipment = association_proxy("procedureequipmentassociation", "equipment") #: Association proxy to RunEquipmentAssociation.equipment - # proceduretipsassociation = relationship( - # "ProcedureTipsAssociation", - # back_populates="procedure", - # cascade="all, delete-orphan") - # - # tips = association_proxy("proceduretipsassociation", - # "tips") - - @validates('repeat') - def validate_repeat(self, key, value): - if value > 1: - value = 1 - if value < 0: - value = 0 - return value + @hybrid_property + def repeat(self) -> bool: + return self.repeat_of is not None @classmethod @setup_lookup @@ -1514,7 +870,6 @@ class Procedure(BaseClass): if start_date is not None: start_date = cls.rectify_query_date(start_date) end_date = cls.rectify_query_date(end_date, eod=True) - logger.debug(f"Start date: {start_date}, end date: {end_date}") query = query.filter(cls.started_date.between(start_date, end_date)) match id: case int(): @@ -1530,11 +885,6 @@ class Procedure(BaseClass): pass return cls.execute_query(query=query, limit=limit) - # def to_dict(self, full_data: bool = False): - # output = dict() - # output['name'] = self.name - # return output - @property def custom_context_events(self) -> dict: """ @@ -1547,7 +897,7 @@ class Procedure(BaseClass): return {item: self.__getattribute__(item.lower().replace(" ", "_")) for item in names} def add_results(self, obj, resultstype_name: str): - logger.debug(f"Add Results! {resultstype_name}") + logger.info(f"Add Results! {resultstype_name}") from backend.managers import results results_manager = getattr(results, f"{resultstype_name}Manager") rs = results_manager(procedure=self, parent=obj) @@ -1566,28 +916,10 @@ class Procedure(BaseClass): Args: obj (_type_): parent widget """ - logger.debug(f"Add equipment") + logger.info(f"Add equipment") from frontend.widgets.equipment_usage_2 import EquipmentUsage dlg = EquipmentUsage(parent=obj, procedure=self.to_pydantic()) if dlg.exec(): - # equipment = dlg.parse_form() - # for equip in equipment: - # logger.debug(f"Parsed equipment: {equip}") - # _, assoc = equip.to_sql(procedure=self) - # logger.debug(f"Got equipment association: {assoc} for {equip}") - # try: - # assoc.save() - # except AttributeError as e: - # logger.error(f"Couldn't save association with {equip} due to {e}") - # if equip.tips: - # for tips in equip.tips: - # # logger.debug(f"Attempting to add tips assoc: {tips} (pydantic)") - # tassoc, _ = tips.to_sql(procedure=self) - # # logger.debug(f"Attempting to add tips assoc: {tips.__dict__} (sql)") - # if tassoc not in self.proceduretipsassociation: - # tassoc.save() - # else: - # logger.error(f"Tips already found in submission, skipping.") dlg.save_procedure() def edit(self, obj): @@ -1595,23 +927,17 @@ class Procedure(BaseClass): logger.debug("Edit!") dlg = ProcedureCreation(parent=obj, procedure=self.to_pydantic(), edit=True) if dlg.exec(): - logger.debug("Edited") sql, _ = dlg.return_sql() sql.save() def add_comment(self, obj): logger.debug("Add Comment!") - # def show_details(self, obj): - # logger.debug("Show Details!") - def delete(self, obj): logger.debug("Delete!") def details_dict(self, **kwargs): output = super().details_dict() - # output['kittype'] = output['kittype'].details_dict() - # output['kit_type'] = self.kittype.name output['proceduretype'] = output['proceduretype'].details_dict()['name'] output['results'] = [result.details_dict() for result in output['results']] run_samples = [sample for sample in self.run.sample] @@ -1621,29 +947,22 @@ class Procedure(BaseClass): sample['active'] = True inactive_samples = [sample.details_dict() for sample in run_samples if sample.name not in [s['sample_id'] for s in active_samples]] - # logger.debug(f"Inactive samples:{pformat(inactive_samples)}") for sample in inactive_samples: sample['active'] = False - # output['sample'] = [sample.details_dict() for sample in output['runsampleassociation']] output['sample'] = active_samples + inactive_samples - logger.debug(f"Procedure samples: \n\n{pformat(output['sample'])}\n\n") - # output['sample'] = [sample.details_dict() for sample in output['sample']] output['reagent'] = [reagent.details_dict() for reagent in output['procedurereagentlotassociation']] output['equipment'] = [equipment.details_dict() for equipment in output['procedureequipmentassociation']] - # output['tips'] = [tips.details_dict() for tips in output['proceduretipsassociation']] - output['repeat'] = bool(output['repeat']) + output['repeat'] = self.repeat output['run'] = self.run.name output['excluded'] += self.get_default_info("details_ignore") output['sample_count'] = len(active_samples) output['clientlab'] = self.run.clientsubmission.clientlab.name output['cost'] = 0.00 - # output = self.clean_details_dict(output) return output def to_pydantic(self, **kwargs): - from backend.validators.pydant import PydResults, PydReagent + from backend.validators.pydant import PydReagent output = super().to_pydantic() - logger.debug(f"Pydantic output: \n\n{pformat(output.__dict__)}\n\n") try: output.kittype = dict(value=output.kittype['name'], missing=False) except KeyError: @@ -1657,29 +976,15 @@ class Procedure(BaseClass): for reagent in output.reagent: match reagent: case dict(): - # reagent['reagentrole'] = next((reagentrole.name for reagentrole in self.kittype.reagentrole if reagentrole == reagent['reagentrole']), None) reagents.append(PydReagent(**reagent)) case PydReagent(): reagents.append(reagent) case _: pass - # output.reagent = [PydReagent(**item) for item in output.reagent] output.reagent = reagents - # results = [] - # for result in output.results: - # match result: - # case dict(): - # results.append(PydResults(**result)) - # case PydResults(): - # results.append(result) - # case _: - # pass - # output.results = results output.result = [item.to_pydantic() for item in self.results] output.sample_results = flatten_list( [[result.to_pydantic() for result in item.results] for item in self.proceduresampleassociation]) - # for sample in output.sample: - # sample.enabled = True return output def create_proceduresampleassociations(self, sample): @@ -1690,10 +995,6 @@ class Procedure(BaseClass): def get_default_info(cls, *args) -> dict | list | str: dicto = super().get_default_info() recover = ['filepath', 'sample', 'csv', 'comment', 'equipment'] - # ['id', "results", "proceduresampleassociation", "sample", - # "procedurereagentlotassociation", - # "procedureequipmentassociation", "proceduretipsassociation", "reagent", "equipment", - # "tips", "control", "kittype"] dicto.update(dict( details_ignore=['excluded', 'reagents', 'sample', 'extraction_info', 'comment', 'barcode', 'platemap', 'export_map', 'equipment', 'tips', 'custom', 'reagentlot', 'reagent_lot', @@ -1718,157 +1019,19 @@ class Procedure(BaseClass): return output -# class ProcedureTypeKitTypeAssociation(BaseClass): -# """ -# Abstract of relationship between kits and their procedure type. -# """ -# -# omni_removes = BaseClass.omni_removes + ["proceduretype_id", "kittype_id"] -# omni_sort = ["proceduretype", "kittype"] -# level = 2 -# -# proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"), -# primary_key=True) #: id of joined procedure type -# kittype_id = Column(INTEGER, ForeignKey("_kittype.id"), primary_key=True) #: id of joined kittype -# mutable_cost_column = Column( -# FLOAT(2)) #: dollar amount per 96 well plate that can change with number of columns (reagents, tips, etc) -# mutable_cost_sample = Column( -# FLOAT(2)) #: dollar amount that can change with number of sample (reagents, tips, etc) -# constant_cost = Column(FLOAT(2)) #: dollar amount per plate that will remain constant (plates, man hours, etc) -# -# kittype = relationship(KitType, back_populates="kittypeproceduretypeassociation") #: joined kittype -# -# # reference to the "SubmissionType" object -# proceduretype = relationship(ProcedureType, -# back_populates="proceduretypekittypeassociation") #: joined procedure type -# -# def __init__(self, kittype=None, proceduretype=None, -# mutable_cost_column: int = 0.00, mutable_cost_sample: int = 0.00, constant_cost: int = 0.00): -# self.kittype = kittype -# self.proceduretype = proceduretype -# self.mutable_cost_column = mutable_cost_column -# self.mutable_cost_sample = mutable_cost_sample -# self.constant_cost = constant_cost -# -# def __repr__(self) -> str: -# """ -# Returns: -# str: Representation of this object -# """ -# try: -# proceduretype_name = self.proceduretype.name -# except AttributeError: -# proceduretype_name = "None" -# try: -# kittype_name = self.kittype.name -# except AttributeError: -# kittype_name = "None" -# return f"" -# -# @property -# def name(self): -# try: -# return f"{self.proceduretype.name} -> {self.kittype.name}" -# except AttributeError: -# return "Blank SubmissionTypeKitTypeAssociation" -# -# @classmethod -# def query_or_create(cls, **kwargs) -> Tuple[ProcedureTypeKitTypeAssociation, bool]: -# new = False -# disallowed = ['expiry'] -# sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed} -# instance = cls.query(**sanitized_kwargs) -# if not instance or isinstance(instance, list): -# instance = cls() -# new = True -# for k, v in sanitized_kwargs.items(): -# setattr(instance, k, v) -# logger.info(f"Instance from ProcedureTypeKitTypeAssociation query or create: {instance}") -# return instance, new -# -# @classmethod -# @setup_lookup -# def query(cls, -# proceduretype: ProcedureType | str | int | None = None, -# kittype: KitType | str | int | None = None, -# limit: int = 0, -# **kwargs -# ) -> ProcedureTypeKitTypeAssociation | List[ProcedureTypeKitTypeAssociation]: -# """ -# Lookup SubmissionTypeKitTypeAssociations of interest. -# -# Args: -# proceduretype (ProcedureType | str | int | None, optional): Identifier of procedure type. Defaults to None. -# kittype (KitType | str | int | None, optional): Identifier of kittype type. Defaults to None. -# limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. -# -# Returns: -# SubmissionTypeKitTypeAssociation|List[SubmissionTypeKitTypeAssociation]: SubmissionTypeKitTypeAssociation(s) of interest -# """ -# query: Query = cls.__database_session__.query(cls) -# match proceduretype: -# case ProcedureType(): -# query = query.filter(cls.proceduretype == proceduretype) -# case str(): -# query = query.join(ProcedureType).filter(ProcedureType.name == proceduretype) -# case int(): -# query = query.join(ProcedureType).filter(ProcedureType.id == proceduretype) -# match kittype: -# case KitType(): -# query = query.filter(cls.kittype == kittype) -# case str(): -# query = query.join(KitType).filter(KitType.name == kittype) -# case int(): -# query = query.join(KitType).filter(KitType.id == kittype) -# if kittype is not None and proceduretype is not None: -# limit = 1 -# return cls.execute_query(query=query, limit=limit) -# -# def to_omni(self, expand: bool = False): -# from backend.validators.omni_gui_objects import OmniSubmissionTypeKitTypeAssociation -# if expand: -# try: -# submissiontype = self.submission_type.to_omni() -# except AttributeError: -# submissiontype = "" -# try: -# kittype = self.kit_type.to_omni() -# except AttributeError: -# kittype = "" -# else: -# submissiontype = self.submission_type.name -# kittype = self.kit_type.name -# return OmniSubmissionTypeKitTypeAssociation( -# instance_object=self, -# submissiontype=submissiontype, -# kittype=kittype, -# mutable_cost_column=self.mutable_cost_column, -# mutable_cost_sample=self.mutable_cost_sample, -# constant_cost=self.constant_cost -# ) -# - class ProcedureTypeReagentRoleAssociation(BaseClass): """ table containing reagenttype/kittype associations DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html """ - # omni_removes = BaseClass.omni_removes + ["submission_type_id", "kits_id", "reagent_roles_id", "last_used"] - # omni_sort = ["proceduretype", "kittype", "reagentrole", "required", "uses"] - # omni_inheritable = ["proceduretype", "kittype"] - reagentrole_id = Column(INTEGER, ForeignKey("_reagentrole.id"), - primary_key=True) #: id of associated reagent type - # kittype_id = Column(INTEGER, ForeignKey("_kittype.id"), primary_key=True) #: id of associated reagent type - proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"), primary_key=True) + primary_key=True) #: id of associated reagentrole + proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"), primary_key=True) #: id of associated proceduretype uses = Column(JSON) #: map to location on excel sheets of different procedure types required = Column(INTEGER) #: whether the reagent type is required for the kittype (Boolean 1 or 0) last_used = Column(String(32)) #: last used lot number of this type of reagent - ml_used_per_sample = Column(FLOAT) - - # kittype = relationship(KitType, - # back_populates="kittypereagentroleassociation") #: relationship to associated KitType + ml_used_per_sample = Column(FLOAT(2)) #: amount of reagent used in the procedure # NOTE: reference to the "ReagentType" object reagentrole = relationship(ReagentRole, @@ -1879,7 +1042,6 @@ class ProcedureTypeReagentRoleAssociation(BaseClass): back_populates="proceduretypereagentroleassociation") #: relationship to associated SubmissionType def __init__(self, proceduretype=None, reagentrole=None, uses=None, required=1): - # self.kittype = kittype self.proceduretype = proceduretype self.reagentrole = reagentrole self.uses = uses @@ -1893,7 +1055,7 @@ class ProcedureTypeReagentRoleAssociation(BaseClass): try: return f"{self.proceduretype.name} -> {self.reagentrole.name}" except AttributeError: - return "Blank KitTypeReagentRole" + return "Blank ProcedureTypeReagentRole" @validates('required') def validate_required(self, key, value): @@ -1947,11 +1109,6 @@ class ProcedureTypeReagentRoleAssociation(BaseClass): for k, v in sanitized_kwargs.items(): logger.debug(f"Key: {k} has value: {v}") match k: - # case "kittype": - # if isinstance(v, str): - # v = KitType.query(name=v) - # else: - # v = v.instance_object case "proceduretype": if isinstance(v, str): v = SubmissionType.query(name=v) @@ -1971,7 +1128,6 @@ class ProcedureTypeReagentRoleAssociation(BaseClass): @classmethod @setup_lookup def query(cls, - # kittype: KitType | str | None = None, reagentrole: ReagentRole | str | None = None, proceduretype: ProcedureType | str | None = None, limit: int = 0, @@ -1981,21 +1137,14 @@ class ProcedureTypeReagentRoleAssociation(BaseClass): Lookup junction of ReagentType and KitType Args: - kittype (models.KitType | str | None): KitType of interest. - reagentrole (models.ReagentType | str | None): ReagentType of interest. + proceduretype (models.ProcedureType | str | None, optional): KitType of interest. Defaults to None. + reagentrole (models.ReagentRole | str | None, optional): ReagentRole of interest. Defaults to None. limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. Returns: models.KitTypeReagentTypeAssociation|List[models.KitTypeReagentTypeAssociation]: Junction of interest. """ query: Query = cls.__database_session__.query(cls) - # match kittype: - # case KitType(): - # query = query.filter(cls.kit_type == kittype) - # case str(): - # query = query.join(KitType).filter(KitType.name == kittype) - # case _: - # pass match reagentrole: case ReagentRole(): query = query.filter(cls.reagent_role == reagentrole) @@ -2011,8 +1160,6 @@ class ProcedureTypeReagentRoleAssociation(BaseClass): case _: pass pass - # if kittype is not None and reagentrole is not None: - # limit = 1 return cls.execute_query(query=query, limit=limit) def get_all_relevant_reagents(self) -> Generator[Reagent, None, None]: @@ -2092,7 +1239,7 @@ class ProcedureReagentLotAssociation(BaseClass): reagentlot_id = Column(INTEGER, ForeignKey("_reagentlot.id"), primary_key=True) #: id of associated reagent procedure_id = Column(INTEGER, ForeignKey("_procedure.id"), primary_key=True) #: id of associated procedure - reagentrole = Column(String(64)) + reagentrole = Column(String(64)) #: Name of associated reagentrole (for some reason can't be relationship). comments = Column(String(1024)) #: Comments about reagents procedure = relationship("Procedure", @@ -2135,6 +1282,7 @@ class ProcedureReagentLotAssociation(BaseClass): Args: procedure (Procedure | str | int | None, optional): Identifier of joined procedure. Defaults to None. + reagentlot (ReagentLot | str | None, optional): Identifier of joined reagent. Defaults to None. reagent (Reagent | str | None, optional): Identifier of joined reagent. Defaults to None. limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. @@ -2143,9 +1291,9 @@ class ProcedureReagentLotAssociation(BaseClass): """ query: Query = cls.__database_session__.query(cls) match reagentlot: - case Reagent() | str(): + case ReagentLot() | str(): if isinstance(reagentlot, str): - reagent = ReagentLot.query(lot=reagentlot) + reagentlot = ReagentLot.query(lot=reagentlot) query = query.filter(cls.reagentlot == reagentlot) case _: pass @@ -2155,7 +1303,6 @@ class ProcedureReagentLotAssociation(BaseClass): procedure = Procedure.query(name=procedure) query = query.filter(cls.procedure == procedure) case int(): - # procedure = Procedure.query(id=procedure) query = query.join(Procedure).filter(Procedure.id == procedure) case _: pass @@ -2163,20 +1310,6 @@ class ProcedureReagentLotAssociation(BaseClass): query = query.filter(cls.reagentrole == reagentrole) return cls.execute_query(query=query, limit=limit) - # def to_sub_dict(self, kittype) -> dict: - # """ - # Converts this RunReagentAssociation (and associated Reagent) to dict - # - # Args: - # kittype (_type_): Extraction kittype of interest - # - # Returns: - # dict: This RunReagentAssociation as dict - # """ - # output = self.reagent.to_sub_dict(kittype) - # output['comments'] = self.comments - # return output - def to_pydantic(self): from backend.validators import PydReagent return PydReagent(**self.details_dict()) @@ -2210,9 +1343,6 @@ class EquipmentRole(BaseClass): id = Column(INTEGER, primary_key=True) #: Role id, primary key name = Column(String(32)) #: Common name - # equipment = relationship("Equipment", back_populates='equipmentrole', secondary=equipmentrole_equipment) - # process = relationship("Process", back_populates='equipmentrole', - # secondary=equipmentrole_process) #: Associated Processes equipmentroleproceduretypeassociation = relationship( "ProcedureTypeEquipmentRoleAssociation", @@ -2230,7 +1360,7 @@ class EquipmentRole(BaseClass): ) equipment = association_proxy("equipmentroleequipmentassociation", - "equipmentrole", creator=lambda equipment: EquipmentRoleEquipmentAssociation( + "equipmentrole", creator=lambda equipment: EquipmentRoleEquipmentAssociation( equipment=equipment)) def to_dict(self) -> dict: @@ -2321,13 +1451,9 @@ class EquipmentRole(BaseClass): """ if isinstance(proceduretype, str): proceduretype = SubmissionType.query(name=proceduretype) - # if isinstance(kittype, str): - # kittype = KitType.query(name=kittype) for process in self.process: if proceduretype and proceduretype not in process.proceduretype: continue - # if kittype and kittype not in process.kittype: - # continue yield process.name def to_omni(self, expand: bool = False) -> "OmniEquipmentRole": @@ -2347,44 +1473,21 @@ class EquipmentRole(BaseClass): case _: proceduretype = None output = super().details_dict(**kwargs) - # sys.exit(f"Equipment: {pformat(output)}") - # Note con - # output['equipment'] = [item.details_dict() for item in output['equipment']] - # output['equipment_json'] = [] output['equipment'] = [item.details_dict()['equipment'] for item in self.equipmentroleequipmentassociation] equip = [] for eq in output['equipment']: - # logger.debug(eq) dicto = dict(name=eq['name'], asset_number=eq['asset_number']) - dicto['process'] = [ {'name': process['name'], 'tips': process['tips']} for process in eq['process'] - # if proceduretype in process.proceduretype - # for version in process.processversion ] - for process in dicto['process']: - # logger.debug(process['tips']) - try: - process['tips'] = [tr['name'] for tr in process['tips']] - except KeyError: - logger.debug(f"process: {pformat(process)}") - raise KeyError() - # del process['tiprole'] + # try: + process['tips'] = [tr['name'] for tr in process['tips']] + # except KeyError: + # raise KeyError("Problem ") equip.append(dicto) output['equipment'] = equip - # sys.exit(pformat(output['equipment'])) - # output['equipment_json'].append(dict(name=self.name, equipment=equip)) - # output['process'] = [item.details_dict() for item in output['process']] - # output['process'] = [version.details_dict() for version in - # flatten_list([process.processversion for process in self.process])] - # logger.debug(f"\n\nProcess: {pformat(output['process'])}") - # try: - # output['tips'] = [item.details_dict() for item in output['tips']] - # except KeyError: - # # logger.error(pformat(output)) - # pass return output @@ -2397,10 +1500,6 @@ class Equipment(BaseClass, LogMixin): name = Column(String(64)) #: equipment name nickname = Column(String(64)) #: equipment nickname asset_number = Column(String(16)) #: Given asset number (corpo nickname if you will) - # equipmentrole = relationship(EquipmentRole, back_populates="equipment", secondary=equipmentrole_equipment) - - # tips = relationship("Tips", back_populates="equipment", - # secondary=equipment_tips) #: relation to Processes equipmentprocedureassociation = relationship( "ProcedureEquipmentAssociation", @@ -2418,8 +1517,9 @@ class Equipment(BaseClass, LogMixin): ) equipmentrole = association_proxy("equipmentequipmentroleassociation", - "equipmentrole", creator=lambda equipmentrole: EquipmentRoleEquipmentAssociation(equipmentrole=equipmentrole) - ) + "equipmentrole", creator=lambda equipmentrole: EquipmentRoleEquipmentAssociation( + equipmentrole=equipmentrole) + ) def __init__(self, name: str, nickname: str | None = None, asset_number: str = ""): self.name = name @@ -2444,25 +1544,6 @@ class Equipment(BaseClass, LogMixin): else: return {k: v for k, v in self.__dict__.items()} - # def get_processes(self, #proceduretype: str | ProcedureType | None = None, - # # kittype: str | KitType | None = None, - # equipmentrole: str | EquipmentRole | None = None) -> Generator[Process, None, None]: - # """ - # Get all process associated with this Equipment for a given SubmissionType - # - # Args: - # proceduretype (ProcedureType): SubmissionType of interest - # kittype (str | KitType | None, optional): KitType to filter by. Defaults to None. - # - # Returns: - # List[Process]: List of process names - # """ - # # if isinstance(proceduretype, str): - # # proceduretype = ProcedureType.query(name=proceduretype) - # for er in self.equipmentrole: - # for process in - # logger.debug(f"Getting process: {process}") - # yield process @classmethod @setup_lookup @@ -2526,13 +1607,8 @@ class Equipment(BaseClass, LogMixin): from backend.validators.pydant import PydEquipment creation_dict = self.details_dict() processes = self.get_processes(equipmentrole=equipmentrole) - logger.debug(f"Processes: {processes}") creation_dict['process'] = processes - logger.debug(f"EquipmentRole: {equipmentrole}") creation_dict['equipmentrole'] = equipmentrole or creation_dict['equipmentrole'] - # return PydEquipment(process=processes, equipmentrole=equipmentrole, - # **self.to_dict(processes=False)) - logger.debug(f"Creating pydequipment with {pformat(creation_dict)}") return PydEquipment(**creation_dict) @classproperty @@ -2581,87 +1657,29 @@ class Equipment(BaseClass, LogMixin): for assoc in self.equipmentequipmentroleassociation: if assoc.equipmentrole.name != equipmentrole: continue - # logger.debug(pformat(assoc.process.details_dict())) output.append(assoc.process.to_pydantic()) return output - # def details_dict(self, **kwargs): - # output = super().details_dict(**kwargs) - # for key in ["proceduretype", "equipmentroleproceduretypeassociation", "equipmentrole"]: - # try: - # del output[key] - # except KeyError: - # pass - # return output - - # def to_sub_dict(self, full_data: bool = False, **kwargs) -> dict: - # """ - # dictionary containing values necessary for gui - # - # Args: - # full_data (bool, optional): Whether to include procedure in data for details. Defaults to False. - # - # Returns: - # dict: representation of the equipment's attributes - # """ - # if self.nickname: - # nickname = self.nickname - # else: - # nickname = self.name - # output = dict( - # name=self.name, - # nickname=nickname, - # asset_number=self.asset_number - # ) - # if full_data: - # subs = [dict(plate=item.procedure.procedure.rsl_plate_number, process=item.process.name, - # sub_date=item.procedure.procedure.start_date) - # if item.process else dict(plate=item.procedure.procedure.rsl_plate_number, process="NA") - # for item in self.equipmentprocedureassociation] - # output['procedure'] = sorted(subs, key=itemgetter("sub_date"), reverse=True) - # output['excluded'] = ['missing', 'procedure', 'excluded', 'editable'] - # return output - - # @classproperty - # def details_template(cls) -> Template: - # """ - # Get the details jinja template for the correct class - # - # Args: - # base_dict (dict): incoming dictionary of Submission fields - # - # Returns: - # Tuple(dict, Template): (Updated dictionary, Template to be rendered) - # """ - # env = jinja_template_loading() - # temp_name = f"{cls.__name__.lower()}_details.html" - # try: - # template = env.get_template(temp_name) - # except TemplateNotFound as e: - # logger.error(f"Couldn't find template {e}") - # template = env.get_template("equipment_details.html") - # return template - class EquipmentRoleEquipmentAssociation(BaseClass): - equipmentrole_id = Column(INTEGER, ForeignKey("_equipmentrole.id"), primary_key=True) #: id of associated reagent equipment_id = Column(INTEGER, ForeignKey("_equipment.id"), primary_key=True) #: id of associated procedure process_id = Column(INTEGER, ForeignKey("_process.id")) equipmentrole = relationship("EquipmentRole", - back_populates="equipmentroleequipmentassociation") #: associated procedure + back_populates="equipmentroleequipmentassociation") #: associated procedure equipment = relationship("Equipment", - back_populates="equipmentequipmentroleassociation") #: associated procedure + back_populates="equipmentequipmentroleassociation") #: associated procedure process = relationship("Process", - back_populates="equipmentroleeequipmentassociation") #: associated procedure + back_populates="equipmentroleeequipmentassociation") #: associated procedure def details_dict(self, **kwargs) -> dict: output = super().details_dict(**kwargs) output['equipment'] = self.equipment.details_dict() - output['equipment']['process'] = [item.details_dict() for item in self.process.processversion if bool(item.active)] + output['equipment']['process'] = [item.details_dict() for item in self.process.processversion if + bool(item.active)] return output @@ -2670,21 +1688,10 @@ class Process(BaseClass): A Process is a method used by a piece of equipment. """ - - id = Column(INTEGER, primary_key=True) #: Process id, primary key name = Column(String(64), unique=True) #: Process name - # proceduretype = relationship("ProcedureType", back_populates='process', - # secondary=proceduretype_process) #: relation to SubmissionType - # equipment = relationship("Equipment", back_populates='process', - # secondary=equipment_process) #: relation to Equipment - # equipment = relationship("EquipmentRole", back_populates='process', - # secondary=equipmentrole_process) #: relation to EquipmentRoles - - # kittype = relationship("KitType", back_populates='process', - # secondary=kittype_process) #: relation to KitType tips = relationship("Tips", back_populates='process', - secondary=process_tips) #: relation to KitType + secondary=process_tips) #: relation to KitType processversion = relationship("ProcessVersion", back_populates="process") @@ -2699,20 +1706,6 @@ class Process(BaseClass): if value not in field: field.append(value) - # @classmethod - # def query_or_create(cls, **kwargs) -> Tuple[Process, bool]: - # new = False - # disallowed = ['expiry'] - # sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed} - # instance = cls.query(**sanitized_kwargs) - # if not instance or isinstance(instance, list): - # instance = cls() - # new = True - # for k, v in sanitized_kwargs.items(): - # setattr(instance, k, v) - # logger.info(f"Instance from query or create: {instance}") - # return instance, new - @classmethod @setup_lookup def query(cls, @@ -2791,40 +1784,57 @@ class Process(BaseClass): def details_dict(self, **kwargs): output = super().details_dict(**kwargs) output['processversion'] = [item.details_dict() for item in self.processversion] - # logger.debug(f"Process output dict: {pformat(output)}") tips = flatten_list([tipslot for tipslot in [tips.tipslot for tips in self.tips]]) output['tips'] = [tipslot.details_dict() for tipslot in tips] return output def to_pydantic(self): output = super().to_pydantic() - # output.tips = [[tipslot.to_pydantic() for tipslot in tips] for tips in self.tips] return output class ProcessVersion(BaseClass): id = Column(INTEGER, primary_key=True) #: Process id, primary key - version = Column(FLOAT(2), default=1.00) - date_verified = Column(TIMESTAMP) - project = Column(String(128)) - active = Column(INTEGER, default=1) + version = Column(FLOAT(2), default=1.00) #: Version number + date_verified = Column(TIMESTAMP) #: Date this version was deemed worthy + project = Column(String(128)) #: Name of the project this belonds to. + active = Column(INTEGER, default=1) #: Is this version in use? process = relationship("Process", back_populates="processversion") process_id = Column(INTEGER, ForeignKey("_process.id", ondelete="SET NULL", name="fk_version_process_id")) procedureequipmentassociation = relationship("ProcedureEquipmentAssociation", - back_populates ='processversion') #: relation to RunEquipmentAssociation + back_populates='processversion') #: relation to RunEquipmentAssociation @property def name(self) -> str: return f"{self.process.name}-v{str(self.version)}" + @validates('active') + def validate_active(self, key, value): + """ + Ensures only 1 & 0 used in 'active' + + Args: + key (str): name of attribute + value (_type_): value of attribute + + Raises: + ValueError: Raised if bad value given + + Returns: + _type_: value + """ + if not 0 <= value < 2: + raise ValueError(f'Invalid required value {value}. Must be 0 or 1.') + return value + def details_dict(self, **kwargs): output = super().details_dict(**kwargs) output['name'] = self.name if not output['project']: output['project'] = "" - output['tips'] = flatten_list([[lot.details_dict() for lot in tips.tipslot if bool(lot.active)] for tips in self.process.tips]) - # logger.debug(f"Tips for {self.name} -\n\n {pformat(output['tips'])}") + output['tips'] = flatten_list( + [[lot.details_dict() for lot in tips.tipslot if bool(lot.active)] for tips in self.process.tips]) return output def set_attribute(self, key, value): @@ -2855,42 +1865,16 @@ class Tips(BaseClass): An abstract reagentrole that a tip fills during a process """ id = Column(INTEGER, primary_key=True) #: primary key - # name = Column(String(64)) #: name of reagent type - tipslot = relationship("TipsLot", back_populates="tips") #: concrete control of this reagent type - manufacturer = Column(String(64)) - capacity = Column(INTEGER) + tipslot = relationship("TipsLot", back_populates="tips") #: concrete instance of this tip type + manufacturer = Column(String(64)) #: Name of manufacturer + capacity = Column(INTEGER) #: How many uL the tip can hold. ref = Column(String(64)) #: tip reference number - process = relationship("Process", back_populates="tips", secondary=process_tips) + process = relationship("Process", back_populates="tips", secondary=process_tips) #: Associated process - - @property + @hybrid_property def name(self): return f"{self.manufacturer}-{self.ref}" - # tiproleproceduretypeassociation = relationship( - # "ProcedureTypeTipRoleAssociation", - # back_populates="tiprole", - # cascade="all, delete-orphan" - # ) #: associated procedure - # - # proceduretype = association_proxy("tiproleproceduretypeassociation", "proceduretype", - # creator=lambda proceduretype: ProcedureTypeTipRoleAssociation( - # proceduretype=proceduretype)) - - # @classmethod - # def query_or_create(cls, **kwargs) -> Tuple[TipRole, bool]: - # new = False - # disallowed = ['expiry'] - # sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed} - # instance = cls.query(**sanitized_kwargs) - # if not instance or isinstance(instance, list): - # instance = cls() - # new = True - # for k, v in sanitized_kwargs.items(): - # setattr(instance, k, v) - # logger.info(f"Instance from query or create: {instance}") - # return instance, new - @classmethod @setup_lookup def query(cls, @@ -2922,32 +1906,37 @@ class Tips(BaseClass): tips=tips ) - # def details_dict(self, **kwargs) -> dict: - # output = super().details_dict(**kwargs) - # return output class TipsLot(BaseClass, LogMixin): """ A concrete instance of tips. """ id = Column(INTEGER, primary_key=True) #: primary key - tips = relationship("Tips", back_populates="tipslot") #: joined parent reagent type + tips = relationship("Tips", back_populates="tipslot") #: joined parent tip type tips_id = Column(INTEGER, ForeignKey("_tips.id", ondelete='SET NULL', - name="fk_tips_id")) #: id of parent reagent type - lot = Column(String(64), unique=True) - expiry = Column(TIMESTAMP) - active = Column(INTEGER, default=1) + name="fk_tips_id")) #: id of parent tip type + lot = Column(String(64), unique=True) #: lot number + expiry = Column(TIMESTAMP) #: date of expiry + active = Column(INTEGER, default=1) #: whether or not these tips are currently in use. - # lot = Column(String(64)) #: lot number of tips - # equipment = relationship("Equipment", back_populates="tips", - # secondary=equipment_tips) #: associated procedure - # tipsprocedureassociation = relationship( - # "ProcedureTipsAssociation", - # back_populates="tips", - # cascade="all, delete-orphan" - # ) #: associated procedure - # - # procedure = association_proxy("tipsprocedureassociation", 'procedure') + @validates('active') + def validate_active(self, key, value): + """ + Ensures only 1 & 0 used in 'active' + + Args: + key (str): name of attribute + value (_type_): value of attribute + + Raises: + ValueError: Raised if bad value given + + Returns: + _type_: value + """ + if not 0 <= value < 2: + raise ValueError(f'Invalid required value {value}. Must be 0 or 1.') + return value @property def size(self) -> str: @@ -2957,20 +1946,6 @@ class TipsLot(BaseClass, LogMixin): def name(self) -> str: return f"{self.tips.manufacturer}-{self.tips.capacity}-{self.lot}" - # @classmethod - # def query_or_create(cls, **kwargs) -> Tuple[Tips, bool]: - # new = False - # disallowed = ['expiry'] - # sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed} - # instance = cls.query(**sanitized_kwargs) - # if not instance or isinstance(instance, list): - # instance = cls() - # new = True - # for k, v in sanitized_kwargs.items(): - # setattr(instance, k, v) - # logger.info(f"Instance from query or create: {instance}") - # return instance, new - @classmethod def query(cls, name: str | None = None, lot: str | None = None, limit: int = 0, **kwargs) -> Tips | List[Tips]: """ @@ -3032,35 +2007,12 @@ class TipsLot(BaseClass, LogMixin): output['excluded'] = ['missing', 'procedure', 'excluded', 'editable'] return output - # @classproperty - # def details_template(cls) -> Template: - # """ - # Get the details jinja template for the correct class - # - # Args: - # base_dict (dict): incoming dictionary of Submission fields - # - # Returns: - # Tuple(dict, Template): (Updated dictionary, Template to be rendered) - # """ - # env = jinja_template_loading() - # temp_name = f"{cls.__name__.lower()}_details.html" - # try: - # template = env.get_template(temp_name) - # except TemplateNotFound as e: - # logger.error(f"Couldn't find template {e}") - # template = env.get_template("tips_details.html") - # return template - - # def to_pydantic(self, **kwargs): - # output = super().to_pydantic() - # return output - def details_dict(self, **kwargs) -> dict: output = super().details_dict() output['name'] = self.name return output + class ProcedureEquipmentAssociation(BaseClass): """ Abstract association between BasicRun and Equipment @@ -3068,7 +2020,7 @@ class ProcedureEquipmentAssociation(BaseClass): equipment_id = Column(INTEGER, ForeignKey("_equipment.id"), primary_key=True) #: id of associated equipment procedure_id = Column(INTEGER, ForeignKey("_procedure.id"), primary_key=True) #: id of associated procedure - equipmentrole = Column(String(64), primary_key=True) #: name of the reagentrole the equipment fills + equipmentrole = Column(String(64), primary_key=True) #: name of the role the equipment fills processversion_id = Column(INTEGER, ForeignKey("_processversion.id", ondelete="SET NULL", name="SEA_Process_id")) #: Foreign key of process id start_time = Column(TIMESTAMP) #: start time of equipment use @@ -3080,10 +2032,10 @@ class ProcedureEquipmentAssociation(BaseClass): equipment = relationship(Equipment, back_populates="equipmentprocedureassociation") #: associated equipment - processversion = relationship(ProcessVersion, back_populates="procedureequipmentassociation") + processversion = relationship(ProcessVersion, back_populates="procedureequipmentassociation") #: Associated process version tipslot_id = Column(INTEGER, ForeignKey("_tipslot.id", ondelete="SET NULL", - name="SEA_Tipslot_id")) + name="SEA_Tipslot_id")) tipslot = relationship(TipsLot) @@ -3161,6 +2113,18 @@ class ProcedureEquipmentAssociation(BaseClass): equipmentrole: str | None = None, limit: int = 0, **kwargs) \ -> Any | List[Any]: + """ + + Args: + equipment ( int | Equipment | None, optional): The associated equipment of interest. Defaults to None. + procedure ( int | Procedure | None, optional): The associated procedure of interest. Defaults to None. + equipmentrole ( str | None, optional): The associated equipmentrole. Defaults to None. + limit ( int ): Maximum number of results to return (0=all). Defaults to 0. + **kwargs (): + + Returns: + Any | List[Any] + """ query: Query = cls.__database_session__.query(cls) match equipment: case int(): @@ -3204,7 +2168,8 @@ class ProcedureTypeEquipmentRoleAssociation(BaseClass): equipmentrole_id = Column(INTEGER, ForeignKey("_equipmentrole.id"), primary_key=True) #: id of associated equipment proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"), primary_key=True) #: id of associated procedure uses = Column(JSON) #: locations of equipment on the procedure type excel sheet. - static = Column(INTEGER, default=1) #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list? + static = Column(INTEGER, + default=1) #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list? proceduretype = relationship(ProcedureType, back_populates="proceduretypeequipmentroleassociation", foreign_keys=[proceduretype_id]) #: associated procedure @@ -3212,9 +2177,6 @@ class ProcedureTypeEquipmentRoleAssociation(BaseClass): back_populates="equipmentroleproceduretypeassociation", foreign_keys=[equipmentrole_id]) #: associated equipment - # equipment = relationship("Equipment", back_populates="equipmentroleproceduretypeassociation", - # secondary=proceduretypeequipmentroleassociation_equipment) #: Concrete control (Equipment) of reagentrole - @validates('static') def validate_static(self, key, value): """ @@ -3239,100 +2201,10 @@ class ProcedureTypeEquipmentRoleAssociation(BaseClass): super().save() -# class ProcedureTypeTipRoleAssociation(BaseClass): -# """ -# Abstract association between SubmissionType and TipRole -# """ -# tiprole_id = Column(INTEGER, ForeignKey("_tiprole.id"), primary_key=True) #: id of associated equipment -# proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"), -# primary_key=True) #: id of associated procedure -# uses = Column(JSON) #: locations of equipment on the procedure type excel sheet. -# static = Column(INTEGER, -# default=1) #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list? -# proceduretype = relationship(ProcedureType, -# back_populates="proceduretypetiproleassociation") #: associated procedure -# tiprole = relationship(TipRole, -# back_populates="tiproleproceduretypeassociation") #: associated equipment -# -# @check_authorization -# def save(self): -# super().save() -# -# def to_omni(self): -# pass - - -# class ProcedureTipsAssociation(BaseClass): -# """ -# Association between a concrete procedure instance and concrete tips -# """ -# tips_id = Column(INTEGER, ForeignKey("_tips.id"), primary_key=True) #: id of associated equipment -# procedure_id = Column(INTEGER, ForeignKey("_procedure.id"), primary_key=True) #: id of associated procedure -# procedure = relationship("Procedure", -# back_populates="proceduretipsassociation") #: associated procedure -# tips = relationship(Tips, -# back_populates="tipsprocedureassociation") #: associated equipment -# tiprole = Column(String(32), primary_key=True) #, ForeignKey("_tiprole.name")) -# -# def to_sub_dict(self) -> dict: -# """ -# This item as a dictionary -# -# Returns: -# dict: Values of this object -# """ -# return dict(role=self.role_name, name=self.tips.name, lot=self.tips.lot) -# -# @classmethod -# @setup_lookup -# def query(cls, tips: int | Tips, tiprole: str, procedure: int | Procedure | None = None, limit: int = 0, **kwargs) \ -# -> Any | List[Any]: -# query: Query = cls.__database_session__.query(cls) -# match tips: -# case int(): -# query = query.filter(cls.tips_id == tips) -# case Tips(): -# query = query.filter(cls.tips == tips) -# case _: -# pass -# match procedure: -# case int(): -# query = query.filter(cls.procedure_id == procedure) -# case Procedure(): -# query = query.filter(cls.procedure == procedure) -# case _: -# pass -# query = query.filter(cls.tiprole == tiprole) -# return cls.execute_query(query=query, limit=limit, **kwargs) -# -# # TODO: fold this into the BaseClass.query_or_create ? -# # @classmethod -# # def query_or_create(cls, tips, procedure, role: str, **kwargs): -# # kwargs['limit'] = 1 -# # instance = cls.query(tips_id=tips.id, role_name=role, procedure_id=procedure.id, **kwargs) -# # if instance is None: -# # instance = cls(procedure=procedure, tips=tips, role_name=role) -# # return instance -# -# def to_pydantic(self): -# from backend.validators import PydTips -# return PydTips(name=self.tips.name, lot=self.tips.lot, role=self.role_name) -# -# def details_dict(self, **kwargs): -# output = super().details_dict() -# # NOTE: Figure out how to merge the misc_info if doing .update instead. -# relevant = {k: v for k, v in output.items() if k not in ['tips']} -# output = output['tips'].details_dict() -# misc = output['misc_info'] -# output.update(relevant) -# output['misc_info'] = misc -# return output - - class Results(BaseClass): - id = Column(INTEGER, primary_key=True) - result_type = Column(String(32)) - result = Column(JSON) + id = Column(INTEGER, primary_key=True) #: primary key + result_type = Column(String(32)) #: Name of the type of this result. + result = Column(JSON) #: date_analyzed = Column(TIMESTAMP) procedure_id = Column(INTEGER, ForeignKey("_procedure.id", ondelete='SET NULL', name="fk_RES_procedure_id")) @@ -3356,7 +2228,6 @@ class Results(BaseClass): assert dir.exists() except AssertionError: return None - logger.debug(f"Getting image from {self.__directory_path__}") with zipfile.ZipFile(dir) as zf: with zf.open(self._img) as f: return f.read() diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index 6fc0dea..418eb6b 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -2,43 +2,26 @@ Models for the main procedure and sample types. """ from __future__ import annotations - -import itertools -import pickle -from copy import deepcopy from getpass import getuser -import logging, uuid, tempfile, re, base64, numpy as np, pandas as pd, types, sys +import logging, tempfile, re, numpy as np, pandas as pd, types, sys, itertools from inspect import isclass -from io import BytesIO -from zipfile import ZipFile, BadZipfile -from tempfile import TemporaryDirectory, TemporaryFile +from zipfile import BadZipfile from operator import itemgetter from pprint import pformat - -import openpyxl from pandas import DataFrame from sqlalchemy.ext.hybrid import hybrid_property - from frontend.widgets.functions import select_save_file -from . import Base, BaseClass, Reagent, SubmissionType, ClientLab, Contact, LogMixin, Procedure -from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func, Table, Sequence -from sqlalchemy.orm import relationship, validates, Query +from . import Base, BaseClass, SubmissionType, ClientLab, Contact, LogMixin, Procedure +from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func +from sqlalchemy.orm import relationship, Query from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.ext.associationproxy import association_proxy -from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError, StatementError, \ - ArgumentError +from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError, StatementError from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError -from openpyxl import Workbook -from openpyxl.drawing.image import Image as OpenpyxlImage -from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr, Result, Report, \ - report_result, create_holidays_for_year, check_dictionary_inclusion_equality, is_power_user +from tools import setup_lookup, jinja_template_loading, create_holidays_for_year, check_dictionary_inclusion_equality, is_power_user from datetime import datetime, date -from typing import List, Any, Tuple, Literal, Generator, Type, TYPE_CHECKING +from typing import List, Literal, Generator, TYPE_CHECKING from pathlib import Path -from jinja2.exceptions import TemplateNotFound -from jinja2 import Template -from PIL import Image - if TYPE_CHECKING: from backend.db.models.procedures import ProcedureType, Procedure @@ -51,21 +34,21 @@ class ClientSubmission(BaseClass, LogMixin): """ id = Column(INTEGER, primary_key=True) #: primary key - submitter_plate_id = Column(String(127), unique=True) #: The number given to the procedure by the submitting lab - submitted_date = Column(TIMESTAMP) #: Date procedure received + submitter_plate_id = Column(String(127), unique=True) #: The number given to the submission by the submitting lab + submitted_date = Column(TIMESTAMP) #: Date submission received clientlab = relationship("ClientLab", back_populates="clientsubmission") #: client org clientlab_id = Column(INTEGER, ForeignKey("_clientlab.id", ondelete="SET NULL", name="fk_BS_sublab_id")) #: client lab id from _organizations - submission_category = Column(String(64)) + submission_category = Column(String(64)) #: i.e. Surveillance sample_count = Column(INTEGER) #: Number of sample in the procedure full_batch_size = Column(INTEGER) #: Number of wells in provided plate. 0 if no plate. - comment = Column(JSON) + comment = Column(JSON) #: comment objects from users. run = relationship("Run", back_populates="clientsubmission") #: many-to-one relationship - contact = relationship("Contact", back_populates="clientsubmission") #: client org + contact = relationship("Contact", back_populates="clientsubmission") #: contact representing submitting lab. contact_id = Column(INTEGER, ForeignKey("_contact.id", ondelete="SET NULL", - name="fk_BS_contact_id")) #: client lab id from _organizations + name="fk_BS_contact_id")) #: contact id from _organizations submissiontype_name = Column(String, ForeignKey("_submissiontype.name", ondelete="SET NULL", - name="fk_BS_subtype_name")) #: name of joined procedure type + name="fk_BS_subtype_name")) #: name of joined submission type submissiontype = relationship("SubmissionType", back_populates="clientsubmission") #: archetype of this procedure cost_centre = Column( String(64)) #: Permanent storage of used cost centre in case organization field changed in the future. @@ -93,7 +76,7 @@ class ClientSubmission(BaseClass, LogMixin): @setup_lookup def query(cls, submissiontype: str | SubmissionType | None = None, - submissiontype_name: str | None = None, + # submissiontype_name: str | None = None, id: int | str | None = None, submitter_plate_id: str | None = None, start_date: date | datetime | str | int | None = None, @@ -108,7 +91,7 @@ class ClientSubmission(BaseClass, LogMixin): Lookup procedure based on a number of parameters. Overrides parent. Args: - submission_type (str | models.SubmissionType | None, optional): Submission type of interest. Defaults to None. + submissiontype (str | models.SubmissionType | None, optional): Submission type of interest. Defaults to None. id (int | str | None, optional): Submission id in the database (limits results to 1). Defaults to None. rsl_plate_number (str | None, optional): Submission name in the database (limits results to 1). Defaults to None. start_date (date | str | int | None, optional): Beginning date to search by. Defaults to None. @@ -142,9 +125,11 @@ class ClientSubmission(BaseClass, LogMixin): limit = 1 case _: pass - match submissiontype_name: + match submissiontype: + case SubmissionType(): + query = query.filter(cls.submissiontype == submissiontype) case str(): - query = query.filter(cls.submissiontype_name == submissiontype_name) + query = query.filter(cls.submissiontype_name == submissiontype) case _: pass # NOTE: by id (returns only a single value) @@ -157,7 +142,6 @@ class ClientSubmission(BaseClass, LogMixin): limit = 1 case _: pass - # query = query.order_by(cls.submitted_date.desc()) # NOTE: Split query results into pages of size {page_size} if page_size > 0 and limit == 0: limit = page_size @@ -249,11 +233,8 @@ class ClientSubmission(BaseClass, LogMixin): if report: return output if full_data: - # dicto, _ = self.kittype.construct_xl_map_for_use(self.proceduretype) - # sample = self.generate_associations(name="clientsubmissionsampleassociation") samples = None runs = [item.to_dict(full_data=True) for item in self.run] - # custom = self.custom else: samples = None custom = None @@ -280,7 +261,6 @@ class ClientSubmission(BaseClass, LogMixin): output["comment"] = comments output["contact"] = contact output["contact_phone"] = contact_phone - # output["custom"] = custom output["run"] = runs output['name'] = self.name return output @@ -291,7 +271,6 @@ class ClientSubmission(BaseClass, LogMixin): except AssertionError: logger.warning(f"Converting {sample} to sql.") sample = sample.to_sql() - # logger.debug(sample.__dict__) try: row = sample._misc_info['row'] except (KeyError, AttributeError): @@ -300,7 +279,6 @@ class ClientSubmission(BaseClass, LogMixin): column = sample._misc_info['column'] except KeyError: column = 0 - # logger.debug(f"Sample: {sample}") submission_rank = sample._misc_info['submission_rank'] if sample in self.sample: return @@ -311,7 +289,6 @@ class ClientSubmission(BaseClass, LogMixin): row=row, column=column ) - # assoc.save() return assoc @property @@ -333,13 +310,10 @@ class ClientSubmission(BaseClass, LogMixin): if checker.exec(): run = Run(clientsubmission=self, rsl_plate_number=checker.rsl_plate_number) active_samples = [sample for sample in samples if sample.enabled] - logger.debug(active_samples) for sample in active_samples: sample = sample.to_sql() - logger.debug(f"Sample: {sample.id}") if sample not in run.sample: assoc = run.add_sample(sample) - # assoc.save() run.save() else: logger.warning("Run cancelled.") @@ -351,13 +325,6 @@ class ClientSubmission(BaseClass, LogMixin): def add_comment(self, obj): logger.debug("Add Comment") - # def show_details(self, obj): - # logger.debug("Show Details") - # from frontend.widgets.submission_details import SubmissionDetails - # dlg = SubmissionDetails(parent=obj, sub=self) - # if dlg.exec(): - # pass - def details_dict(self, **kwargs): output = super().details_dict(**kwargs) output['clientlab'] = output['clientlab'].details_dict() @@ -377,7 +344,6 @@ class ClientSubmission(BaseClass, LogMixin): def to_pydantic(self, filepath: Path | str | None = None, **kwargs): output = super().to_pydantic(filepath=filepath, **kwargs) - # output.template_file = self.template_file return output @@ -389,18 +355,16 @@ class Run(BaseClass, LogMixin): id = Column(INTEGER, primary_key=True) #: primary key rsl_plate_number = Column(String(32), unique=True, nullable=False) #: RSL name (e.g. RSL-22-0012) clientsubmission_id = Column(INTEGER, ForeignKey("_clientsubmission.id", ondelete="SET NULL", - name="fk_BS_clientsub_id")) #: client lab id from _organizations) - clientsubmission = relationship("ClientSubmission", back_populates="run") + name="fk_BS_clientsub_id")) #: id of parent clientsubmission + clientsubmission = relationship("ClientSubmission", back_populates="run") #: parent clientsubmission _started_date = Column(TIMESTAMP) #: Date this procedure was started. run_cost = Column( FLOAT(2)) #: total cost of running the plate. Set from constant and mutable kittype costs at time of creation. signed_by = Column(String(32)) #: user name of person who submitted the procedure to the database. comment = Column(JSON) #: user notes - custom = Column(JSON) - - _completed_date = Column(TIMESTAMP) - - procedure = relationship("Procedure", back_populates="run", uselist=True) + custom = Column(JSON) #: unknown + _completed_date = Column(TIMESTAMP) #: Date this procedure was finished. + procedure = relationship("Procedure", back_populates="run", uselist=True) #: children procedures runsampleassociation = relationship( "RunSampleAssociation", @@ -412,20 +376,6 @@ class Run(BaseClass, LogMixin): "sample", creator=lambda sample: RunSampleAssociation( sample=sample)) #: Association proxy to ClientSubmissionSampleAssociation.sample - # NOTE: Allows for subclassing into ex. BacterialCulture, Wastewater, etc. - # __mapper_args__ = { - # "polymorphic_identity": "Basic Submission", - # "polymorphic_on": case( - # - # (submissiontype_name == "Wastewater", "Wastewater"), - # (submissiontype_name == "Wastewater Artic", "Wastewater Artic"), - # (submissiontype_name == "Bacterial Culture", "Bacterial Culture"), - # - # else_="Basic Submission" - # ), - # "with_polymorphic": "*", - # } - def __repr__(self) -> str: return f"" @@ -556,7 +506,6 @@ class Run(BaseClass, LogMixin): case SubmissionType(): return submissiontype case _: - # return SubmissionType.query(cls.__mapper_args__['polymorphic_identity']) return None @classmethod @@ -712,14 +661,12 @@ class Run(BaseClass, LogMixin): query_out = [] for sub_type in submissiontype: subs = cls.query(page_size=0, start_date=start_date, end_date=end_date, submissiontype=sub_type) - # logger.debug(f"Sub results: {run}") query_out.append(subs) query_out = list(itertools.chain.from_iterable(query_out)) else: query_out = cls.query(page_size=0, start_date=start_date, end_date=end_date) records = [] for sub in query_out: - # output = sub.to_dict(full_data=True) output = sub.details_dict() for k, v in output.items(): if isinstance(v, types.GeneratorType): @@ -746,29 +693,6 @@ class Run(BaseClass, LogMixin): Calculates cost of the plate """ # NOTE: Calculate number of columns based on largest column number - # try: - # cols_count_96 = self.column_count - # except Exception as e: - # logger.error(f"Column count error: {e}") - # # NOTE: Get kittype associated with this procedure - # # logger.debug(f"Checking associations with procedure type: {self.submissiontype_name}") - # assoc = next((item for item in self.kittype.kit_submissiontype_associations if - # item.proceduretype == self.submission_type), - # None) - # # logger.debug(f"Got association: {assoc}") - # # NOTE: If every individual cost is 0 this is probably an old plate. - # if all(item == 0.0 for item in [assoc.constant_cost, assoc.mutable_cost_column, assoc.mutable_cost_sample]): - # try: - # self.run_cost = self.kittype.cost_per_run - # except Exception as e: - # logger.error(f"Calculation error: {e}") - # else: - # try: - # self.run_cost = assoc.constant_cost + (assoc.mutable_cost_column * cols_count_96) + ( - # assoc.mutable_cost_sample * int(self.sample_count)) - # except Exception as e: - # logger.error(f"Calculation error: {e}") - # self.run_cost = round(self.run_cost, 2) pass @property @@ -802,7 +726,6 @@ class Run(BaseClass, LogMixin): """ rows = range(1, plate_rows + 1) columns = range(1, plate_columns + 1) - # logger.debug(f"sample list for plate map: {pformat(sample_list)}") # NOTE: An overly complicated list comprehension create a list of sample locations # NOTE: next will return a blank cell if no value found for row/column output_samples = [next((item for item in sample_list if item['row'] == row and item['column'] == column), @@ -841,7 +764,6 @@ class Run(BaseClass, LogMixin): pd.DataFrame: Pandas Dataframe of all relevant procedure """ # NOTE: use lookup function to create list of dicts - # subs = [item.to_dict() for item in subs = [item.details_dict() for item in cls.query(submissiontype=submission_type, limit=limit, chronologic=chronologic, page=page, page_size=page_size)] @@ -872,8 +794,6 @@ class Run(BaseClass, LogMixin): value (_type_): value of attribute """ match key: - # case "kittype": - # field_value = KitType.query(name=value) case "clientlab": field_value = ClientLab.query(name=value) case "contact": @@ -900,13 +820,11 @@ class Run(BaseClass, LogMixin): existing = value case _: existing = self.__getattribute__(key) - logger.debug(f"Existing value is {pformat(existing)}") if value in ['', 'null', None]: logger.error(f"No value given, not setting.") return if existing is None: existing = [] - # if value in existing: if check_dictionary_inclusion_equality(existing, value): logger.warning("Value already exists. Preventing duplicate addition.") return @@ -955,17 +873,6 @@ class Run(BaseClass, LogMixin): pass return assoc - # def update_reagentassoc(self, reagent: Reagent, role: str): - # # NOTE: get the first reagent assoc that fills the given reagentrole. - # try: - # assoc = next(item for item in self.submission_reagent_associations if - # item.reagent and role in [role.name for role in item.reagent.equipmentrole]) - # assoc.reagent = reagent - # except StopIteration as e: - # logger.error(f"Association for {role} not found, creating new association.") - # assoc = ProcedureReagentAssociation(procedure=self, reagent=reagent) - # self.submission_reagent_associations.append(assoc) - def to_pydantic(self, backup: bool = False) -> "PydSubmission": """ Converts this instance into a PydSubmission @@ -1028,7 +935,6 @@ class Run(BaseClass, LogMixin): Returns: str: String from which regex will be compiled. """ - # logger.debug(f"Class for regex: {cls}") try: regex = cls.get_submission_type(submission_type).defaults['regex'] except AttributeError as e: @@ -1038,7 +944,6 @@ class Run(BaseClass, LogMixin): regex = re.compile(rf"{regex}", flags=re.IGNORECASE | re.VERBOSE) except re.error as e: regex = cls.construct_regex() - # logger.debug(f"Returning regex: {regex}") return regex # NOTE: Polymorphic functions @@ -1089,15 +994,6 @@ class Run(BaseClass, LogMixin): Returns: models.Run | List[models.Run]: Run(s) of interest """ - # from ... import RunReagentAssociation - # NOTE: if you go back to using 'model' change the appropriate cls to model in the query filters - # if submissiontype is not None: - # model = cls.find_polymorphic_subclass(polymorphic_identity=submissiontype) - # elif len(kwargs) > 0: - # # NOTE: find the subclass containing the relevant attributes - # model = cls.find_polymorphic_subclass(attrs=kwargs) - # else: - # model = cls query: Query = cls.__database_session__.query(cls) if start_date is not None and end_date is None: logger.warning(f"Start date with no end date, using today.") @@ -1107,38 +1003,8 @@ class Run(BaseClass, LogMixin): start_date = cls.__database_session__.query(cls, func.min(cls.submitted_date)).first()[1] logger.warning(f"End date with no start date, using first procedure date: {start_date}") if start_date is not None: - # match start_date: - # case date(): - # pass - # case datetime(): - # start_date = start_date.date() - # case int(): - # start_date = datetime.fromordinal( - # datetime(1900, 1, 1).toordinal() + start_date - 2).date() - # case _: - # start_date = parse(start_date).date() - # # start_date = start_date.strftime("%Y-%m-%d") - # match end_date: - # case date(): - # pass - # case datetime(): - # end_date = end_date # + timedelta(days=1) - # # pass - # case int(): - # end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date() # \ - # # + timedelta(days=1) - # case _: - # end_date = parse(end_date).date() # + timedelta(days=1) - # # end_date = end_date.strftime("%Y-%m-%d") - # start_date = datetime.combine(start_date, datetime.min.time()).strftime("%Y-%m-%d %H:%M:%S.%f") - # end_date = datetime.combine(end_date, datetime.max.time()).strftime("%Y-%m-%d %H:%M:%S.%f") - # # if start_date == end_date: - # # start_date = start_date.strftime("%Y-%m-%d %H:%M:%S.%f") - # # query = query.filter(model.submitted_date == start_date) - # # else: start_date = cls.rectify_query_date(start_date) end_date = cls.rectify_query_date(end_date, eod=True) - logger.debug(f"Start date: {start_date}, end date: {end_date}") query = query.join(ClientSubmission).filter(ClientSubmission.submitted_date.between(start_date, end_date)) # NOTE: by rsl number (returns only a single value) match name: @@ -1164,7 +1030,6 @@ class Run(BaseClass, LogMixin): limit = 1 case _: pass - # query = query.order_by(cls.submitted_date.desc()) # NOTE: Split query results into pages of size {page_size} if page_size > 0: query = query.limit(page_size) @@ -1173,58 +1038,6 @@ class Run(BaseClass, LogMixin): query = query.offset(page * page_size) return cls.execute_query(query=query, limit=limit, **kwargs) - # @classmethod - # def query_or_create(cls, submissiontype: str | SubmissionType | None = None, **kwargs) -> Run: - # """ - # Returns object from db if exists, else, creates new. Due to need for user input, doesn't see much use ATM. - # - # Args: - # submissiontype (str | SubmissionType | None, optional): Submission type to be created. Defaults to None. - # - # Raises: - # ValueError: Raised if no kwargs passed. - # ValueError: Raised if disallowed key is passed. - # - # Returns: - # cls: A Run subclass instance. - # """ - # code = 0 - # msg = "" - # report = Report() - # disallowed = ["id"] - # if kwargs == {}: - # raise ValueError("Need to narrow down query or the first available instance will be returned.") - # sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed} - # instance = cls.query(submissiontype=submissiontype, limit=1, **sanitized_kwargs) - # if instance is None: - # used_class = cls.find_polymorphic_subclass(attrs=kwargs, polymorphic_identity=submissiontype) - # instance = used_class(**sanitized_kwargs) - # match submissiontype: - # case str(): - # submissiontype = SubmissionType.query(name=submissiontype) - # case _: - # pass - # instance.proceduretype = submissiontype - # instance.submissiontype_name = submissiontype.name - # if "submitted_date" not in kwargs.keys(): - # instance.submitted_date = date.today() - # else: - # from frontend.widgets.pop_ups import QuestionAsker - # logger.warning(f"Found existing instance: {instance}, asking to overwrite.") - # # code = 1 - # # msg = "This procedure already exists.\nWould you like to overwrite?" - # # report.add_result(Result(msg=msg, code=code)) - # dlg = QuestionAsker(title="Overwrite?", - # message="This procedure already exists.\nWould you like to overwrite?") - # if dlg.exec(): - # pass - # else: - # code = 1 - # msg = "This procedure already exists.\nWould you like to overwrite?" - # report.add_result(Result(msg=msg, code=code)) - # return None, report - # return instance, report - # NOTE: Custom context events for the ui @property @@ -1237,18 +1050,15 @@ class Run(BaseClass, LogMixin): """ names = ["Add Procedure", "Edit", "Export", "Add Comment", "Show Details", "Delete"] output = {item: self.__getattribute__(item.lower().replace(" ", "_")) for item in names} - logger.debug(output) return output def add_procedure(self, obj, proceduretype_name: str): from frontend.widgets.procedure_creation import ProcedureCreation procedure_type: ProcedureType = next( (proceduretype for proceduretype in self.allowed_procedures if proceduretype.name == proceduretype_name)) - logger.debug(f"Got ProcedureType: {procedure_type}") dlg = ProcedureCreation(parent=obj, procedure=procedure_type.construct_dummy_procedure(run=self)) if dlg.exec(): sql, _ = dlg.return_sql(new=True) - # sys.exit(pformat(sql.__dict__)) sql.save() obj.set_data() @@ -1282,18 +1092,6 @@ class Run(BaseClass, LogMixin): except AttributeError: logger.error("App will not refresh data at this time.") - # def show_details(self, obj): - # """ - # Creates Widget for showing procedure details. - # - # Args: - # obj (Widget): Parent widget - # """ - # from frontend.widgets.submission_details import SubmissionDetails - # dlg = SubmissionDetails(parent=obj, sub=self) - # if dlg.exec(): - # pass - def edit(self, obj): """ Return procedure to form widget for updating @@ -1315,7 +1113,6 @@ class Run(BaseClass, LogMixin): Args: obj (_type_): parent widget """ - logger.debug(obj) from frontend.widgets.submission_details import SubmissionComment dlg = SubmissionComment(parent=obj, submission=self) if dlg.exec(): @@ -1437,8 +1234,6 @@ class Run(BaseClass, LogMixin): unranked_samples.append(sample) possible_ranks = (item for item in list(plate_dict.keys()) if item not in [sample['submission_rank'] for sample in ranked_samples]) - # logger.debug(possible_ranks) - # possible_ranks = (plate_dict[idx] for idx in possible_ranks) for sample in unranked_samples: try: submission_rank = next(possible_ranks) @@ -1457,17 +1252,9 @@ class Run(BaseClass, LogMixin): background_color="#ffffff", enabled=False) ) padded_list.append(sample) - # logger.debug(f"Final padded list:\n{pformat(list(sorted(padded_list, key=itemgetter('submission_rank'))))}") return list(sorted(padded_list, key=itemgetter('submission_rank'))) -# class SampleType(BaseClass): -# id = Column(INTEGER, primary_key=True) #: primary key -# name = Column(String(64), nullable=False, unique=True) #: identification from submitter -# -# sample = relationship("Sample", back_populates="sampletype", uselist=True) - - # NOTE: Sample Classes class Sample(BaseClass, LogMixin): @@ -1477,11 +1264,7 @@ class Sample(BaseClass, LogMixin): id = Column(INTEGER, primary_key=True) #: primary key sample_id = Column(String(64), nullable=False, unique=True) #: identification from submitter - # sampletype_id = Column(INTEGER, ForeignKey("_sampletype.id", ondelete="SET NULL", - # name="fk_SAMP_sampletype_id")) - # sampletype = relationship("SampleType", back_populates="sample") - # misc_info = Column(JSON) - control = relationship("Control", back_populates="sample", uselist=False) + control = relationship("Control", back_populates="sample", uselist=False) #: Control function this sample fills. sampleclientsubmissionassociation = relationship( "ClientSubmissionSampleAssociation", @@ -1529,13 +1312,8 @@ class Sample(BaseClass, LogMixin): Returns: dict: submitter id and sample type and linked procedure if full data """ - # try: - # sample_type = self.sampletype.name - # except AttributeError: - # sample_type = "NA" sample = dict( sample_id=self.sample_id - # sampletype=sample_type ) if full_data: sample['clientsubmission'] = sorted([item.to_sub_dict() for item in self.sampleclientsubmissionassociation], @@ -1563,7 +1341,6 @@ class Sample(BaseClass, LogMixin): @setup_lookup def query(cls, sample_id: str | None = None, - # sampletype: str | SampleType | None = None, limit: int = 0, **kwargs ) -> Sample | List[Sample]: @@ -1578,13 +1355,6 @@ class Sample(BaseClass, LogMixin): models.Sample|List[models.Sample]: Sample(s) of interest. """ query = cls.__database_session__.query(cls) - # match sampletype: - # case str(): - # query = query.join(SampleType).filter(SampleType.name == sampletype) - # case SampleType(): - # query = query.filter(cls.sampletype == sampletype) - # case _: - # pass match sample_id: case str(): query = query.filter(cls.sample_id == sample_id) @@ -1593,38 +1363,6 @@ class Sample(BaseClass, LogMixin): pass return cls.execute_query(query=query, limit=limit, **kwargs) - # @classmethod - # def fuzzy_search(cls, - # sampletype: str | Sample | None = None, - # **kwargs - # ) -> List[Sample]: - # """ - # Allows for fuzzy search of sample. - # - # Args: - # sampletype (str | BasicSample | None, optional): Type of sample. Defaults to None. - # - # Returns: - # List[Sample]: List of sample that match kwarg search parameters. - # """ - # query: Query = cls.__database_session__.query(cls) - # match sampletype: - # case str(): - # query = query.join(SampleType).filter(SampleType.name == sampletype) - # case SampleType(): - # query = query.filter(cls.sampletype == sampletype) - # case _: - # pass - # for k, v in kwargs.items(): - # search = f"%{v}%" - # try: - # attr = getattr(cls, k) - # # NOTE: the secret sauce is in attr.like - # query = query.filter(attr.like(search)) - # except (ArgumentError, AttributeError) as e: - # logger.error(f"Attribute {k} unavailable due to:\n\t{e}\nSkipping.") - # return query.limit(50).all() - def delete(self): raise AttributeError(f"Delete not implemented for {self.__class__}") @@ -1686,12 +1424,9 @@ class ClientSubmissionSampleAssociation(BaseClass): DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html """ - # id = Column(INTEGER, unique=True, nullable=False, autoincrement=True) #: id to be used for inheriting purposes sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated sample clientsubmission_id = Column(INTEGER, ForeignKey("_clientsubmission.id"), - primary_key=True) #: id of associated procedure - # row = Column(INTEGER) - # column = Column(INTEGER) + primary_key=True) #: id of associated client submission submission_rank = Column(INTEGER, primary_key=True, default=0) #: Location in sample list # NOTE: reference to the Submission object clientsubmission = relationship("ClientSubmission", @@ -1708,10 +1443,6 @@ class ClientSubmissionSampleAssociation(BaseClass): self.row = row self.column = column self.submission_rank = submission_rank - # if id is not None: - # self.id = id - # else: - # self.id = self.__class__.autoincrement_id() for k, v in kwargs.items(): try: self.__setattr__(k, v) @@ -1735,13 +1466,6 @@ class ClientSubmissionSampleAssociation(BaseClass): # NOTE: Get associated sample info sample = self.sample.to_sub_dict() sample['sample_id'] = self.sample.sample_id - # sample['row'] = self.row - # sample['column'] = self.column - # try: - # sample['well'] = f"{row_map[self.row]}{self.column}" - # except (KeyError, AttributeError) as e: - # logger.error(f"Unable to find row {self.row} in row_map.") - # sample['Well'] = None sample['plate_name'] = self.clientsubmission.submitter_plate_id sample['positive'] = False sample['submitted_date'] = self.clientsubmission.submitted_date @@ -1752,10 +1476,8 @@ class ClientSubmissionSampleAssociation(BaseClass): output = super().details_dict() # NOTE: Figure out how to merge the misc_info if doing .update instead. relevant = {k: v for k, v in output.items() if k not in ['sample']} - # logger.debug(f"Relevant info from assoc output: {pformat(relevant)}") output = output['sample'].details_dict() misc = output['misc_info'] - # # logger.debug(f"Output from sample: {pformat(output)}") output.update(relevant) output['misc_info'] = misc return output @@ -1798,48 +1520,6 @@ class ClientSubmissionSampleAssociation(BaseClass): sample.update(dict(Name=self.sample.sample_id[:10], tooltip=tooltip_text, background_color=background)) return sample - # @classmethod - # def autoincrement_id(cls) -> int: - # """ - # Increments the association id automatically - # - # Returns: - # int: incremented id - # """ - # if cls.__name__ == "ClientSubmissionSampleAssociation": - # model = cls - # else: - # model = next((base for base in cls.__bases__ if base.__name__ == "ClientSubmissionSampleAssociation"), - # ClientSubmissionSampleAssociation) - # try: - # return max([item.id for item in model.query()]) + 1 - # except ValueError as e: - # logger.error(f"Problem incrementing id: {e}") - # return 1 - - # @classmethod - # def find_polymorphic_subclass(cls, polymorphic_identity: str | None = None) -> ClientSubmissionSampleAssociation: - # """ - # Retrieves subclasses of ClientSubmissionSampleAssociation based on type name. - # - # Args: - # polymorphic_identity (str | None, optional): Name of subclass fed to polymorphic identity. Defaults to None. - # - # Returns: - # ClientSubmissionSampleAssociation: Subclass of interest. - # """ - # if isinstance(polymorphic_identity, dict): - # polymorphic_identity = polymorphic_identity['value'] - # if polymorphic_identity is None: - # model = cls - # else: - # try: - # model = cls.__mapper__.polymorphic_map[polymorphic_identity].class_ - # except Exception as e: - # logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}") - # model = cls - # return model - @classmethod @setup_lookup def query(cls, @@ -1857,12 +1537,14 @@ class ClientSubmissionSampleAssociation(BaseClass): Lookup junction of Submission and Sample in the database Args: - run (models.Run | str | None, optional): Submission of interest. Defaults to None. + clientsubmission (models.ClientSubmission | str | None, optional): Submission of interest. Defaults to None. + exclude_submission_type ( str | None, optional): Name of submissiontype to exclude. Defaults to None. sample (models.Sample | str | None, optional): Sample of interest. Defaults to None. row (int, optional): Row of the sample location on procedure plate. Defaults to 0. column (int, optional): Column of the sample location on the procedure plate. Defaults to 0. limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. chronologic (bool, optional): Return results in chronologic order. Defaults to False. + reverse (bool, optional): Whether or not to reverse order of list. Defaults to False. Returns: models.ClientSubmissionSampleAssociation|List[models.ClientSubmissionSampleAssociation]: Junction(s) of interest @@ -1960,12 +1642,8 @@ class RunSampleAssociation(BaseClass): DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html """ - # id = Column(INTEGER, unique=True, nullable=False) #: id to be used for inheriting purposes sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated sample run_id = Column(INTEGER, ForeignKey("_run.id"), primary_key=True) #: id of associated procedure - # row = Column(INTEGER) #: row on the 96 well plate - # column = Column(INTEGER) #: column on the 96 well plate - # misc_info = Column(JSON) # NOTE: reference to the Submission object @@ -2003,13 +1681,6 @@ class RunSampleAssociation(BaseClass): # NOTE: Get associated sample info sample = self.sample.to_sub_dict() sample['name'] = self.sample.sample_id - # sample['row'] = self.row - # sample['column'] = self.column - # try: - # sample['well'] = f"{row_map[self.row]}{self.column}" - # except KeyError as e: - # logger.error(f"Unable to find row {self.row} in row_map.") - # sample['Well'] = None sample['plate_name'] = self.run.rsl_plate_number sample['positive'] = False return sample @@ -2070,11 +1741,13 @@ class RunSampleAssociation(BaseClass): Args: run (models.Run | str | None, optional): Submission of interest. Defaults to None. + exclude_submission_type ( str | None, optional): Name of submissiontype to exclude. Defaults to None. sample (models.Sample | str | None, optional): Sample of interest. Defaults to None. row (int, optional): Row of the sample location on procedure plate. Defaults to 0. column (int, optional): Column of the sample location on the procedure plate. Defaults to 0. limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. chronologic (bool, optional): Return results in chronologic order. Defaults to False. + reverse (bool, optional): Whether or not to reverse order of list. Defaults to False. Returns: models.ClientSubmissionSampleAssociation|List[models.ClientSubmissionSampleAssociation]: Junction(s) of interest @@ -2169,13 +1842,10 @@ class RunSampleAssociation(BaseClass): output = super().details_dict() # NOTE: Figure out how to merge the misc_info if doing .update instead. relevant = {k: v for k, v in output.items() if k not in ['sample']} - # logger.debug(f"Relevant info from assoc output: {pformat(relevant)}") output = output['sample'].details_dict() misc = output['misc_info'] - # logger.debug(f"Output from sample: {pformat(output)}") output.update(relevant) output['misc_info'] = misc - return output @@ -2192,7 +1862,7 @@ class ProcedureSampleAssociation(BaseClass): sample = relationship(Sample, back_populates="sampleprocedureassociation") #: associated equipment - results = relationship("Results", back_populates="sampleprocedureassociation") + results = relationship("Results", back_populates="sampleprocedureassociation") #: associated results @classmethod def query(cls, sample: Sample | str | None = None, procedure: Procedure | str | None = None, limit: int = 0, @@ -2242,9 +1912,6 @@ class ProcedureSampleAssociation(BaseClass): # NOTE: Figure out how to merge the misc_info if doing .update instead. relevant = {k: v for k, v in output.items() if k not in ['sample']} output = output['sample'].details_dict() - logger.debug(f"Output: {pformat(output)}") - logger.debug(f"Relevant: {pformat(relevant)}") - # relevant['submission_rank'] = output['misc_info']['submission_rank'] misc = output['misc_info'] output.update(relevant) output['misc_info'] = misc diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py index a903a5f..96ec78b 100644 --- a/src/submissions/backend/validators/pydant.py +++ b/src/submissions/backend/validators/pydant.py @@ -1244,7 +1244,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): name: dict = Field(default=dict(value="NA", missing=True), validate_default=True) technician: dict = Field(default=dict(value="NA", missing=True)) repeat: bool = Field(default=False) - repeat_of: str | None = Field(default=None) + repeat_of: Procedure | None = Field(default=None) # kittype: dict = Field(default=dict(value="NA", missing=True)) # possible_kits: list | None = Field(default=[], validate_default=True) plate_map: str | None = Field(default=None) @@ -1485,8 +1485,8 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): if sql.repeat: regex = re.compile(r".*\dR\d$") repeats = [item for item in self.run.procedure if - self.repeat_of in item.name and bool(regex.match(item.name))] - sql.name = f"{self.repeat_of}R{str(len(repeats) + 1)}" + self.repeat_of.name in item.name and bool(regex.match(item.name))] + sql.name = f"{self.repeat_of.name}-R{str(len(repeats) + 1)}" sql.repeat_of = self.repeat_of sql.started_date = datetime.now() if self.run: diff --git a/src/submissions/frontend/widgets/procedure_creation.py b/src/submissions/frontend/widgets/procedure_creation.py index b1e7f9c..b2aad8e 100644 --- a/src/submissions/frontend/widgets/procedure_creation.py +++ b/src/submissions/frontend/widgets/procedure_creation.py @@ -120,6 +120,10 @@ class ProcedureCreation(QDialog): match key: case "rsl_plate_num": setattr(self.procedure.run, key, new_value) + case "repeat_of": + from backend.db.models import Procedure + parent = Procedure.query(name=new_value, limit=1) + self.procedure.repeat_of = parent case _: attribute = getattr(self.procedure, key) match attribute: @@ -128,6 +132,7 @@ class ProcedureCreation(QDialog): case _: setattr(self.procedure, key, new_value.strip('\"')) logger.debug(f"Set value for {key}: {getattr(self.procedure, key)}") + # sys.exit()