Code cleanup for db.models complete.

This commit is contained in:
lwark
2025-09-03 14:04:26 -05:00
parent b2225ef731
commit fcda0d873c
9 changed files with 208 additions and 2200 deletions

View File

@@ -22,7 +22,6 @@ def set_sqlite_pragma(dbapi_connection, connection_record):
execution_phrase = "PRAGMA foreign_keys=ON"
print(f"Executing '{execution_phrase}' in sql.")
else:
# print("Nothing to execute, returning")
cursor.close()
return
cursor.execute(execution_phrase)
@@ -55,9 +54,6 @@ def update_log(mapper, connection, target):
continue
added = [str(item) for item in hist.added]
# NOTE: Attributes left out to save space
# if attr.key in ['artic_technician', 'clientsubmissionsampleassociation', 'submission_reagent_associations',
# 'submission_equipment_associations', 'submission_tips_associations', 'contact_id', 'gel_info',
# 'gel_controls', 'source_plates']:
if attr.key in LogMixin.tracking_exclusion:
continue
deleted = [str(item) for item in hist.deleted]

View File

@@ -164,8 +164,7 @@ class BaseClass(Base):
dict | list | str: Output of key:value dict or single (list, str) desired variable
"""
# NOTE: singles is a list of fields that need to be limited to 1 result.
singles = list(set(cls.singles + BaseClass.singles))
return dict(singles=singles)
return dict(singles=list(set(cls.singles + BaseClass.singles)))
@classmethod
def find_regular_subclass(cls, name: str | None = None) -> Any:
@@ -237,10 +236,8 @@ class BaseClass(Base):
new = False
allowed = [k for k, v in cls.__dict__.items() if
isinstance(v, InstrumentedAttribute) or isinstance(v, hybrid_property)]
# and not isinstance(v.property, _RelationshipDeclared)]
sanitized_kwargs = {k: v for k, v in kwargs.items() if k in allowed}
outside_kwargs = {k: v for k, v in kwargs.items() if k not in allowed}
logger.debug(f"Sanitized kwargs: {sanitized_kwargs}")
instance = cls.query(limit=1, **sanitized_kwargs)
if not instance or isinstance(instance, list):
instance = cls()
@@ -254,10 +251,8 @@ class BaseClass(Base):
from backend.validators.pydant import PydBaseClass
if issubclass(v.__class__, PydBaseClass):
setattr(instance, k, v.to_sql())
# else:
# logger.error(f"Could not set {k} due to {e}")
instance._misc_info.update(outside_kwargs)
# logger.info(f"Instance from query or create: {instance}, new: {new}")
logger.info(f"Instance from query or create: {instance}, new: {new}")
return instance, new
@classmethod
@@ -286,17 +281,10 @@ class BaseClass(Base):
Returns:
Any | List[Any]: Single result if limit = 1 or List if other.
"""
# logger.debug(f"Kwargs: {kwargs}")
# if model is None:
# model = cls
# logger.debug(f"Model: {model}")
if query is None:
query: Query = cls.__database_session__.query(cls)
# else:
# logger.debug(f"Incoming query: {query}")
singles = cls.get_default_info('singles')
for k, v in kwargs.items():
# logger.info(f"Using key: {k} with value: {v} against {cls}")
try:
attr = getattr(cls, k)
except (ArgumentError, AttributeError) as e:
@@ -314,7 +302,6 @@ class BaseClass(Base):
except ArgumentError:
continue
else:
# logger.debug("Single item.")
try:
query = query.filter(attr == v)
except ArgumentError:
@@ -354,9 +341,6 @@ class BaseClass(Base):
try:
self.__database_session__.add(self)
self.__database_session__.commit()
# except sqlalchemy.exc.IntegrityError as i:
# logger.error(f"Integrity error saving {self} due to: {i}")
# logger.error(pformat(self.__dict__))
except Exception as e:
logger.critical(f"Problem saving {self} due to: {e}")
logger.error(f"Error message: {type(e)}")
@@ -434,7 +418,7 @@ class BaseClass(Base):
try:
template = env.get_template(temp_name)
except TemplateNotFound as e:
# logger.error(f"Couldn't find template {e}")
logger.error(f"Couldn't find template {e}")
template = env.get_template("details.html")
return template
@@ -448,14 +432,11 @@ class BaseClass(Base):
Returns:
bool: If a single unequivocal value is found will be false, else true.
"""
# logger.debug(f"Incoming attributes: {attributes}")
for key, value in attributes.items():
if value.lower() == "none":
value = None
# logger.debug(f"Attempting to grab attribute: {key}")
self_value = getattr(self, key)
class_attr = getattr(self.__class__, key)
# logger.debug(f"Self value: {self_value}, class attr: {class_attr} of type: {type(class_attr)}")
if isinstance(class_attr, property):
filter = "property"
else:
@@ -475,7 +456,6 @@ class BaseClass(Base):
case "property":
pass
case _RelationshipDeclared():
# logger.debug(f"Checking {self_value}")
try:
self_value = self_value.name
except AttributeError:
@@ -483,18 +463,14 @@ class BaseClass(Base):
if class_attr.property.uselist:
self_value = self_value.__str__()
try:
# logger.debug(f"Check if {self_value.__class__} is subclass of {self.__class__}")
check = issubclass(self_value.__class__, self.__class__)
except TypeError as e:
logger.error(f"Couldn't check if {self_value.__class__} is subclass of {self.__class__} due to {e}")
check = False
if check:
# logger.debug(f"Checking for subclass name.")
self_value = self_value.name
# logger.debug(f"Checking self_value {self_value} of type {type(self_value)} against attribute {value} of type {type(value)}")
if self_value != value:
output = False
# logger.debug(f"Value {key} is False, returning.")
return output
return True
@@ -502,13 +478,9 @@ class BaseClass(Base):
"""
Custom dunder method to handle potential list relationship issues.
"""
# logger.debug(f"Attempting to set: {key} to {value}")
if key.startswith("_"):
return super().__setattr__(key, value)
# try:
check = not hasattr(self, key)
# except:
# return
if check:
try:
value = json.dumps(value)
@@ -524,27 +496,21 @@ class BaseClass(Base):
except AttributeError:
return super().__setattr__(key, value)
if isinstance(field_type, InstrumentedAttribute):
# logger.debug(f"{key} is an InstrumentedAttribute.")
match field_type.property:
case ColumnProperty():
# logger.debug(f"Setting ColumnProperty to {value}")
return super().__setattr__(key, value)
case _RelationshipDeclared():
# logger.debug(f"{self.__class__.__name__} Setting _RelationshipDeclared for {key} to {value}")
if field_type.property.uselist:
# logger.debug(f"Setting with uselist")
existing = self.__getattribute__(key)
# NOTE: This is causing problems with removal of items from lists. Have to overhaul it.
if existing is not None:
logger.debug(f"{key} Existing: {existing}, incoming: {value}")
if isinstance(value, list):
# value = existing + value
value = value
else:
value = existing + [value]
else:
if isinstance(value, list):
# value = value
pass
else:
value = [value]
@@ -552,7 +518,6 @@ class BaseClass(Base):
value = list(set(value))
except TypeError:
pass
# logger.debug(f"Final value for {key}: {value}")
return super().__setattr__(key, value)
else:
if isinstance(value, list):
@@ -608,7 +573,6 @@ class BaseClass(Base):
relevant = {k: v for k, v in self.__class__.__dict__.items() if
isinstance(v, InstrumentedAttribute) or isinstance(v, AssociationProxy)}
# output = OrderedDict()
output = dict(excluded=["excluded", "misc_info", "_misc_info", "id"])
for k, v in relevant.items():
try:
@@ -621,15 +585,9 @@ class BaseClass(Base):
value = getattr(self, k)
except AttributeError:
continue
# try:
# logger.debug(f"Setting {k} to {value} for details dict.")
# except AttributeError as e:
# logger.error(f"Can't log {k} value due to {type(e)}")
# continue
output[k.strip("_")] = value
if self._misc_info:
for key, value in self._misc_info.items():
# logger.debug(f"Misc info key {key}")
output[key] = value
return output
@@ -669,28 +627,15 @@ class BaseClass(Base):
pyd = getattr(pydant, pyd_model_name)
except AttributeError:
raise AttributeError(f"Could not get pydantic class {pyd_model_name}")
# logger.debug(f"Kwargs: {kwargs}")
# logger.debug(f"Dict: {pformat(self.details_dict())}")
return pyd(**self.details_dict(**kwargs))
def show_details(self, obj):
logger.debug("Show Details")
from frontend.widgets.submission_details import SubmissionDetails
dlg = SubmissionDetails(parent=obj, sub=self)
if dlg.exec():
pass
def export(self, obj, output_filepath: str | Path | None = None):
# if not hasattr(self, "template_file"):
# logger.error(f"Export not implemented for {self.__class__.__name__}")
# return
# pyd = self.to_pydantic()
# if not output_filepath:
# from frontend import select_save_file
# output_filepath = select_save_file(obj=obj, default_name=pyd.construct_filename(), extension="xlsx")
# Writer = getattr(writers, f"{self.__class__.__name__}Writer")
# writer = Writer(output_filepath=output_filepath, pydant_obj=pyd, range_dict=self.range_dict)
# workbook = writer
from backend import managers
Manager = getattr(managers, f"Default{self.__class__.__name__}")
manager = Manager(parent=obj, input_object=self)

View File

@@ -18,10 +18,10 @@ class AuditLog(Base):
__tablename__ = "_auditlog"
id = Column(INTEGER, primary_key=True, autoincrement=True) #: primary key
user = Column(String(64))
time = Column(TIMESTAMP)
object = Column(String(64))
changes = Column(JSON)
user = Column(String(64)) #: The user who made the change
time = Column(TIMESTAMP) #: When the change was made
object = Column(String(64)) #: What was changed
changes = Column(JSON) #: List of changes that were made
def __repr__(self):
return f"<{self.object}: {self.user} @ {self.time}>"

View File

@@ -2,17 +2,13 @@
All control related models.
"""
from __future__ import annotations
import itertools
from pprint import pformat
from PyQt6.QtWidgets import QWidget, QCheckBox, QLabel
from pandas import DataFrame
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, case, FLOAT
from sqlalchemy.orm import relationship, Query, validates
from PyQt6.QtWidgets import QWidget
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, case
from sqlalchemy.orm import relationship, Query
import logging, re
from operator import itemgetter
from . import BaseClass
from tools import setup_lookup, report_result, Result, Report, Settings, get_unique_values_in_df_column, super_splitter, \
flatten_list, timer
from tools import setup_lookup, Report, Settings, super_splitter
from datetime import date, datetime, timedelta
from typing import List, Literal, Tuple, Generator
from re import Pattern
@@ -131,16 +127,6 @@ class Control(BaseClass):
procedure = relationship("Procedure", back_populates="control",
foreign_keys=[procedure_id]) #: parent procedure
# __mapper_args__ = {
# "polymorphic_identity": "Basic Control",
# "polymorphic_on": case(
# (controltype_name == "PCR Control", "PCR Control"),
# (controltype_name == "Irida Control", "Irida Control"),
# else_="Basic Control"
# ),
# "with_polymorphic": "*",
# }
def __repr__(self) -> str:
return f"<{self.controltype_name}({self.name})>"
@@ -282,450 +268,3 @@ class Control(BaseClass):
def delete(self):
self.__database_session__.delete(self)
self.__database_session__.commit()
# class PCRControl(Control):
# """
# Class made to hold info from Design & Analysis software.
# """
#
# id = Column(INTEGER, ForeignKey('_control.id'), primary_key=True)
# subtype = Column(String(16)) #: PC or NC
# target = Column(String(16)) #: N1, N2, etc.
# ct = Column(FLOAT) #: PCR result
# reagent_lot = Column(String(64), ForeignKey("_reagent.lot", ondelete="SET NULL",
# name="fk_reagent_lot"))
# reagent = relationship("Reagent", foreign_keys=reagent_lot) #: reagent used for this control
#
# __mapper_args__ = dict(polymorphic_identity="PCR Control",
# polymorphic_load="inline",
# inherit_condition=(id == Control.id))
#
# def to_sub_dict(self) -> dict:
# """
# Creates dictionary of fields for this object.
#
# Returns:
# dict: Output dict of name, ct, subtype, target, reagent_lot and submitted_date
# """
# return dict(
# name=self.name,
# ct=self.ct,
# subtype=self.subtype,
# target=self.target,
# reagent_lot=self.reagent_lot,
# submitted_date=self.submitted_date.date()
# )
#
# @classmethod
# @report_result
# def make_chart(cls, parent, chart_settings: dict, ctx: Settings) -> Tuple[Report, "PCRFigure"]:
# """
# Creates a PCRFigure. Overrides parent
#
# Args:
# parent (__type__): Widget to contain the chart.
# chart_settings (dict): settings passed down from chart widget
# ctx (Settings): settings passed down from gui. Not used here.
#
# Returns:
# Tuple[Report, "PCRFigure"]: Report of status and resulting figure.
# """
# from frontend.visualizations.pcr_charts import PCRFigure
# parent.mode_typer.clear()
# parent.mode_typer.setEnabled(False)
# report = Report()
# control = cls.query(proceduretype=chart_settings['submissiontype'], start_date=chart_settings['start_date'],
# end_date=chart_settings['end_date'])
# data = [control.to_sub_dict() for control in control]
# df = DataFrame.from_records(data)
# # NOTE: Get all PCR control with ct over 0
# try:
# df = df[df.ct > 0.0]
# except AttributeError:
# df = df
# fig = PCRFigure(df=df, modes=[], settings=chart_settings)
# return report, fig
#
# def to_pydantic(self):
# from backend.validators import PydPCRControl
# return PydPCRControl(**self.to_sub_dict(),
# controltype_name=self.controltype_name,
# clientsubmission_id=self.clientsubmission_id)
#
#
# class IridaControl(Control):
# subtyping_allowed = ['kraken']
#
# id = Column(INTEGER, ForeignKey('_control.id'), primary_key=True)
# contains = Column(JSON) #: unstructured hashes in contains.tsv for each organism
# matches = Column(JSON) #: unstructured hashes in matches.tsv for each organism
# kraken = Column(JSON) #: unstructured output from kraken_report
# subtype = Column(String(16), nullable=False) #: EN-NOS, MCS-NOS, etc
# refseq_version = Column(String(16)) #: version of refseq used in fastq parsing
# kraken2_version = Column(String(16)) #: version of kraken2 used in fastq parsing
# kraken2_db_version = Column(String(32)) #: folder name of kraken2 db
# sample_id = Column(INTEGER,
# ForeignKey("_basicsample.id", ondelete="SET NULL", name="cont_BCS_id")) #: sample id key
#
# __mapper_args__ = dict(polymorphic_identity="Irida Control",
# polymorphic_load="inline",
# inherit_condition=(id == Control.id))
#
# @property
# def targets(self):
# if self.controltype.targets:
# return list(itertools.chain.from_iterable([value for key, value in self.controltype.targets.items()
# if key == self.subtype]))
# else:
# return ["None"]
#
# @validates("subtype")
# def enforce_subtype_literals(self, key: str, value: str) -> str:
# """
# Validates submissiontype field with acceptable values
#
# Args:
# key (str): Field name
# value (str): Field Value
#
# Raises:
# KeyError: Raised if value is not in the acceptable list.
#
# Returns:
# str: Validated string.
# """
# acceptables = ['ATCC49226', 'ATCC49619', 'EN-NOS', "EN-SSTI", "MCS-NOS", "MCS-SSTI", "SN-NOS", "SN-SSTI"]
# if value.upper() not in acceptables:
# raise KeyError(f"Sub-type must be in {acceptables}")
# return value
#
# def to_sub_dict(self) -> dict:
# """
# Converts object into convenient dictionary for use in procedure summary
#
# Returns:
# dict: output dictionary containing: Name, Type, Targets, Top Kraken results
# """
# try:
# kraken = self.kraken
# except TypeError:
# kraken = {}
# try:
# kraken_cnt_total = sum([item['kraken_count'] for item in kraken.values()])
# except AttributeError:
# kraken_cnt_total = 0
# try:
# new_kraken = [dict(name=key, kraken_count=value['kraken_count'],
# kraken_percent=f"{value['kraken_count'] / kraken_cnt_total:0.2%}",
# target=key in self.controltype.targets)
# for key, value in kraken.items()]
# new_kraken = sorted(new_kraken, key=itemgetter('kraken_count'), reverse=True)[0:10]
# except (AttributeError, ZeroDivisionError):
# new_kraken = []
# output = dict(
# name=self.name,
# type=self.controltype.name,
# targets=", ".join(self.targets),
# kraken=new_kraken
# )
# return output
#
# def convert_by_mode(self, control_sub_type: str, mode: Literal['kraken', 'matches', 'contains'],
# consolidate: bool = False) -> Generator[dict, None, None]:
# """
# split this instance into analysis types ('kraken', 'matches', 'contains') for control graphs
#
# Args:
# consolidate (bool): whether to merge all off-target genera. Defaults to False
# control_sub_type (str): control subtype, 'MCS-NOS', etc.
# mode (Literal['kraken', 'matches', 'contains']): analysis type, 'contains', etc.
#
# Returns:
# List[dict]: list of records
# """
# try:
# data = self.__getattribute__(mode)
# except TypeError:
# data = {}
# if data is None:
# data = {}
# # NOTE: Data truncation and consolidation.
# if "kraken" in mode:
# data = {k: v for k, v in sorted(data.items(), key=lambda d: d[1][f"{mode}_count"], reverse=True)[:50]}
# else:
# if consolidate:
# on_tar = {k: v for k, v in data.items() if k.strip("*") in self.controltype.targets[control_sub_type]}
# off_tar = sum(v[f'{mode}_ratio'] for k, v in data.items() if
# k.strip("*") not in self.controltype.targets[control_sub_type])
# on_tar['Off-target'] = {f"{mode}_ratio": off_tar}
# data = on_tar
# for genus in data:
# _dict = dict(
# name=self.name,
# submitted_date=self.submitted_date,
# genus=genus,
# target='Target' if genus.strip("*") in self.controltype.targets[control_sub_type] else "Off-target"
# )
# for key in data[genus]:
# _dict[key] = data[genus][key]
# yield _dict
#
# @classproperty
# def modes(cls) -> List[str]:
# """
# Get all control modes from database
#
# Returns:
# List[str]: List of control mode names.
# """
# try:
# cols = [item.name for item in list(cls.__table__.columns) if isinstance(item.type, JSON)]
# except AttributeError as e:
# logger.error(f"Failed to get available modes from db: {e}")
# cols = []
# return cols
#
# @classmethod
# def make_parent_buttons(cls, parent: QWidget) -> None:
# """
# Creates buttons for controlling
#
# Args:
# parent (QWidget): chart holding widget to add buttons to.
#
# """
# super().make_parent_buttons(parent=parent)
# rows = parent.layout.rowCount() - 2
# # NOTE: check box for consolidating off-target items
# checker = QCheckBox(parent)
# checker.setChecked(True)
# checker.setObjectName("irida_check")
# checker.setToolTip("Pools off-target genera to save time.")
# parent.layout.addWidget(QLabel("Consolidate Off-targets"), rows, 0, 1, 1)
# parent.layout.addWidget(checker, rows, 1, 1, 2)
# checker.checkStateChanged.connect(parent.update_data)
#
# @classmethod
# @report_result
# def make_chart(cls, chart_settings: dict, parent, ctx) -> Tuple[Report, "IridaFigure" | None]:
# """
# Creates a IridaFigure. Overrides parent
#
# Args:
# parent (__type__): Widget to contain the chart.
# chart_settings (dict): settings passed down from chart widget
# ctx (Settings): settings passed down from gui.
#
# Returns:
# Tuple[Report, "IridaFigure"]: Report of status and resulting figure.
# """
# from frontend.visualizations import IridaFigure
# try:
# checker = parent.findChild(QCheckBox, name="irida_check")
# if chart_settings['mode'] == "kraken":
# checker.setEnabled(False)
# checker.setChecked(False)
# else:
# checker.setEnabled(True)
# consolidate = checker.isChecked()
# except AttributeError:
# consolidate = False
# report = Report()
# control = cls.query(subtype=chart_settings['submissiontype'], start_date=chart_settings['start_date'],
# end_date=chart_settings['end_date'])
# if not control:
# report.add_result(Result(status="Critical", msg="No control found in given date range."))
# return report, None
# # NOTE: change each control to list of dictionaries
# data = [control.convert_by_mode(control_sub_type=chart_settings['submissiontype'], mode=chart_settings['mode'],
# consolidate=consolidate) for
# control in control]
# # NOTE: flatten data to one dimensional list
# # data = [item for sublist in data for item in sublist]
# data = flatten_list(data)
# if not data:
# report.add_result(Result(status="Critical", msg="No data found for control in given date range."))
# return report, None
# df = cls.convert_data_list_to_df(input_df=data, sub_mode=chart_settings['sub_mode'])
# if chart_settings['sub_mode'] is None:
# title = chart_settings['sub_mode']
# else:
# title = f"{chart_settings['mode']} - {chart_settings['sub_mode']}"
# # NOTE: send dataframe to chart maker
# df, modes = cls.prep_df(ctx=ctx, df=df)
# fig = IridaFigure(df=df, ytitle=title, modes=modes, parent=parent,
# settings=chart_settings)
# return report, fig
#
# @classmethod
# def convert_data_list_to_df(cls, input_df: list[dict], sub_mode) -> DataFrame:
# """
# Convert list of control records to dataframe
#
# Args:
# input_df (list[dict]): list of dictionaries containing records
# sub_mode (str | None, optional): submissiontype of procedure type. Defaults to None.
#
# Returns:
# DataFrame: dataframe of control
# """
# df = DataFrame.from_records(input_df)
# safe = ['name', 'submitted_date', 'genus', 'target']
# for column in df.columns:
# if column not in safe:
# if sub_mode is not None and column != sub_mode:
# continue
# else:
# safe.append(column)
# if "percent" in column:
# try:
# count_col = next(item for item in df.columns if "count" in item)
# except StopIteration:
# continue
# # NOTE: The actual percentage from kraken was off due to exclusion of NaN, recalculating.
# df[column] = 100 * df[count_col] / df.groupby('name')[count_col].transform('sum')
# df = df[[c for c in df.columns if c in safe]]
# # NOTE: move date of sample submitted on same date as previous ahead one.
# df = cls.displace_date(df=df)
# # NOTE: ad hoc method to make data labels more accurate.
# df = cls.df_column_renamer(df=df)
# return df
#
# @classmethod
# def df_column_renamer(cls, df: DataFrame) -> DataFrame:
# """
# Ad hoc function I created to clarify some fields
#
# Args:
# df (DataFrame): input dataframe
#
# Returns:
# DataFrame: dataframe with 'clarified' column names
# """
# df = df[df.columns.drop(list(df.filter(regex='_hashes')))]
# return df.rename(columns={
# "contains_ratio": "contains_shared_hashes_ratio",
# "matches_ratio": "matches_shared_hashes_ratio",
# "kraken_count": "kraken2_read_count_(top_50)",
# "kraken_percent": "kraken2_read_percent_(top_50)"
# })
#
# @classmethod
# def displace_date(cls, df: DataFrame) -> DataFrame:
# """
# This function serves to split sample that were submitted on the same date by incrementing dates.
# It will shift the date forward by one day if it is the same day as an existing date in a list.
#
# Args:
# df (DataFrame): input dataframe composed of control records
#
# Returns:
# DataFrame: output dataframe with dates incremented.
# """
# # NOTE: get submitted dates for each control
# dict_list = [dict(name=item, date=df[df.name == item].iloc[0]['submitted_date']) for item in
# sorted(df['name'].unique())]
# previous_dates = set()
# for item in dict_list:
# df, previous_dates = cls.check_date(df=df, item=item, previous_dates=previous_dates)
# return df
#
# @classmethod
# def check_date(cls, df: DataFrame, item: dict, previous_dates: set) -> Tuple[DataFrame, list]:
# """
# Checks if an items date is already present in df and adjusts df accordingly
#
# Args:
# df (DataFrame): input dataframe
# item (dict): control for checking
# previous_dates (list): list of dates found in previous control
#
# Returns:
# Tuple[DataFrame, list]: Output dataframe and appended list of previous dates
# """
# try:
# check = item['date'] in previous_dates
# except IndexError:
# check = False
# previous_dates.add(item['date'])
# if check:
# # NOTE: get df locations where name == item name
# mask = df['name'] == item['name']
# # NOTE: increment date in dataframe
# df.loc[mask, 'submitted_date'] = df.loc[mask, 'submitted_date'].apply(lambda x: x + timedelta(days=1))
# item['date'] += timedelta(days=1)
# passed = False
# else:
# passed = True
# # NOTE: if procedure didn't lead to changed date, return values
# if passed:
# return df, previous_dates
# # NOTE: if date was changed, rerun with new date
# else:
# logger.warning(f"Date check failed, running recursion.")
# df, previous_dates = cls.check_date(df, item, previous_dates)
# return df, previous_dates
#
# @classmethod
# def prep_df(cls, ctx: Settings, df: DataFrame) -> Tuple[DataFrame | None, list]:
# """
# Constructs figures based on parsed pandas dataframe.
#
# Args:
# ctx (Settings): settings passed down from gui
# df (pd.DataFrame): input dataframe
# ytitle (str | None, optional): title for the y-axis. Defaults to None.
#
# Returns:
# Figure: Plotly figure
# """
# # NOTE: converts starred genera to normal and splits off list of starred
# if df.empty:
# return None, []
# df['genus'] = df['genus'].replace({'\*': ''}, regex=True).replace({"NaN": "Unknown"})
# df['genera'] = [item[-1] if item and item[-1] == "*" else "" for item in df['genus'].to_list()]
# # NOTE: remove original run, using reruns if applicable
# df = cls.drop_reruns_from_df(ctx=ctx, df=df)
# # NOTE: sort by and exclude from
# sorts = ['submitted_date', "target", "genus"]
# exclude = ['name', 'genera']
# modes = [item for item in df.columns if item not in sorts and item not in exclude]
# # NOTE: Set descending for any columns that have "{mode}" in the header.
# ascending = [False if item == "target" else True for item in sorts]
# df = df.sort_values(by=sorts, ascending=ascending)
# # NOTE: actual chart construction is done by
# return df, modes
#
# @classmethod
# def drop_reruns_from_df(cls, ctx: Settings, df: DataFrame) -> DataFrame:
# """
# Removes semi-duplicates from dataframe after finding sequencing repeats.
#
# Args:
# ctx (Settings): settings passed from gui
# df (DataFrame): initial dataframe
#
# Returns:
# DataFrame: dataframe with originals removed in favour of repeats.
# """
# if 'rerun_regex' in ctx.model_extra:
# sample_names = get_unique_values_in_df_column(df, column_name="name")
# rerun_regex = re.compile(fr"{ctx.rerun_regex}")
# exclude = [re.sub(rerun_regex, "", sample) for sample in sample_names if rerun_regex.search(sample)]
# df = df[~df.name.isin(exclude)]
# return df
#
# def to_pydantic(self) -> "PydIridaControl":
# """
# Constructs a pydantic version of this object.
#
# Returns:
# PydIridaControl: This object as a pydantic model.
# """
# from backend.validators import PydIridaControl
# return PydIridaControl(**self.__dict__)
#
# @property
# def is_positive_control(self):
# return not self.subtype.lower().startswith("en")

