""" All control related models. """ from __future__ import annotations from pprint import pformat from PyQt6.QtWidgets import QWidget, QCheckBox, QLabel from pandas import DataFrame from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, case, FLOAT from sqlalchemy.orm import relationship, Query, validates import logging, re from operator import itemgetter from . import BaseClass from tools import setup_lookup, report_result, Result, Report, Settings, get_unique_values_in_df_column, super_splitter from datetime import date, datetime, timedelta from typing import List, Literal, Tuple, Generator from dateutil.parser import parse from re import Pattern logger = logging.getLogger(f"submissions.{__name__}") class ControlType(BaseClass): """ Base class of a control archetype. """ id = Column(INTEGER, primary_key=True) #: primary key name = Column(String(255), unique=True) #: controltype name (e.g. Irida Control) targets = Column(JSON) #: organisms checked for instances = relationship("Control", back_populates="controltype") #: control samples created of this type. def __repr__(self) -> str: return f"" @classmethod @setup_lookup def query(cls, name: str = None, limit: int = 0 ) -> ControlType | List[ControlType]: """ Lookup control archetypes in the database Args: name (str, optional): Name of the desired controltype. Defaults to None. limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. Returns: ControlType | List[ControlType]: Single result if the limit = 1, else a list. """ query = cls.__database_session__.query(cls) match name: case str(): query = query.filter(cls.name == name) limit = 1 case _: pass return cls.execute_query(query=query, limit=limit) def get_modes(self, mode: Literal['kraken', 'matches', 'contains']) -> List[str]: """ Get subtypes associated with this controltype (currently used only for Kraken) Args: mode (str): analysis mode sub_type Returns: List[str]: list of subtypes available """ if not self.instances: return # NOTE: Get first instance since all should have same subtypes # NOTE: Get mode of instance jsoner = getattr(self.instances[0], mode) try: # NOTE: Pick genera (all should have same subtypes) genera = list(jsoner.keys())[0] except IndexError: return [] # NOTE subtypes now created for all modes, but ignored for all but allowed_for_subtyping later in the ControlsChart subtypes = sorted(list(jsoner[genera].keys()), reverse=True) return subtypes @property def instance_class(self) -> Control: """ Retrieves the Control class associated with this controltype Returns: Control: Associated Control class """ return Control.find_polymorphic_subclass(polymorphic_identity=self.name) @classmethod def get_positive_control_types(cls, control_type: str) -> Generator[str, None, None]: """ Gets list of Control types if they have targets Returns: Generator[str, None, None]: Control types that have targets """ ct = cls.query(name=control_type).targets return (k for k, v in ct.items() if v) @classmethod def build_positive_regex(cls, control_type:str) -> Pattern: """ Creates a re.Pattern that will look for positive control types Returns: Pattern: Constructed pattern """ strings = list(set([super_splitter(item, "-", 0) for item in cls.get_positive_control_types(control_type)])) return re.compile(rf"(^{'|^'.join(strings)})-.*", flags=re.IGNORECASE) class Control(BaseClass): """ Base class of a control sample. """ id = Column(INTEGER, primary_key=True) #: primary key controltype_name = Column(String, ForeignKey("_controltype.name", ondelete="SET NULL", name="fk_BC_subtype_name")) #: name of joined submission type controltype = relationship("ControlType", back_populates="instances", foreign_keys=[controltype_name]) #: reference to parent control type name = Column(String(255), unique=True) #: Sample ID submitted_date = Column(TIMESTAMP) #: Date submitted to Robotics submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id")) #: parent submission id submission = relationship("BasicSubmission", back_populates="controls", foreign_keys=[submission_id]) #: parent submission __mapper_args__ = { "polymorphic_identity": "Basic Control", "polymorphic_on": case( (controltype_name == "PCR Control", "PCR Control"), (controltype_name == "Irida Control", "Irida Control"), else_="Basic Control" ), "with_polymorphic": "*", } def __repr__(self) -> str: return f"<{self.controltype_name}({self.name})>" @classmethod @setup_lookup def query(cls, submissiontype: str | None = None, subtype: str | None = None, start_date: date | str | int | None = None, end_date: date | str | int | None = None, name: str | None = None, limit: int = 0, **kwargs ) -> Control | List[Control]: """ Lookup control objects in the database based on a number of parameters. Args: submission_type (str | None, optional): Submission type associated with control. Defaults to None. subtype (str | None, optional): Control subtype, eg IridaControl. Defaults to None. start_date (date | str | int | None, optional): Beginning date to search by. Defaults to 2023-01-01 if end_date not None. end_date (date | str | int | None, optional): End date to search by. Defaults to today if start_date not None. name (str | None, optional): Name of control. Defaults to None. limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0. Returns: Control|List[Control]: Control object of interest. """ from backend.db import SubmissionType query: Query = cls.__database_session__.query(cls) match submissiontype: case str(): from backend import BasicSubmission, SubmissionType query = query.join(BasicSubmission).join(SubmissionType).filter(SubmissionType.name == submissiontype) case SubmissionType(): from backend import BasicSubmission query = query.join(BasicSubmission).filter(BasicSubmission.submission_type_name == submissiontype.name) case _: pass # NOTE: by control type match subtype: case str(): if cls.__name__ == "Control": raise ValueError(f"Cannot query base class Control with subtype.") elif cls.__name__ == "IridaControl": query = query.filter(cls.subtype == subtype) else: try: query = query.filter(cls.subtype == subtype) except AttributeError as e: logger.error(e) case _: pass # NOTE: by date range if start_date is not None and end_date is None: logger.warning(f"Start date with no end date, using today.") end_date = date.today() if end_date is not None and start_date is None: logger.warning(f"End date with no start date, using 90 days ago.") start_date = date.today() - timedelta(days=90) if start_date is not None: match start_date: case date(): start_date = start_date.strftime("%Y-%m-%d") case int(): start_date = datetime.fromordinal( datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d") case _: start_date = parse(start_date).strftime("%Y-%m-%d") match end_date: case date(): end_date = end_date.strftime("%Y-%m-%d") case int(): end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime( "%Y-%m-%d") case _: end_date = parse(end_date).strftime("%Y-%m-%d") query = query.filter(cls.submitted_date.between(start_date, end_date)) match name: case str(): query = query.filter(cls.name.startswith(name)) limit = 1 case _: pass return cls.execute_query(query=query, limit=limit) @classmethod def find_polymorphic_subclass(cls, polymorphic_identity: str | ControlType | None = None, attrs: dict | None = None) -> Control: """ Find subclass based on polymorphic identity or relevant attributes. Args: polymorphic_identity (str | None, optional): String representing polymorphic identity. Defaults to None. attrs (str | SubmissionType | None, optional): Attributes of the relevant class. Defaults to None. Returns: Control: Subclass of interest. """ if isinstance(polymorphic_identity, dict): polymorphic_identity = polymorphic_identity['value'] model = cls match polymorphic_identity: case str(): try: model = cls.__mapper__.polymorphic_map[polymorphic_identity].class_ except Exception as e: logger.error( f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}, falling back to BasicSubmission") case ControlType(): try: model = cls.__mapper__.polymorphic_map[polymorphic_identity.name].class_ except Exception as e: logger.error( f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}, falling back to BasicSubmission") case _: pass # NOTE: if attrs passed in and this cls doesn't have all attributes in attr if attrs and any([not hasattr(cls, attr) for attr in attrs.keys()]): # NOTE: looks for first model that has all included kwargs try: model = next(subclass for subclass in cls.__subclasses__() if all([hasattr(subclass, attr) for attr in attrs.keys()])) except StopIteration as e: raise AttributeError( f"Couldn't find existing class/subclass of {cls} with all attributes:\n{pformat(attrs.keys())}") return model @classmethod def make_parent_buttons(cls, parent: QWidget) -> None: """ Super that will make buttons in a CustomFigure. Made to be overridden. Args: parent (QWidget): chart holding widget to add buttons to. Returns: None: Child methods will return things. """ return None @classmethod def make_chart(cls, parent, chart_settings: dict, ctx) -> Tuple[Report, "CustomFigure" | None]: """ Dummy operation to be overridden by child classes. Args: chart_settings (dict): settings passed down from chart widget ctx (Settings): settings passed down from gui """ return Report(), None def delete(self): self.__database_session__.delete(self) self.__database_session__.commit() class PCRControl(Control): """ Class made to hold info from Design & Analysis software. """ id = Column(INTEGER, ForeignKey('_control.id'), primary_key=True) subtype = Column(String(16)) #: PC or NC target = Column(String(16)) #: N1, N2, etc. ct = Column(FLOAT) #: PCR result reagent_lot = Column(String(64), ForeignKey("_reagent.lot", ondelete="SET NULL", name="fk_reagent_lot")) reagent = relationship("Reagent", foreign_keys=reagent_lot) #: reagent used for this control __mapper_args__ = dict(polymorphic_identity="PCR Control", polymorphic_load="inline", inherit_condition=(id == Control.id)) def to_sub_dict(self) -> dict: """ Creates dictionary of fields for this object. Returns: dict: Output dict of name, ct, subtype, target, reagent_lot and submitted_date """ return dict( name=self.name, ct=self.ct, subtype=self.subtype, target=self.target, reagent_lot=self.reagent_lot, submitted_date=self.submitted_date.date() ) @classmethod @report_result def make_chart(cls, parent, chart_settings: dict, ctx: Settings) -> Tuple[Report, "PCRFigure"]: """ Creates a PCRFigure. Overrides parent Args: parent (__type__): Widget to contain the chart. chart_settings (dict): settings passed down from chart widget ctx (Settings): settings passed down from gui. Not used here. Returns: Tuple[Report, "PCRFigure"]: Report of status and resulting figure. """ from frontend.visualizations.pcr_charts import PCRFigure parent.mode_typer.clear() parent.mode_typer.setEnabled(False) report = Report() controls = cls.query(submissiontype=chart_settings['sub_type'], start_date=chart_settings['start_date'], end_date=chart_settings['end_date']) data = [control.to_sub_dict() for control in controls] df = DataFrame.from_records(data) try: df = df[df.ct > 0.0] except AttributeError: df = df fig = PCRFigure(df=df, modes=[], settings=chart_settings) return report, fig def to_pydantic(self): from backend.validators import PydPCRControl return PydPCRControl(**self.to_sub_dict(), controltype_name=self.controltype_name, submission_id=self.submission_id) class IridaControl(Control): subtyping_allowed = ['kraken'] id = Column(INTEGER, ForeignKey('_control.id'), primary_key=True) contains = Column(JSON) #: unstructured hashes in contains.tsv for each organism matches = Column(JSON) #: unstructured hashes in matches.tsv for each organism kraken = Column(JSON) #: unstructured output from kraken_report subtype = Column(String(16), nullable=False) #: EN-NOS, MCS-NOS, etc refseq_version = Column(String(16)) #: version of refseq used in fastq parsing kraken2_version = Column(String(16)) #: version of kraken2 used in fastq parsing kraken2_db_version = Column(String(32)) #: folder name of kraken2 db sample = relationship("BacterialCultureSample", back_populates="control") #: This control's submission sample sample_id = Column(INTEGER, ForeignKey("_basicsample.id", ondelete="SET NULL", name="cont_BCS_id")) #: sample id key __mapper_args__ = dict(polymorphic_identity="Irida Control", polymorphic_load="inline", inherit_condition=(id == Control.id)) @validates("subtype") def enforce_subtype_literals(self, key: str, value: str) -> str: """ Validates sub_type field with acceptable values Args: key (str): Field name value (str): Field Value Raises: KeyError: Raised if value is not in the acceptable list. Returns: str: Validated string. """ acceptables = ['ATCC49226', 'ATCC49619', 'EN-NOS', "EN-SSTI", "MCS-NOS", "MCS-SSTI", "SN-NOS", "SN-SSTI"] if value.upper() not in acceptables: raise KeyError(f"Sub-type must be in {acceptables}") return value def to_sub_dict(self) -> dict: """ Converts object into convenient dictionary for use in submission summary Returns: dict: output dictionary containing: Name, Type, Targets, Top Kraken results """ try: kraken = self.kraken except TypeError: kraken = {} kraken_cnt_total = sum([item['kraken_count'] for item in kraken.values()]) new_kraken = [dict(name=item, kraken_count=kraken[item]['kraken_count'], kraken_percent=f"{kraken[item]['kraken_count'] / kraken_cnt_total:0.2%}", target=item in self.controltype.targets) for item in kraken] new_kraken = sorted(new_kraken, key=itemgetter('kraken_count'), reverse=True) if self.controltype.targets: targets = self.controltype.targets else: targets = ["None"] output = dict( name=self.name, type=self.controltype.name, targets=", ".join(targets), kraken=new_kraken[0:10] ) return output def convert_by_mode(self, control_sub_type: str, mode: Literal['kraken', 'matches', 'contains'], consolidate: bool = False) -> Generator[dict, None, None]: """ split this instance into analysis types ('kraken', 'matches', 'contains') for controls graphs Args: consolidate (bool): whether to merge all off-target genera. Defaults to False control_sub_type (str): control subtype, 'MCS-NOS', etc. mode (Literal['kraken', 'matches', 'contains']): analysis type, 'contains', etc. Returns: List[dict]: list of records """ try: data = self.__getattribute__(mode) except TypeError: data = {} if data is None: data = {} # NOTE: Data truncation and consolidation. if "kraken" in mode: data = {k: v for k, v in sorted(data.items(), key=lambda d: d[1][f"{mode}_count"], reverse=True)[:50]} else: if consolidate: on_tar = {k: v for k, v in data.items() if k.strip("*") in self.controltype.targets[control_sub_type]} off_tar = sum(v[f'{mode}_ratio'] for k, v in data.items() if k.strip("*") not in self.controltype.targets[control_sub_type]) on_tar['Off-target'] = {f"{mode}_ratio": off_tar} data = on_tar for genus in data: _dict = dict( name=self.name, submitted_date=self.submitted_date, genus=genus, target='Target' if genus.strip("*") in self.controltype.targets[control_sub_type] else "Off-target" ) for key in data[genus]: _dict[key] = data[genus][key] yield _dict @classproperty def modes(cls) -> List[str]: """ Get all control modes from database Returns: List[str]: List of control mode names. """ try: cols = [item.name for item in list(cls.__table__.columns) if isinstance(item.type, JSON)] except AttributeError as e: logger.error(f"Failed to get available modes from db: {e}") cols = [] return cols @classmethod def make_parent_buttons(cls, parent: QWidget) -> None: """ Creates buttons for controlling Args: parent (QWidget): chart holding widget to add buttons to. """ super().make_parent_buttons(parent=parent) rows = parent.layout.rowCount() - 2 # NOTE: check box for consolidating off-target items checker = QCheckBox(parent) checker.setChecked(True) checker.setObjectName("irida_check") checker.setToolTip("Pools off-target genera to save time.") parent.layout.addWidget(QLabel("Consolidate Off-targets"), rows, 0, 1, 1) parent.layout.addWidget(checker, rows, 1, 1, 2) checker.checkStateChanged.connect(parent.update_data) @classmethod @report_result def make_chart(cls, chart_settings: dict, parent, ctx) -> Tuple[Report, "IridaFigure" | None]: """ Creates a IridaFigure. Overrides parent Args: parent (__type__): Widget to contain the chart. chart_settings (dict): settings passed down from chart widget ctx (Settings): settings passed down from gui. Returns: Tuple[Report, "IridaFigure"]: Report of status and resulting figure. """ from frontend.visualizations import IridaFigure try: checker = parent.findChild(QCheckBox, name="irida_check") if chart_settings['mode'] == "kraken": checker.setEnabled(False) checker.setChecked(False) else: checker.setEnabled(True) consolidate = checker.isChecked() except AttributeError: consolidate = False report = Report() controls = cls.query(subtype=chart_settings['sub_type'], start_date=chart_settings['start_date'], end_date=chart_settings['end_date']) if not controls: report.add_result(Result(status="Critical", msg="No controls found in given date range.")) return report, None # NOTE: change each control to list of dictionaries data = [control.convert_by_mode(control_sub_type=chart_settings['sub_type'], mode=chart_settings['mode'], consolidate=consolidate) for control in controls] # NOTE: flatten data to one dimensional list data = [item for sublist in data for item in sublist] if not data: report.add_result(Result(status="Critical", msg="No data found for controls in given date range.")) return report, None df = cls.convert_data_list_to_df(input_df=data, sub_mode=chart_settings['sub_mode']) if chart_settings['sub_mode'] is None: title = chart_settings['sub_mode'] else: title = f"{chart_settings['mode']} - {chart_settings['sub_mode']}" # NOTE: send dataframe to chart maker df, modes = cls.prep_df(ctx=ctx, df=df) fig = IridaFigure(df=df, ytitle=title, modes=modes, parent=parent, settings=chart_settings) return report, fig @classmethod def convert_data_list_to_df(cls, input_df: list[dict], sub_mode) -> DataFrame: """ Convert list of control records to dataframe Args: input_df (list[dict]): list of dictionaries containing records sub_mode (str | None, optional): sub_type of submission type. Defaults to None. Returns: DataFrame: dataframe of controls """ df = DataFrame.from_records(input_df) safe = ['name', 'submitted_date', 'genus', 'target'] for column in df.columns: if column not in safe: if sub_mode is not None and column != sub_mode: continue else: safe.append(column) if "percent" in column: try: count_col = next(item for item in df.columns if "count" in item) except StopIteration: continue # NOTE: The actual percentage from kraken was off due to exclusion of NaN, recalculating. df[column] = 100 * df[count_col] / df.groupby('name')[count_col].transform('sum') df = df[[c for c in df.columns if c in safe]] # NOTE: move date of sample submitted on same date as previous ahead one. df = cls.displace_date(df=df) # NOTE: ad hoc method to make data labels more accurate. df = cls.df_column_renamer(df=df) return df @classmethod def df_column_renamer(cls, df: DataFrame) -> DataFrame: """ Ad hoc function I created to clarify some fields Args: df (DataFrame): input dataframe Returns: DataFrame: dataframe with 'clarified' column names """ df = df[df.columns.drop(list(df.filter(regex='_hashes')))] return df.rename(columns={ "contains_ratio": "contains_shared_hashes_ratio", "matches_ratio": "matches_shared_hashes_ratio", "kraken_count": "kraken2_read_count_(top_50)", "kraken_percent": "kraken2_read_percent_(top_50)" }) @classmethod def displace_date(cls, df: DataFrame) -> DataFrame: """ This function serves to split samples that were submitted on the same date by incrementing dates. It will shift the date forward by one day if it is the same day as an existing date in a list. Args: df (DataFrame): input dataframe composed of control records Returns: DataFrame: output dataframe with dates incremented. """ # NOTE: get submitted dates for each control dict_list = [dict(name=item, date=df[df.name == item].iloc[0]['submitted_date']) for item in sorted(df['name'].unique())] previous_dates = set() for item in dict_list: df, previous_dates = cls.check_date(df=df, item=item, previous_dates=previous_dates) return df @classmethod def check_date(cls, df: DataFrame, item: dict, previous_dates: set) -> Tuple[DataFrame, list]: """ Checks if an items date is already present in df and adjusts df accordingly Args: df (DataFrame): input dataframe item (dict): control for checking previous_dates (list): list of dates found in previous controls Returns: Tuple[DataFrame, list]: Output dataframe and appended list of previous dates """ try: check = item['date'] in previous_dates except IndexError: check = False previous_dates.add(item['date']) if check: # NOTE: get df locations where name == item name mask = df['name'] == item['name'] # NOTE: increment date in dataframe df.loc[mask, 'submitted_date'] = df.loc[mask, 'submitted_date'].apply(lambda x: x + timedelta(days=1)) item['date'] += timedelta(days=1) passed = False else: passed = True # NOTE: if run didn't lead to changed date, return values if passed: return df, previous_dates # NOTE: if date was changed, rerun with new date else: logger.warning(f"Date check failed, running recursion") df, previous_dates = cls.check_date(df, item, previous_dates) return df, previous_dates @classmethod def prep_df(cls, ctx: Settings, df: DataFrame) -> Tuple[DataFrame | None, list]: """ Constructs figures based on parsed pandas dataframe. Args: ctx (Settings): settings passed down from gui df (pd.DataFrame): input dataframe ytitle (str | None, optional): title for the y-axis. Defaults to None. Returns: Figure: Plotly figure """ # NOTE: converts starred genera to normal and splits off list of starred if df.empty: return None, [] df['genus'] = df['genus'].replace({'\*': ''}, regex=True).replace({"NaN": "Unknown"}) df['genera'] = [item[-1] if item and item[-1] == "*" else "" for item in df['genus'].to_list()] # NOTE: remove original runs, using reruns if applicable df = cls.drop_reruns_from_df(ctx=ctx, df=df) # NOTE: sort by and exclude from sorts = ['submitted_date', "target", "genus"] exclude = ['name', 'genera'] modes = [item for item in df.columns if item not in sorts and item not in exclude] # NOTE: Set descending for any columns that have "{mode}" in the header. ascending = [False if item == "target" else True for item in sorts] df = df.sort_values(by=sorts, ascending=ascending) # NOTE: actual chart construction is done by return df, modes @classmethod def drop_reruns_from_df(cls, ctx: Settings, df: DataFrame) -> DataFrame: """ Removes semi-duplicates from dataframe after finding sequencing repeats. Args: ctx (Settings): settings passed from gui df (DataFrame): initial dataframe Returns: DataFrame: dataframe with originals removed in favour of repeats. """ if 'rerun_regex' in ctx: sample_names = get_unique_values_in_df_column(df, column_name="name") rerun_regex = re.compile(fr"{ctx.rerun_regex}") exclude = [re.sub(rerun_regex, "", sample) for sample in sample_names if rerun_regex.search(sample)] df = df[df.name not in exclude] return df def to_pydantic(self) -> "PydIridaControl": """ Constructs a pydantic version of this object. Returns: PydIridaControl: This object as a pydantic model. """ from backend.validators import PydIridaControl return PydIridaControl(**self.__dict__)