View File

@@ -3,14 +3,11 @@ All client organization related models.
'''
from __future__ import annotations
import logging
from pathlib import Path
from pprint import pformat
from sqlalchemy import Column, String, INTEGER, ForeignKey, Table
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, Query
from . import Base, BaseClass
from tools import check_authorization, setup_lookup
from typing import List, Tuple
from typing import List
logger = logging.getLogger(f"submissions.{__name__}")
@@ -31,7 +28,7 @@ class ClientLab(BaseClass):
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: clientlab name
clientsubmission = relationship("ClientSubmission", back_populates="clientlab") #: procedure this clientlab has submitted
clientsubmission = relationship("ClientSubmission", back_populates="clientlab") #: submission this clientlab has submitted
cost_centre = Column(String()) #: cost centre used by org for payment
contact = relationship("Contact", back_populates="clientlab",
secondary=clientlab_contact) #: contact involved with this org
@@ -47,6 +44,7 @@ class ClientLab(BaseClass):
Lookup clientlabs in the database by a number of parameters.
Args:
id (int | None, optional): id integer of the clientlab. Defaults to None.
name (str | None, optional): Name of the clientlab. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
@@ -104,20 +102,6 @@ class Contact(BaseClass):
def searchables(cls):
return []
# @classmethod
# def query_or_create(cls, **kwargs) -> Tuple[Contact, bool]:
# new = False
# disallowed = []
# sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
# instance = cls.query(**sanitized_kwargs)
# if not instance or isinstance(instance, list):
# instance = cls()
# new = True
# for k, v in sanitized_kwargs.items():
# setattr(instance, k, v)
# logger.info(f"Instance from contact query or create: {instance}")
# return instance, new
@classmethod
@setup_lookup
def query(cls,
@@ -131,6 +115,7 @@ class Contact(BaseClass):
Lookup contact in the database by a number of parameters.
Args:
id (int | None, optional): id integer of the contact. Defaults to None.
name (str | None, optional): Name of the contact. Defaults to None.
email (str | None, optional): Email of the contact. Defaults to None.
phone (str | None, optional): Phone number of the contact. Defaults to None.

File diff suppressed because it is too large Load Diff

View File

@@ -2,43 +2,26 @@
Models for the main procedure and sample types.
"""
from __future__ import annotations
import itertools
import pickle
from copy import deepcopy
from getpass import getuser
import logging, uuid, tempfile, re, base64, numpy as np, pandas as pd, types, sys
import logging, tempfile, re, numpy as np, pandas as pd, types, sys, itertools
from inspect import isclass
from io import BytesIO
from zipfile import ZipFile, BadZipfile
from tempfile import TemporaryDirectory, TemporaryFile
from zipfile import BadZipfile
from operator import itemgetter
from pprint import pformat
import openpyxl
from pandas import DataFrame
from sqlalchemy.ext.hybrid import hybrid_property
from frontend.widgets.functions import select_save_file
from . import Base, BaseClass, Reagent, SubmissionType, ClientLab, Contact, LogMixin, Procedure
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func, Table, Sequence
from sqlalchemy.orm import relationship, validates, Query
from . import Base, BaseClass, SubmissionType, ClientLab, Contact, LogMixin, Procedure
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func
from sqlalchemy.orm import relationship, Query
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError, StatementError, \
ArgumentError
from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError, StatementError
from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError
from openpyxl import Workbook
from openpyxl.drawing.image import Image as OpenpyxlImage
from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr, Result, Report, \
report_result, create_holidays_for_year, check_dictionary_inclusion_equality, is_power_user
from tools import setup_lookup, jinja_template_loading, create_holidays_for_year, check_dictionary_inclusion_equality, is_power_user
from datetime import datetime, date
from typing import List, Any, Tuple, Literal, Generator, Type, TYPE_CHECKING
from typing import List, Literal, Generator, TYPE_CHECKING
from pathlib import Path
from jinja2.exceptions import TemplateNotFound
from jinja2 import Template
from PIL import Image
if TYPE_CHECKING:
from backend.db.models.procedures import ProcedureType, Procedure
@@ -51,21 +34,21 @@ class ClientSubmission(BaseClass, LogMixin):
"""
id = Column(INTEGER, primary_key=True) #: primary key
submitter_plate_id = Column(String(127), unique=True) #: The number given to the procedure by the submitting lab
submitted_date = Column(TIMESTAMP) #: Date procedure received
submitter_plate_id = Column(String(127), unique=True) #: The number given to the submission by the submitting lab
submitted_date = Column(TIMESTAMP) #: Date submission received
clientlab = relationship("ClientLab", back_populates="clientsubmission") #: client org
clientlab_id = Column(INTEGER, ForeignKey("_clientlab.id", ondelete="SET NULL",
name="fk_BS_sublab_id")) #: client lab id from _organizations
submission_category = Column(String(64))
submission_category = Column(String(64)) #: i.e. Surveillance
sample_count = Column(INTEGER) #: Number of sample in the procedure
full_batch_size = Column(INTEGER) #: Number of wells in provided plate. 0 if no plate.
comment = Column(JSON)
comment = Column(JSON) #: comment objects from users.
run = relationship("Run", back_populates="clientsubmission") #: many-to-one relationship
contact = relationship("Contact", back_populates="clientsubmission") #: client org
contact = relationship("Contact", back_populates="clientsubmission") #: contact representing submitting lab.
contact_id = Column(INTEGER, ForeignKey("_contact.id", ondelete="SET NULL",
name="fk_BS_contact_id")) #: client lab id from _organizations
name="fk_BS_contact_id")) #: contact id from _organizations
submissiontype_name = Column(String, ForeignKey("_submissiontype.name", ondelete="SET NULL",
name="fk_BS_subtype_name")) #: name of joined procedure type
name="fk_BS_subtype_name")) #: name of joined submission type
submissiontype = relationship("SubmissionType", back_populates="clientsubmission") #: archetype of this procedure
cost_centre = Column(
String(64)) #: Permanent storage of used cost centre in case organization field changed in the future.
@@ -93,7 +76,7 @@ class ClientSubmission(BaseClass, LogMixin):
@setup_lookup
def query(cls,
submissiontype: str | SubmissionType | None = None,
submissiontype_name: str | None = None,
# submissiontype_name: str | None = None,
id: int | str | None = None,
submitter_plate_id: str | None = None,
start_date: date | datetime | str | int | None = None,
@@ -108,7 +91,7 @@ class ClientSubmission(BaseClass, LogMixin):
Lookup procedure based on a number of parameters. Overrides parent.
Args:
submission_type (str | models.SubmissionType | None, optional): Submission type of interest. Defaults to None.
submissiontype (str | models.SubmissionType | None, optional): Submission type of interest. Defaults to None.
id (int | str | None, optional): Submission id in the database (limits results to 1). Defaults to None.
rsl_plate_number (str | None, optional): Submission name in the database (limits results to 1). Defaults to None.
start_date (date | str | int | None, optional): Beginning date to search by. Defaults to None.
@@ -142,9 +125,11 @@ class ClientSubmission(BaseClass, LogMixin):
limit = 1
case _:
pass
match submissiontype_name:
match submissiontype:
case SubmissionType():
query = query.filter(cls.submissiontype == submissiontype)
case str():
query = query.filter(cls.submissiontype_name == submissiontype_name)
query = query.filter(cls.submissiontype_name == submissiontype)
case _:
pass
# NOTE: by id (returns only a single value)
@@ -157,7 +142,6 @@ class ClientSubmission(BaseClass, LogMixin):
limit = 1
case _:
pass
# query = query.order_by(cls.submitted_date.desc())
# NOTE: Split query results into pages of size {page_size}
if page_size > 0 and limit == 0:
limit = page_size
@@ -249,11 +233,8 @@ class ClientSubmission(BaseClass, LogMixin):
if report:
return output
if full_data:
# dicto, _ = self.kittype.construct_xl_map_for_use(self.proceduretype)
# sample = self.generate_associations(name="clientsubmissionsampleassociation")
samples = None
runs = [item.to_dict(full_data=True) for item in self.run]
# custom = self.custom
else:
samples = None
custom = None
@@ -280,7 +261,6 @@ class ClientSubmission(BaseClass, LogMixin):
output["comment"] = comments
output["contact"] = contact
output["contact_phone"] = contact_phone
# output["custom"] = custom
output["run"] = runs
output['name'] = self.name
return output
@@ -291,7 +271,6 @@ class ClientSubmission(BaseClass, LogMixin):
except AssertionError:
logger.warning(f"Converting {sample} to sql.")
sample = sample.to_sql()
# logger.debug(sample.__dict__)
try:
row = sample._misc_info['row']
except (KeyError, AttributeError):
@@ -300,7 +279,6 @@ class ClientSubmission(BaseClass, LogMixin):
column = sample._misc_info['column']
except KeyError:
column = 0
# logger.debug(f"Sample: {sample}")
submission_rank = sample._misc_info['submission_rank']
if sample in self.sample:
return
@@ -311,7 +289,6 @@ class ClientSubmission(BaseClass, LogMixin):
row=row,
column=column
)
# assoc.save()
return assoc
@property
@@ -333,13 +310,10 @@ class ClientSubmission(BaseClass, LogMixin):
if checker.exec():
run = Run(clientsubmission=self, rsl_plate_number=checker.rsl_plate_number)
active_samples = [sample for sample in samples if sample.enabled]
logger.debug(active_samples)
for sample in active_samples:
sample = sample.to_sql()
logger.debug(f"Sample: {sample.id}")
if sample not in run.sample:
assoc = run.add_sample(sample)
# assoc.save()
run.save()
else:
logger.warning("Run cancelled.")
@@ -351,13 +325,6 @@ class ClientSubmission(BaseClass, LogMixin):
def add_comment(self, obj):
    """
    UI hook for attaching a comment to this client submission.

    Currently a stub: the visible body only logs that the action was
    triggered; no comment is created or persisted here.

    Args:
        obj: Parent widget that triggered the action (unused in the stub).
    """
    logger.debug("Add Comment")
# def show_details(self, obj):
# logger.debug("Show Details")
# from frontend.widgets.submission_details import SubmissionDetails
# dlg = SubmissionDetails(parent=obj, sub=self)
# if dlg.exec():
# pass
def details_dict(self, **kwargs):
output = super().details_dict(**kwargs)
output['clientlab'] = output['clientlab'].details_dict()
@@ -377,7 +344,6 @@ class ClientSubmission(BaseClass, LogMixin):
def to_pydantic(self, filepath: Path | str | None = None, **kwargs):
    """
    Convert this instance into its pydantic representation.

    Thin pass-through to the parent implementation; kept as the hook point
    for submission-specific fields. The commented-out ``template_file``
    assignment that used to live here was dead code and has been removed.

    Args:
        filepath (Path | str | None, optional): Source file path forwarded
            to the parent converter. Defaults to None.

    Returns:
        The pydantic object produced by the parent class's converter.
    """
    return super().to_pydantic(filepath=filepath, **kwargs)
@@ -389,18 +355,16 @@ class Run(BaseClass, LogMixin):
id = Column(INTEGER, primary_key=True) #: primary key
rsl_plate_number = Column(String(32), unique=True, nullable=False) #: RSL name (e.g. RSL-22-0012)
clientsubmission_id = Column(INTEGER, ForeignKey("_clientsubmission.id", ondelete="SET NULL",
name="fk_BS_clientsub_id")) #: client lab id from _organizations)
clientsubmission = relationship("ClientSubmission", back_populates="run")
name="fk_BS_clientsub_id")) #: id of parent clientsubmission
clientsubmission = relationship("ClientSubmission", back_populates="run") #: parent clientsubmission
_started_date = Column(TIMESTAMP) #: Date this procedure was started.
run_cost = Column(
FLOAT(2)) #: total cost of running the plate. Set from constant and mutable kittype costs at time of creation.
signed_by = Column(String(32)) #: user name of person who submitted the procedure to the database.
comment = Column(JSON) #: user notes
custom = Column(JSON)
_completed_date = Column(TIMESTAMP)
procedure = relationship("Procedure", back_populates="run", uselist=True)
custom = Column(JSON) #: unknown
_completed_date = Column(TIMESTAMP) #: Date this procedure was finished.
procedure = relationship("Procedure", back_populates="run", uselist=True) #: children procedures
runsampleassociation = relationship(
"RunSampleAssociation",
@@ -412,20 +376,6 @@ class Run(BaseClass, LogMixin):
"sample", creator=lambda sample: RunSampleAssociation(
sample=sample)) #: Association proxy to ClientSubmissionSampleAssociation.sample
# NOTE: Allows for subclassing into ex. BacterialCulture, Wastewater, etc.
# __mapper_args__ = {
# "polymorphic_identity": "Basic Submission",
# "polymorphic_on": case(
#
# (submissiontype_name == "Wastewater", "Wastewater"),
# (submissiontype_name == "Wastewater Artic", "Wastewater Artic"),
# (submissiontype_name == "Bacterial Culture", "Bacterial Culture"),
#
# else_="Basic Submission"
# ),
# "with_polymorphic": "*",
# }
def __repr__(self) -> str:
    """
    Return a debug representation, e.g. ``<Run(RSL-22-0012)>``.

    Uses the runtime class name instead of the hard-coded ``"Submission"``
    the original emitted — a leftover from this class's earlier name that
    made ``Run`` objects misreport themselves in logs. Also stays correct
    for any future subclasses.
    """
    return f"<{type(self).__name__}({self.name})>"
@@ -556,7 +506,6 @@ class Run(BaseClass, LogMixin):
case SubmissionType():
return submissiontype
case _:
# return SubmissionType.query(cls.__mapper_args__['polymorphic_identity'])
return None
@classmethod
@@ -712,14 +661,12 @@ class Run(BaseClass, LogMixin):
query_out = []
for sub_type in submissiontype:
subs = cls.query(page_size=0, start_date=start_date, end_date=end_date, submissiontype=sub_type)
# logger.debug(f"Sub results: {run}")
query_out.append(subs)
query_out = list(itertools.chain.from_iterable(query_out))
else:
query_out = cls.query(page_size=0, start_date=start_date, end_date=end_date)
records = []
for sub in query_out:
# output = sub.to_dict(full_data=True)
output = sub.details_dict()
for k, v in output.items():
if isinstance(v, types.GeneratorType):
@@ -746,29 +693,6 @@ class Run(BaseClass, LogMixin):
Calculates cost of the plate
"""
# NOTE: Calculate number of columns based on largest column number
# try:
# cols_count_96 = self.column_count
# except Exception as e:
# logger.error(f"Column count error: {e}")
# # NOTE: Get kittype associated with this procedure
# # logger.debug(f"Checking associations with procedure type: {self.submissiontype_name}")
# assoc = next((item for item in self.kittype.kit_submissiontype_associations if
# item.proceduretype == self.submission_type),
# None)
# # logger.debug(f"Got association: {assoc}")
# # NOTE: If every individual cost is 0 this is probably an old plate.
# if all(item == 0.0 for item in [assoc.constant_cost, assoc.mutable_cost_column, assoc.mutable_cost_sample]):
# try:
# self.run_cost = self.kittype.cost_per_run
# except Exception as e:
# logger.error(f"Calculation error: {e}")
# else:
# try:
# self.run_cost = assoc.constant_cost + (assoc.mutable_cost_column * cols_count_96) + (
# assoc.mutable_cost_sample * int(self.sample_count))
# except Exception as e:
# logger.error(f"Calculation error: {e}")
# self.run_cost = round(self.run_cost, 2)
pass
@property
@@ -802,7 +726,6 @@ class Run(BaseClass, LogMixin):
"""
rows = range(1, plate_rows + 1)
columns = range(1, plate_columns + 1)
# logger.debug(f"sample list for plate map: {pformat(sample_list)}")
# NOTE: An overly complicated list comprehension create a list of sample locations
# NOTE: next will return a blank cell if no value found for row/column
output_samples = [next((item for item in sample_list if item['row'] == row and item['column'] == column),
@@ -841,7 +764,6 @@ class Run(BaseClass, LogMixin):
pd.DataFrame: Pandas Dataframe of all relevant procedure
"""
# NOTE: use lookup function to create list of dicts
# subs = [item.to_dict() for item in
subs = [item.details_dict() for item in
cls.query(submissiontype=submission_type, limit=limit, chronologic=chronologic, page=page,
page_size=page_size)]
@@ -872,8 +794,6 @@ class Run(BaseClass, LogMixin):
value (_type_): value of attribute
"""
match key:
# case "kittype":
# field_value = KitType.query(name=value)
case "clientlab":
field_value = ClientLab.query(name=value)
case "contact":
@@ -900,13 +820,11 @@ class Run(BaseClass, LogMixin):
existing = value
case _:
existing = self.__getattribute__(key)
logger.debug(f"Existing value is {pformat(existing)}")
if value in ['', 'null', None]:
logger.error(f"No value given, not setting.")
return
if existing is None:
existing = []
# if value in existing:
if check_dictionary_inclusion_equality(existing, value):
logger.warning("Value already exists. Preventing duplicate addition.")
return
@@ -955,17 +873,6 @@ class Run(BaseClass, LogMixin):
pass
return assoc
# def update_reagentassoc(self, reagent: Reagent, role: str):
# # NOTE: get the first reagent assoc that fills the given reagentrole.
# try:
# assoc = next(item for item in self.submission_reagent_associations if
# item.reagent and role in [role.name for role in item.reagent.equipmentrole])
# assoc.reagent = reagent
# except StopIteration as e:
# logger.error(f"Association for {role} not found, creating new association.")
# assoc = ProcedureReagentAssociation(procedure=self, reagent=reagent)
# self.submission_reagent_associations.append(assoc)
def to_pydantic(self, backup: bool = False) -> "PydSubmission":
"""
Converts this instance into a PydSubmission
@@ -1028,7 +935,6 @@ class Run(BaseClass, LogMixin):
Returns:
str: String from which regex will be compiled.
"""
# logger.debug(f"Class for regex: {cls}")
try:
regex = cls.get_submission_type(submission_type).defaults['regex']
except AttributeError as e:
@@ -1038,7 +944,6 @@ class Run(BaseClass, LogMixin):
regex = re.compile(rf"{regex}", flags=re.IGNORECASE | re.VERBOSE)
except re.error as e:
regex = cls.construct_regex()
# logger.debug(f"Returning regex: {regex}")
return regex
# NOTE: Polymorphic functions
@@ -1089,15 +994,6 @@ class Run(BaseClass, LogMixin):
Returns:
models.Run | List[models.Run]: Run(s) of interest
"""
# from ... import RunReagentAssociation
# NOTE: if you go back to using 'model' change the appropriate cls to model in the query filters
# if submissiontype is not None:
# model = cls.find_polymorphic_subclass(polymorphic_identity=submissiontype)
# elif len(kwargs) > 0:
# # NOTE: find the subclass containing the relevant attributes
# model = cls.find_polymorphic_subclass(attrs=kwargs)
# else:
# model = cls
query: Query = cls.__database_session__.query(cls)
if start_date is not None and end_date is None:
logger.warning(f"Start date with no end date, using today.")
@@ -1107,38 +1003,8 @@ class Run(BaseClass, LogMixin):
start_date = cls.__database_session__.query(cls, func.min(cls.submitted_date)).first()[1]
logger.warning(f"End date with no start date, using first procedure date: {start_date}")
if start_date is not None:
# match start_date:
# case date():
# pass
# case datetime():
# start_date = start_date.date()
# case int():
# start_date = datetime.fromordinal(
# datetime(1900, 1, 1).toordinal() + start_date - 2).date()
# case _:
# start_date = parse(start_date).date()
# # start_date = start_date.strftime("%Y-%m-%d")
# match end_date:
# case date():
# pass
# case datetime():
# end_date = end_date # + timedelta(days=1)
# # pass
# case int():
# end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date() # \
# # + timedelta(days=1)
# case _:
# end_date = parse(end_date).date() # + timedelta(days=1)
# # end_date = end_date.strftime("%Y-%m-%d")
# start_date = datetime.combine(start_date, datetime.min.time()).strftime("%Y-%m-%d %H:%M:%S.%f")
# end_date = datetime.combine(end_date, datetime.max.time()).strftime("%Y-%m-%d %H:%M:%S.%f")
# # if start_date == end_date:
# # start_date = start_date.strftime("%Y-%m-%d %H:%M:%S.%f")
# # query = query.filter(model.submitted_date == start_date)
# # else:
start_date = cls.rectify_query_date(start_date)
end_date = cls.rectify_query_date(end_date, eod=True)
logger.debug(f"Start date: {start_date}, end date: {end_date}")
query = query.join(ClientSubmission).filter(ClientSubmission.submitted_date.between(start_date, end_date))
# NOTE: by rsl number (returns only a single value)
match name:
@@ -1164,7 +1030,6 @@ class Run(BaseClass, LogMixin):
limit = 1
case _:
pass
# query = query.order_by(cls.submitted_date.desc())
# NOTE: Split query results into pages of size {page_size}
if page_size > 0:
query = query.limit(page_size)
@@ -1173,58 +1038,6 @@ class Run(BaseClass, LogMixin):
query = query.offset(page * page_size)
return cls.execute_query(query=query, limit=limit, **kwargs)
# @classmethod
# def query_or_create(cls, submissiontype: str | SubmissionType | None = None, **kwargs) -> Run:
# """
# Returns object from db if exists, else, creates new. Due to need for user input, doesn't see much use ATM.
#
# Args:
# submissiontype (str | SubmissionType | None, optional): Submission type to be created. Defaults to None.
#
# Raises:
# ValueError: Raised if no kwargs passed.
# ValueError: Raised if disallowed key is passed.
#
# Returns:
# cls: A Run subclass instance.
# """
# code = 0
# msg = ""
# report = Report()
# disallowed = ["id"]
# if kwargs == {}:
# raise ValueError("Need to narrow down query or the first available instance will be returned.")
# sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
# instance = cls.query(submissiontype=submissiontype, limit=1, **sanitized_kwargs)
# if instance is None:
# used_class = cls.find_polymorphic_subclass(attrs=kwargs, polymorphic_identity=submissiontype)
# instance = used_class(**sanitized_kwargs)
# match submissiontype:
# case str():
# submissiontype = SubmissionType.query(name=submissiontype)
# case _:
# pass
# instance.proceduretype = submissiontype
# instance.submissiontype_name = submissiontype.name
# if "submitted_date" not in kwargs.keys():
# instance.submitted_date = date.today()
# else:
# from frontend.widgets.pop_ups import QuestionAsker
# logger.warning(f"Found existing instance: {instance}, asking to overwrite.")
# # code = 1
# # msg = "This procedure already exists.\nWould you like to overwrite?"
# # report.add_result(Result(msg=msg, code=code))
# dlg = QuestionAsker(title="Overwrite?",
# message="This procedure already exists.\nWould you like to overwrite?")
# if dlg.exec():
# pass
# else:
# code = 1
# msg = "This procedure already exists.\nWould you like to overwrite?"
# report.add_result(Result(msg=msg, code=code))
# return None, report
# return instance, report
# NOTE: Custom context events for the ui
@property
@@ -1237,18 +1050,15 @@ class Run(BaseClass, LogMixin):
"""
names = ["Add Procedure", "Edit", "Export", "Add Comment", "Show Details", "Delete"]
output = {item: self.__getattribute__(item.lower().replace(" ", "_")) for item in names}
logger.debug(output)
return output
def add_procedure(self, obj, proceduretype_name: str):
    """
    Open the procedure-creation dialog and attach a new procedure to this run.

    On dialog acceptance the constructed SQL object is saved and the parent
    widget's data is refreshed. Removed the commented-out ``sys.exit``
    debugging leftover from the accepted branch.

    Args:
        obj (Widget): Parent widget; refreshed via ``obj.set_data()`` on success.
        proceduretype_name (str): Name of the ProcedureType to instantiate.
            NOTE(review): ``next()`` below has no default, so an unknown name
            raises ``StopIteration`` — callers appear to pass names sourced
            from ``self.allowed_procedures``; confirm before hardening.
    """
    from frontend.widgets.procedure_creation import ProcedureCreation
    procedure_type: ProcedureType = next(
        proceduretype for proceduretype in self.allowed_procedures
        if proceduretype.name == proceduretype_name)
    logger.debug(f"Got ProcedureType: {procedure_type}")
    dlg = ProcedureCreation(parent=obj, procedure=procedure_type.construct_dummy_procedure(run=self))
    if dlg.exec():
        sql, _ = dlg.return_sql(new=True)
        sql.save()
        obj.set_data()
@@ -1282,18 +1092,6 @@ class Run(BaseClass, LogMixin):
except AttributeError:
logger.error("App will not refresh data at this time.")
# def show_details(self, obj):
# """
# Creates Widget for showing procedure details.
#
# Args:
# obj (Widget): Parent widget
# """
# from frontend.widgets.submission_details import SubmissionDetails
# dlg = SubmissionDetails(parent=obj, sub=self)
# if dlg.exec():
# pass
def edit(self, obj):
"""
Return procedure to form widget for updating
@@ -1315,7 +1113,6 @@ class Run(BaseClass, LogMixin):
Args:
obj (_type_): parent widget
"""
logger.debug(obj)
from frontend.widgets.submission_details import SubmissionComment
dlg = SubmissionComment(parent=obj, submission=self)
if dlg.exec():
@@ -1437,8 +1234,6 @@ class Run(BaseClass, LogMixin):
unranked_samples.append(sample)
possible_ranks = (item for item in list(plate_dict.keys()) if
item not in [sample['submission_rank'] for sample in ranked_samples])
# logger.debug(possible_ranks)
# possible_ranks = (plate_dict[idx] for idx in possible_ranks)
for sample in unranked_samples:
try:
submission_rank = next(possible_ranks)
@@ -1457,17 +1252,9 @@ class Run(BaseClass, LogMixin):
background_color="#ffffff", enabled=False)
)
padded_list.append(sample)
# logger.debug(f"Final padded list:\n{pformat(list(sorted(padded_list, key=itemgetter('submission_rank'))))}")
return list(sorted(padded_list, key=itemgetter('submission_rank')))
# class SampleType(BaseClass):
# id = Column(INTEGER, primary_key=True) #: primary key
# name = Column(String(64), nullable=False, unique=True) #: identification from submitter
#
# sample = relationship("Sample", back_populates="sampletype", uselist=True)
# NOTE: Sample Classes
class Sample(BaseClass, LogMixin):
@@ -1477,11 +1264,7 @@ class Sample(BaseClass, LogMixin):
id = Column(INTEGER, primary_key=True) #: primary key
sample_id = Column(String(64), nullable=False, unique=True) #: identification from submitter
# sampletype_id = Column(INTEGER, ForeignKey("_sampletype.id", ondelete="SET NULL",
# name="fk_SAMP_sampletype_id"))
# sampletype = relationship("SampleType", back_populates="sample")
# misc_info = Column(JSON)
control = relationship("Control", back_populates="sample", uselist=False)
control = relationship("Control", back_populates="sample", uselist=False) #: Control function this sample fills.
sampleclientsubmissionassociation = relationship(
"ClientSubmissionSampleAssociation",
@@ -1529,13 +1312,8 @@ class Sample(BaseClass, LogMixin):
Returns:
dict: submitter id and sample type and linked procedure if full data
"""
# try:
# sample_type = self.sampletype.name
# except AttributeError:
# sample_type = "NA"
sample = dict(
sample_id=self.sample_id
# sampletype=sample_type
)
if full_data:
sample['clientsubmission'] = sorted([item.to_sub_dict() for item in self.sampleclientsubmissionassociation],
@@ -1563,7 +1341,6 @@ class Sample(BaseClass, LogMixin):
@setup_lookup
def query(cls,
sample_id: str | None = None,
# sampletype: str | SampleType | None = None,
limit: int = 0,
**kwargs
) -> Sample | List[Sample]:
@@ -1578,13 +1355,6 @@ class Sample(BaseClass, LogMixin):
models.Sample|List[models.Sample]: Sample(s) of interest.
"""
query = cls.__database_session__.query(cls)
# match sampletype:
# case str():
# query = query.join(SampleType).filter(SampleType.name == sampletype)
# case SampleType():
# query = query.filter(cls.sampletype == sampletype)
# case _:
# pass
match sample_id:
case str():
query = query.filter(cls.sample_id == sample_id)
@@ -1593,38 +1363,6 @@ class Sample(BaseClass, LogMixin):
pass
return cls.execute_query(query=query, limit=limit, **kwargs)
# @classmethod
# def fuzzy_search(cls,
# sampletype: str | Sample | None = None,
# **kwargs
# ) -> List[Sample]:
# """
# Allows for fuzzy search of sample.
#
# Args:
# sampletype (str | BasicSample | None, optional): Type of sample. Defaults to None.
#
# Returns:
# List[Sample]: List of sample that match kwarg search parameters.
# """
# query: Query = cls.__database_session__.query(cls)
# match sampletype:
# case str():
# query = query.join(SampleType).filter(SampleType.name == sampletype)
# case SampleType():
# query = query.filter(cls.sampletype == sampletype)
# case _:
# pass
# for k, v in kwargs.items():
# search = f"%{v}%"
# try:
# attr = getattr(cls, k)
# # NOTE: the secret sauce is in attr.like
# query = query.filter(attr.like(search))
# except (ArgumentError, AttributeError) as e:
# logger.error(f"Attribute {k} unavailable due to:\n\t{e}\nSkipping.")
# return query.limit(50).all()
def delete(self):
raise AttributeError(f"Delete not implemented for {self.__class__}")
@@ -1686,12 +1424,9 @@ class ClientSubmissionSampleAssociation(BaseClass):
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
"""
# id = Column(INTEGER, unique=True, nullable=False, autoincrement=True) #: id to be used for inheriting purposes
sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated sample
clientsubmission_id = Column(INTEGER, ForeignKey("_clientsubmission.id"),
primary_key=True) #: id of associated procedure
# row = Column(INTEGER)
# column = Column(INTEGER)
primary_key=True) #: id of associated client submission
submission_rank = Column(INTEGER, primary_key=True, default=0) #: Location in sample list
# NOTE: reference to the Submission object
clientsubmission = relationship("ClientSubmission",
@@ -1708,10 +1443,6 @@ class ClientSubmissionSampleAssociation(BaseClass):
self.row = row
self.column = column
self.submission_rank = submission_rank
# if id is not None:
# self.id = id
# else:
# self.id = self.__class__.autoincrement_id()
for k, v in kwargs.items():
try:
self.__setattr__(k, v)
@@ -1735,13 +1466,6 @@ class ClientSubmissionSampleAssociation(BaseClass):
# NOTE: Get associated sample info
sample = self.sample.to_sub_dict()
sample['sample_id'] = self.sample.sample_id
# sample['row'] = self.row
# sample['column'] = self.column
# try:
# sample['well'] = f"{row_map[self.row]}{self.column}"
# except (KeyError, AttributeError) as e:
# logger.error(f"Unable to find row {self.row} in row_map.")
# sample['Well'] = None
sample['plate_name'] = self.clientsubmission.submitter_plate_id
sample['positive'] = False
sample['submitted_date'] = self.clientsubmission.submitted_date
@@ -1752,10 +1476,8 @@ class ClientSubmissionSampleAssociation(BaseClass):
output = super().details_dict()
# NOTE: Figure out how to merge the misc_info if doing .update instead.
relevant = {k: v for k, v in output.items() if k not in ['sample']}
# logger.debug(f"Relevant info from assoc output: {pformat(relevant)}")
output = output['sample'].details_dict()
misc = output['misc_info']
# # logger.debug(f"Output from sample: {pformat(output)}")
output.update(relevant)
output['misc_info'] = misc
return output
@@ -1798,48 +1520,6 @@ class ClientSubmissionSampleAssociation(BaseClass):
sample.update(dict(Name=self.sample.sample_id[:10], tooltip=tooltip_text, background_color=background))
return sample
# @classmethod
# def autoincrement_id(cls) -> int:
# """
# Increments the association id automatically
#
# Returns:
# int: incremented id
# """
# if cls.__name__ == "ClientSubmissionSampleAssociation":
# model = cls
# else:
# model = next((base for base in cls.__bases__ if base.__name__ == "ClientSubmissionSampleAssociation"),
# ClientSubmissionSampleAssociation)
# try:
# return max([item.id for item in model.query()]) + 1
# except ValueError as e:
# logger.error(f"Problem incrementing id: {e}")
# return 1
# @classmethod
# def find_polymorphic_subclass(cls, polymorphic_identity: str | None = None) -> ClientSubmissionSampleAssociation:
# """
# Retrieves subclasses of ClientSubmissionSampleAssociation based on type name.
#
# Args:
# polymorphic_identity (str | None, optional): Name of subclass fed to polymorphic identity. Defaults to None.
#
# Returns:
# ClientSubmissionSampleAssociation: Subclass of interest.
# """
# if isinstance(polymorphic_identity, dict):
# polymorphic_identity = polymorphic_identity['value']
# if polymorphic_identity is None:
# model = cls
# else:
# try:
# model = cls.__mapper__.polymorphic_map[polymorphic_identity].class_
# except Exception as e:
# logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}")
# model = cls
# return model
@classmethod
@setup_lookup
def query(cls,
@@ -1857,12 +1537,14 @@ class ClientSubmissionSampleAssociation(BaseClass):
Lookup junction of Submission and Sample in the database
Args:
run (models.Run | str | None, optional): Submission of interest. Defaults to None.
clientsubmission (models.ClientSubmission | str | None, optional): Submission of interest. Defaults to None.
exclude_submission_type ( str | None, optional): Name of submissiontype to exclude. Defaults to None.
sample (models.Sample | str | None, optional): Sample of interest. Defaults to None.
row (int, optional): Row of the sample location on procedure plate. Defaults to 0.
column (int, optional): Column of the sample location on the procedure plate. Defaults to 0.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
chronologic (bool, optional): Return results in chronologic order. Defaults to False.
reverse (bool, optional): Whether or not to reverse order of list. Defaults to False.
Returns:
models.ClientSubmissionSampleAssociation|List[models.ClientSubmissionSampleAssociation]: Junction(s) of interest
@@ -1960,12 +1642,8 @@ class RunSampleAssociation(BaseClass):
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
"""
# id = Column(INTEGER, unique=True, nullable=False) #: id to be used for inheriting purposes
sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated sample
run_id = Column(INTEGER, ForeignKey("_run.id"), primary_key=True) #: id of associated procedure
# row = Column(INTEGER) #: row on the 96 well plate
# column = Column(INTEGER) #: column on the 96 well plate
# misc_info = Column(JSON)
# NOTE: reference to the Submission object
@@ -2003,13 +1681,6 @@ class RunSampleAssociation(BaseClass):
# NOTE: Get associated sample info
sample = self.sample.to_sub_dict()
sample['name'] = self.sample.sample_id
# sample['row'] = self.row
# sample['column'] = self.column
# try:
# sample['well'] = f"{row_map[self.row]}{self.column}"
# except KeyError as e:
# logger.error(f"Unable to find row {self.row} in row_map.")
# sample['Well'] = None
sample['plate_name'] = self.run.rsl_plate_number
sample['positive'] = False
return sample
@@ -2070,11 +1741,13 @@ class RunSampleAssociation(BaseClass):
Args:
run (models.Run | str | None, optional): Submission of interest. Defaults to None.
exclude_submission_type ( str | None, optional): Name of submissiontype to exclude. Defaults to None.
sample (models.Sample | str | None, optional): Sample of interest. Defaults to None.
row (int, optional): Row of the sample location on procedure plate. Defaults to 0.
column (int, optional): Column of the sample location on the procedure plate. Defaults to 0.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
chronologic (bool, optional): Return results in chronologic order. Defaults to False.
reverse (bool, optional): Whether or not to reverse order of list. Defaults to False.
Returns:
models.ClientSubmissionSampleAssociation|List[models.ClientSubmissionSampleAssociation]: Junction(s) of interest
@@ -2169,13 +1842,10 @@ class RunSampleAssociation(BaseClass):
output = super().details_dict()
# NOTE: Figure out how to merge the misc_info if doing .update instead.
relevant = {k: v for k, v in output.items() if k not in ['sample']}
# logger.debug(f"Relevant info from assoc output: {pformat(relevant)}")
output = output['sample'].details_dict()
misc = output['misc_info']
# logger.debug(f"Output from sample: {pformat(output)}")
output.update(relevant)
output['misc_info'] = misc
return output
@@ -2192,7 +1862,7 @@ class ProcedureSampleAssociation(BaseClass):
sample = relationship(Sample, back_populates="sampleprocedureassociation") #: associated equipment
results = relationship("Results", back_populates="sampleprocedureassociation")
results = relationship("Results", back_populates="sampleprocedureassociation") #: associated results
@classmethod
def query(cls, sample: Sample | str | None = None, procedure: Procedure | str | None = None, limit: int = 0,
@@ -2242,9 +1912,6 @@ class ProcedureSampleAssociation(BaseClass):
# NOTE: Figure out how to merge the misc_info if doing .update instead.
relevant = {k: v for k, v in output.items() if k not in ['sample']}
output = output['sample'].details_dict()
logger.debug(f"Output: {pformat(output)}")
logger.debug(f"Relevant: {pformat(relevant)}")
# relevant['submission_rank'] = output['misc_info']['submission_rank']
misc = output['misc_info']
output.update(relevant)
output['misc_info'] = misc

View File

@@ -1244,7 +1244,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
name: dict = Field(default=dict(value="NA", missing=True), validate_default=True)
technician: dict = Field(default=dict(value="NA", missing=True))
repeat: bool = Field(default=False)
repeat_of: str | None = Field(default=None)
repeat_of: Procedure | None = Field(default=None)
# kittype: dict = Field(default=dict(value="NA", missing=True))
# possible_kits: list | None = Field(default=[], validate_default=True)
plate_map: str | None = Field(default=None)
@@ -1485,8 +1485,8 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
if sql.repeat:
regex = re.compile(r".*\dR\d$")
repeats = [item for item in self.run.procedure if
self.repeat_of in item.name and bool(regex.match(item.name))]
sql.name = f"{self.repeat_of}R{str(len(repeats) + 1)}"
self.repeat_of.name in item.name and bool(regex.match(item.name))]
sql.name = f"{self.repeat_of.name}-R{str(len(repeats) + 1)}"
sql.repeat_of = self.repeat_of
sql.started_date = datetime.now()
if self.run:

View File

@@ -120,6 +120,10 @@ class ProcedureCreation(QDialog):
match key:
case "rsl_plate_num":
setattr(self.procedure.run, key, new_value)
case "repeat_of":
from backend.db.models import Procedure
parent = Procedure.query(name=new_value, limit=1)
self.procedure.repeat_of = parent
case _:
attribute = getattr(self.procedure, key)
match attribute:
@@ -128,6 +132,7 @@ class ProcedureCreation(QDialog):
case _:
setattr(self.procedure, key, new_value.strip('\"'))
logger.debug(f"Set value for {key}: {getattr(self.procedure, key)}")
# sys.exit()