diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b2a5d3..03b38cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 202404.05 + +- Addition of default query method using Kwargs. + ## 202404.04 - Storing of default values in db rather than hardcoded. diff --git a/TODO.md b/TODO.md index 33c910a..79e35b7 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,7 @@ +- [ ] Create a default info return function. +- [ ] Parse comment from excel sheet. - [ ] Make reporting better. -- [ ] Build master query method? +- [x] Build master query method? - Obviously there will need to be extensions, but I feel the attr method I have in Submissions could work. - [x] Fix Artic RSLNamer - [x] Put "Not applicable" reagents in to_dict() method. diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py index 9be93dc..74fc344 100644 --- a/src/submissions/backend/db/models/__init__.py +++ b/src/submissions/backend/db/models/__init__.py @@ -1,26 +1,32 @@ ''' Contains all models for sqlalchemy ''' -import sys +from __future__ import annotations +import sys, logging from sqlalchemy.orm import DeclarativeMeta, declarative_base, Query, Session from sqlalchemy.ext.declarative import declared_attr +from sqlalchemy.exc import ArgumentError from typing import Any, List from pathlib import Path + # Load testing environment if 'pytest' in sys.modules: from pathlib import Path + sys.path.append(Path(__file__).parents[4].absolute().joinpath("tests").__str__()) Base: DeclarativeMeta = declarative_base() +logger = logging.getLogger(f"submissions.{__name__}") + class BaseClass(Base): """ Abstract class to pass ctx values to all SQLAlchemy objects. """ __abstract__ = True #: Will not be added to DB - - __table_args__ = {'extend_existing': True} #: Will only add new columns + + __table_args__ = {'extend_existing': True} #: Will only add new columns @declared_attr def __tablename__(cls) -> str: @@ -29,7 +35,7 @@ class BaseClass(Base): Returns: str: lower case class name - """ + """ return f"_{cls.__name__.lower()}" @declared_attr @@ -39,7 +45,7 @@ class BaseClass(Base): Returns: Session: DB session from ctx settings. - """ + """ if not 'pytest' in sys.modules: from tools import ctx else: @@ -53,13 +59,13 @@ class BaseClass(Base): Returns: Path: Location of the Submissions directory in Settings object - """ + """ if not 'pytest' in sys.modules: from tools import ctx else: from test_settings import ctx return ctx.directory_path - + @declared_attr def __backup_path__(cls) -> Path: """ @@ -67,7 +73,7 @@ class BaseClass(Base): Returns: Path: Location of the Submissions backup directory in Settings object - """ + """ if not 'pytest' in sys.modules: from tools import ctx else: @@ -75,7 +81,25 @@ class BaseClass(Base): return ctx.backup_path @classmethod - def execute_query(cls, query: Query, limit: int = 0) -> Any | List[Any]: + def get_default_info(cls, *args) -> dict | List[str]: + dicto = dict(singles=['id']) + output = {} + for k, v in dicto.items(): + if len(args) > 0 and k not in args: + # logger.debug(f"Don't want {k}") + continue + else: + output[k] = v + if len(args) == 1: + return output[args[0]] + return output + + @classmethod + def query(cls, **kwargs): + return cls.execute_query(**kwargs) + + @classmethod + def execute_query(cls, query: Query = None, model=None, limit: int = 0, **kwargs) -> Any | List[Any]: """ Execute sqlalchemy query. @@ -85,7 +109,24 @@ class BaseClass(Base): Returns: Any | List[Any]: Single result if limit = 1 or List if other. - """ + """ + if model is None: + model = cls + if query is None: + query: Query = cls.__database_session__.query(model) + # logger.debug(f"Grabbing singles using {model.get_default_info}") + singles = model.get_default_info('singles') + logger.debug(f"Querying: {model}, singles: {singles}") + for k, v in kwargs.items(): + logger.debug(f"Using key: {k} with value: {v}") + # logger.debug(f"That key found attribute: {attr} with type: {attr}") + try: + attr = getattr(model, k) + query = query.filter(attr == v) + except (ArgumentError, AttributeError) as e: + logger.error(f"Attribute {k} available due to:\n\t{e}\nSkipping.") + if k in singles: + limit = 1 with query.session.no_autoflush: match limit: case 0: @@ -95,6 +136,10 @@ class BaseClass(Base): case _: return query.limit(limit).all() + @classmethod + def default_info_return(cls, info, *args): + return info + def save(self): """ Add the object to the database and commit diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index 4b54fb9..00838c4 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -947,7 +947,8 @@ class SubmissionReagentAssociation(BaseClass): match submission: case BasicSubmission() | str(): if isinstance(submission, str): - submission = BasicSubmission.query(rsl_number=submission) + # submission = BasicSubmission.query(rsl_number=submission) + submission = BasicSubmission.query(rsl_plate_num=submission) # logger.debug(f"Lookup SubmissionReagentAssociation by submission BasicSubmission {submission}") query = query.filter(cls.submission==submission) case int(): diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index 8da30ad..b799a08 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -1,6 +1,6 @@ -''' -Models for the main submission types. -''' +""" +Models for the main submission and sample types. +""" from __future__ import annotations from getpass import getuser import logging, uuid, tempfile, re, yaml, base64 @@ -23,62 +23,72 @@ from tools import check_not_nan, row_map, setup_lookup, jinja_template_loading, from datetime import datetime, date from typing import List, Any, Tuple from dateutil.parser import parse -from dateutil.parser._parser import ParserError +from dateutil.parser import ParserError from pathlib import Path from jinja2.exceptions import TemplateNotFound from jinja2 import Template logger = logging.getLogger(f"submissions.{__name__}") + class BasicSubmission(BaseClass): """ Concrete of basic submission which polymorphs into BacterialCulture and Wastewater """ - - id = Column(INTEGER, primary_key=True) #: primary key - rsl_plate_num = Column(String(32), unique=True, nullable=False) #: RSL name (e.g. RSL-22-0012) - submitter_plate_num = Column(String(127), unique=True) #: The number given to the submission by the submitting lab - submitted_date = Column(TIMESTAMP) #: Date submission received - submitting_lab = relationship("Organization", back_populates="submissions") #: client org - submitting_lab_id = Column(INTEGER, ForeignKey("_organization.id", ondelete="SET NULL", name="fk_BS_sublab_id")) #: client lab id from _organizations - sample_count = Column(INTEGER) #: Number of samples in the submission - extraction_kit = relationship("KitType", back_populates="submissions") #: The extraction kit used - extraction_kit_id = Column(INTEGER, ForeignKey("_kittype.id", ondelete="SET NULL", name="fk_BS_extkit_id")) #: id of joined extraction kit - submission_type_name = Column(String, ForeignKey("_submissiontype.name", ondelete="SET NULL", name="fk_BS_subtype_name")) #: name of joined submission type - technician = Column(String(64)) #: initials of processing tech(s) - # Move this into custom types? - reagents_id = Column(String, ForeignKey("_reagent.id", ondelete="SET NULL", name="fk_BS_reagents_id")) #: id of used reagents - extraction_info = Column(JSON) #: unstructured output from the extraction table logger. - run_cost = Column(FLOAT(2)) #: total cost of running the plate. Set from constant and mutable kit costs at time of creation. - signed_by = Column(String(32)) #: user name of person who submitted the submission to the database. - comment = Column(JSON) #: user notes - submission_category = Column(String(64)) #: ["Research", "Diagnostic", "Surveillance", "Validation"], else defaults to submission_type_name - cost_centre = Column(String(64)) #: Permanent storage of used cost centre in case organization field changed in the future. + id = Column(INTEGER, primary_key=True) #: primary key + rsl_plate_num = Column(String(32), unique=True, nullable=False) #: RSL name (e.g. RSL-22-0012) + submitter_plate_num = Column(String(127), unique=True) #: The number given to the submission by the submitting lab + submitted_date = Column(TIMESTAMP) #: Date submission received + submitting_lab = relationship("Organization", back_populates="submissions") #: client org + submitting_lab_id = Column(INTEGER, ForeignKey("_organization.id", ondelete="SET NULL", + name="fk_BS_sublab_id")) #: client lab id from _organizations + sample_count = Column(INTEGER) #: Number of samples in the submission + extraction_kit = relationship("KitType", back_populates="submissions") #: The extraction kit used + extraction_kit_id = Column(INTEGER, ForeignKey("_kittype.id", ondelete="SET NULL", + name="fk_BS_extkit_id")) #: id of joined extraction kit + submission_type_name = Column(String, ForeignKey("_submissiontype.name", ondelete="SET NULL", + name="fk_BS_subtype_name")) #: name of joined submission type + technician = Column(String(64)) #: initials of processing tech(s) + # Move this into custom types? + reagents_id = Column(String, ForeignKey("_reagent.id", ondelete="SET NULL", + name="fk_BS_reagents_id")) #: id of used reagents + extraction_info = Column(JSON) #: unstructured output from the extraction table logger. + run_cost = Column( + FLOAT(2)) #: total cost of running the plate. Set from constant and mutable kit costs at time of creation. + signed_by = Column(String(32)) #: user name of person who submitted the submission to the database. + comment = Column(JSON) #: user notes + submission_category = Column( + String(64)) #: ["Research", "Diagnostic", "Surveillance", "Validation"], else defaults to submission_type_name + cost_centre = Column( + String(64)) #: Permanent storage of used cost centre in case organization field changed in the future. submission_sample_associations = relationship( "SubmissionSampleAssociation", back_populates="submission", cascade="all, delete-orphan", - ) #: Relation to SubmissionSampleAssociation - - samples = association_proxy("submission_sample_associations", "sample") #: Association proxy to SubmissionSampleAssociation.samples + ) #: Relation to SubmissionSampleAssociation + + samples = association_proxy("submission_sample_associations", + "sample") #: Association proxy to SubmissionSampleAssociation.samples submission_reagent_associations = relationship( "SubmissionReagentAssociation", back_populates="submission", cascade="all, delete-orphan", - ) #: Relation to SubmissionReagentAssociation - - reagents = association_proxy("submission_reagent_associations", "reagent") #: Association proxy to SubmissionReagentAssociation.reagent + ) #: Relation to SubmissionReagentAssociation + + reagents = association_proxy("submission_reagent_associations", + "reagent") #: Association proxy to SubmissionReagentAssociation.reagent submission_equipment_associations = relationship( "SubmissionEquipmentAssociation", back_populates="submission", cascade="all, delete-orphan" - ) #: Relation to Equipment + ) #: Relation to Equipment - equipment = association_proxy("submission_equipment_associations", "equipment") #: Association proxy to SubmissionEquipmentAssociation.equipment + equipment = association_proxy("submission_equipment_associations", + "equipment") #: Association proxy to SubmissionEquipmentAssociation.equipment # Allows for subclassing into ex. BacterialCulture, Wastewater, etc. __mapper_args__ = { @@ -91,7 +101,7 @@ class BasicSubmission(BaseClass): """ Returns: str: Representation of this BasicSubmission - """ + """ submission_type = self.submission_type or "Basic" return f"{submission_type}Submission({self.rsl_plate_num})" @@ -105,32 +115,36 @@ class BasicSubmission(BaseClass): @classmethod def get_default_info(cls, *args): # Create defaults for all submission_types - # print(args) + parent_defs = super().get_default_info() recover = ['filepath', 'samples', 'csv', 'comment', 'equipment'] dicto = dict( - details_ignore = ['excluded', 'reagents', 'samples', - 'extraction_info', 'comment', 'barcode', - 'platemap', 'export_map', 'equipment'], - form_recover = recover, - form_ignore = ['reagents', 'ctx', 'id', 'cost', 'extraction_info', 'signed_by'] + recover, - parser_ignore = ['samples', 'signed_by'] + cls.jsons(), - excel_ignore = [] + details_ignore=['excluded', 'reagents', 'samples', + 'extraction_info', 'comment', 'barcode', + 'platemap', 'export_map', 'equipment'], + form_recover=recover, + form_ignore=['reagents', 'ctx', 'id', 'cost', 'extraction_info', 'signed_by', 'comment'] + recover, + parser_ignore=['samples', 'signed_by'] + cls.jsons(), + excel_ignore=[], ) - # Grab subtype specific info. + # logger.debug(dicto['singles']) + """Singles tells the query which fields to set limit to 1""" + dicto['singles'] = parent_defs['singles'] + # logger.debug(dicto['singles']) + """Grab subtype specific info.""" + output = {} + for k, v in dicto.items(): + if len(args) > 0 and k not in args: + # logger.debug(f"Don't want {k}") + continue + else: + output[k] = v st = cls.get_submission_type() if st is None: logger.error("No default info for BasicSubmission.") - return dicto + # return output else: - dicto['submission_type'] = st.name - output = {} - for k,v in dicto.items(): - if len(args) > 0 and k not in args: - # logger.debug(f"Don't want {k}") - continue - else: - output[k] = v - for k,v in st.defaults.items(): + output['submission_type'] = st.name + for k, v in st.defaults.items(): if len(args) > 0 and k not in args: # logger.debug(f"Don't want {k}") continue @@ -149,7 +163,7 @@ class BasicSubmission(BaseClass): name = cls.__mapper_args__['polymorphic_identity'] return SubmissionType.query(name=name) - def to_dict(self, full_data:bool=False, backup:bool=False, report:bool=False) -> dict: + def to_dict(self, full_data: bool = False, backup: bool = False, report: bool = False) -> dict: """ Constructs dictionary used in submissions summary @@ -159,7 +173,7 @@ class BasicSubmission(BaseClass): Returns: dict: dictionary used in submissions summary and details - """ + """ # get lab from nested organization object # logger.debug(f"Converting {self.rsl_plate_num} to dict...") try: @@ -196,12 +210,15 @@ class BasicSubmission(BaseClass): if full_data: logger.debug(f"Attempting reagents.") try: - reagents = [item.to_sub_dict(extraction_kit=self.extraction_kit) for item in self.submission_reagent_associations] + reagents = [item.to_sub_dict(extraction_kit=self.extraction_kit) for item in + self.submission_reagent_associations] for k in self.extraction_kit.construct_xl_map_for_use(self.submission_type): - if k == 'info': - continue - if not any([item['type']==k for item in reagents]): - reagents.append(dict(type=k, name="Not Applicable", lot="NA", expiry=date(year=1970, month=1, day=1), missing=True)) + if k == 'info': + continue + if not any([item['type'] == k for item in reagents]): + reagents.append( + dict(type=k, name="Not Applicable", lot="NA", expiry=date(year=1970, month=1, day=1), + missing=True)) except Exception as e: logger.error(f"We got an error retrieving reagents: {e}") reagents = None @@ -237,14 +254,14 @@ class BasicSubmission(BaseClass): output["Cost Centre"] = cost_centre output["Signed By"] = self.signed_by return output - + def calculate_column_count(self) -> int: """ Calculate the number of columns in this submission Returns: int: Number of unique columns. - """ + """ # logger.debug(f"Here's the samples: {self.samples}") columns = set([assoc.column for assoc in self.submission_sample_associations]) # logger.debug(f"Here are the columns for {self.rsl_plate_num}: {columns}") @@ -253,14 +270,15 @@ class BasicSubmission(BaseClass): def calculate_base_cost(self): """ Calculates cost of the plate - """ + """ # Calculate number of columns based on largest column number try: cols_count_96 = self.calculate_column_count() except Exception as e: logger.error(f"Column count error: {e}") # Get kit associated with this submission - assoc = [item for item in self.extraction_kit.kit_submissiontype_associations if item.submission_type == self.submission_type][0] + assoc = [item for item in self.extraction_kit.kit_submissiontype_associations if + item.submission_type == self.submission_type][0] logger.debug(f"Came up with association: {assoc}") # If every individual cost is 0 this is probably an old plate. if all(item == 0.0 for item in [assoc.constant_cost, assoc.mutable_cost_column, assoc.mutable_cost_sample]): @@ -270,22 +288,23 @@ class BasicSubmission(BaseClass): logger.error(f"Calculation error: {e}") else: try: - self.run_cost = assoc.constant_cost + (assoc.mutable_cost_column * cols_count_96) + (assoc.mutable_cost_sample * int(self.sample_count)) + self.run_cost = assoc.constant_cost + (assoc.mutable_cost_column * cols_count_96) + ( + assoc.mutable_cost_sample * int(self.sample_count)) except Exception as e: logger.error(f"Calculation error: {e}") self.run_cost = round(self.run_cost, 2) - + def hitpick_plate(self) -> list: """ Returns positve sample locations for plate Returns: list: list of htipick dictionaries for each sample - """ + """ output_list = [assoc.to_hitpick() for assoc in self.submission_sample_associations] return output_list - def make_plate_map(self, plate_rows:int=8, plate_columns=12) -> str: + def make_plate_map(self, plate_rows: int = 8, plate_columns=12) -> str: """ Constructs an html based plate map. @@ -296,7 +315,7 @@ class BasicSubmission(BaseClass): Returns: str: html output string. - """ + """ # logger.debug("Creating basic hitpick") sample_list = self.hitpick_plate() # logger.debug("Setting background colours") @@ -310,10 +329,10 @@ class BasicSubmission(BaseClass): sample['background_color'] = "#80cbc4" output_samples = [] # logger.debug("Setting locations.") - for column in range(1, plate_columns+1): - for row in range(1, plate_rows+1): + for column in range(1, plate_columns + 1): + for row in range(1, plate_rows + 1): try: - well = [item for item in sample_list if item['Row'] == row and item['Column']==column][0] + well = [item for item in sample_list if item['Row'] == row and item['Column'] == column][0] except IndexError: well = dict(name="", row=row, column=column, background_color="#ffffff") output_samples.append(well) @@ -321,18 +340,18 @@ class BasicSubmission(BaseClass): template = env.get_template("plate_map.html") html = template.render(samples=output_samples, PLATE_ROWS=plate_rows, PLATE_COLUMNS=plate_columns) return html + "
" - + def get_used_equipment(self) -> List[str]: """ Gets EquipmentRole names associated with this BasicSubmission Returns: List[str]: List of names - """ + """ return [item.role for item in self.submission_equipment_associations] @classmethod - def submissions_to_df(cls, submission_type:str|None=None, limit:int=0) -> pd.DataFrame: + def submissions_to_df(cls, submission_type: str | None = None, limit: int = 0) -> pd.DataFrame: """ Convert all submissions to dataframe @@ -342,7 +361,7 @@ class BasicSubmission(BaseClass): Returns: pd.DataFrame: Pandas Dataframe of all relevant submissions - """ + """ logger.debug(f"Querying Type: {submission_type}") logger.debug(f"Using limit: {limit}") # use lookup function to create list of dicts @@ -350,21 +369,22 @@ class BasicSubmission(BaseClass): logger.debug(f"Got {len(subs)} submissions.") df = pd.DataFrame.from_records(subs) # Exclude sub information - for item in ['controls', 'extraction_info', 'pcr_info', 'comment', 'comments', 'samples', 'reagents', 'equipment', 'gel_info', 'gel_image', 'dna_core_submission_number', 'gel_controls']: + for item in ['controls', 'extraction_info', 'pcr_info', 'comment', 'comments', 'samples', 'reagents', + 'equipment', 'gel_info', 'gel_image', 'dna_core_submission_number', 'gel_controls']: try: df = df.drop(item, axis=1) except: logger.warning(f"Couldn't drop '{item}' column from submissionsheet df.") return df - def set_attribute(self, key:str, value): + def set_attribute(self, key: str, value): """ Performs custom attribute setting based on values. Args: key (str): name of attribute value (_type_): value of attribute - """ + """ match key: case "extraction_kit": # logger.debug(f"Looking up kit {value}") @@ -384,7 +404,8 @@ class BasicSubmission(BaseClass): return case "reagents": logger.debug(f"Reagents coming into SQL: {value}") - field_value = [reagent['value'].toSQL()[0] if isinstance(reagent, dict) else reagent.toSQL()[0] for reagent in value] + field_value = [reagent['value'].toSQL()[0] if isinstance(reagent, dict) else reagent.toSQL()[0] for + reagent in value] logger.debug(f"Reagents coming out of SQL: {field_value}") case "submission_type": field_value = SubmissionType.query(name=value) @@ -434,7 +455,7 @@ class BasicSubmission(BaseClass): except AttributeError as e: logger.error(f"Could not set {self} attribute {key} to {value} due to \n{e}") - def update_subsampassoc(self, sample:BasicSample, input_dict:dict): + def update_subsampassoc(self, sample: BasicSample, input_dict: dict): """ Update a joined submission sample association. @@ -444,9 +465,9 @@ class BasicSubmission(BaseClass): Returns: Result: _description_ - """ - assoc = [item for item in self.submission_sample_associations if item.sample==sample][0] - for k,v in input_dict.items(): + """ + assoc = [item for item in self.submission_sample_associations if item.sample == sample][0] + for k, v in input_dict.items(): try: setattr(assoc, k, v) except AttributeError: @@ -454,13 +475,13 @@ class BasicSubmission(BaseClass): result = assoc.save() return result - def to_pydantic(self, backup:bool=False) -> "PydSubmission": + def to_pydantic(self, backup: bool = False) -> "PydSubmission": """ Converts this instance into a PydSubmission Returns: PydSubmission: converted object. - """ + """ from backend.validators import PydSubmission, PydSample, PydReagent, PydEquipment dicto = self.to_dict(full_data=True, backup=backup) logger.debug("To dict complete") @@ -471,10 +492,11 @@ class BasicSubmission(BaseClass): missing = value is None or value in ['', 'None'] match key: case "reagents": - + new_dict[key] = [PydReagent(**reagent) for reagent in value] case "samples": - new_dict[key] = [PydSample(**{k.lower().replace(" ", "_"):v for k,v in sample.items()}) for sample in dicto['samples']] + new_dict[key] = [PydSample(**{k.lower().replace(" ", "_"): v for k, v in sample.items()}) for sample + in dicto['samples']] case "equipment": try: new_dict[key] = [PydEquipment(**equipment) for equipment in dicto['equipment']] @@ -492,19 +514,19 @@ class BasicSubmission(BaseClass): logger.debug("Done converting fields.") return PydSubmission(**new_dict) - def save(self, original:bool=True): + def save(self, original: bool = True): """ Adds this instance to database and commits. Args: original (bool, optional): Is this the first save. Defaults to True. - """ + """ logger.debug("Saving submission.") if original: self.uploaded_by = getuser() super().save() -# Polymorphic functions + # Polymorphic functions @classmethod def construct_regex(cls) -> re.Pattern: @@ -513,13 +535,14 @@ class BasicSubmission(BaseClass): Returns: re.Pattern: Regular expression pattern to discriminate between submission types. - """ - rstring = rf'{"|".join([item.get_regex() for item in cls.__subclasses__()])}' - regex = re.compile(rstring, flags = re.IGNORECASE | re.VERBOSE) + """ + rstring = rf'{"|".join([item.get_regex() for item in cls.__subclasses__()])}' + regex = re.compile(rstring, flags=re.IGNORECASE | re.VERBOSE) return regex - + @classmethod - def find_polymorphic_subclass(cls, attrs: dict|None = None, polymorphic_identity:str|SubmissionType|None = None): + def find_polymorphic_subclass(cls, polymorphic_identity: str | SubmissionType | None = None, + attrs: dict | None = None): """ Find subclass based on polymorphic identity or relevant attributes. @@ -542,26 +565,28 @@ class BasicSubmission(BaseClass): try: logger.info(f"Recruiting: {cls}") model = [item for item in cls.__subclasses__() if - item.__mapper_args__['polymorphic_identity'] == polymorphic_identity][0] + item.__mapper_args__['polymorphic_identity'] == polymorphic_identity][0] except Exception as e: logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}") case _: pass if attrs is None or len(attrs) == 0: return model - if any([not hasattr(cls, attr) for attr in attrs]): + if any([not hasattr(cls, attr) for attr in attrs.keys()]): # looks for first model that has all included kwargs try: - model = [subclass for subclass in cls.__subclasses__() if all([hasattr(subclass, attr) for attr in attrs])][0] + model = [subclass for subclass in cls.__subclasses__() if + all([hasattr(subclass, attr) for attr in attrs.keys()])][0] except IndexError as e: - raise AttributeError(f"Couldn't find existing class/subclass of {cls} with all attributes:\n{pformat(attrs)}") + raise AttributeError( + f"Couldn't find existing class/subclass of {cls} with all attributes:\n{pformat(attrs.keys())}") logger.info(f"Recruiting model: {model}") return model -# Child class custom functions + # Child class custom functions @classmethod - def custom_platemap(cls, xl:pd.ExcelFile, plate_map:pd.DataFrame) -> pd.DataFrame: + def custom_platemap(cls, xl: pd.ExcelFile, plate_map: pd.DataFrame) -> pd.DataFrame: """ Stupid stopgap solution to there being an issue with the Bacterial Culture plate map @@ -571,12 +596,12 @@ class BasicSubmission(BaseClass): Returns: pd.DataFrame: updated plate map. - """ + """ logger.info(f"Calling {cls.__mapper_args__['polymorphic_identity']} plate mapper.") return plate_map - + @classmethod - def parse_info(cls, input_dict:dict, xl:pd.ExcelFile|None=None) -> dict: + def parse_info(cls, input_dict: dict, xl: pd.ExcelFile | None = None) -> dict: """ Update submission dictionary with type specific information @@ -586,12 +611,12 @@ class BasicSubmission(BaseClass): Returns: dict: Updated sample dictionary - """ + """ logger.info(f"Calling {cls.__mapper_args__['polymorphic_identity']} info parser.") return input_dict - + @classmethod - def parse_samples(cls, input_dict:dict) -> dict: + def parse_samples(cls, input_dict: dict) -> dict: """ Update sample dictionary with type specific information @@ -600,12 +625,13 @@ class BasicSubmission(BaseClass): Returns: dict: Updated sample dictionary - """ + """ logger.info(f"Called {cls.__mapper_args__['polymorphic_identity']} sample parser") return input_dict - + @classmethod - def finalize_parse(cls, input_dict:dict, xl:pd.ExcelFile|None=None, info_map:dict|None=None, plate_map:dict|None=None) -> dict: + def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None, + plate_map: dict | None = None) -> dict: """ Performs any final custom parsing of the excel file. @@ -617,12 +643,12 @@ class BasicSubmission(BaseClass): Returns: dict: Updated parser product. - """ + """ logger.info(f"Called {cls.__mapper_args__['polymorphic_identity']} finalizer") return input_dict @classmethod - def custom_autofill(cls, input_excel:Workbook, info:dict|None=None, backup:bool=False) -> Workbook: + def custom_autofill(cls, input_excel: Workbook, info: dict | None = None, backup: bool = False) -> Workbook: """ Adds custom autofill methods for submission @@ -633,12 +659,12 @@ class BasicSubmission(BaseClass): Returns: Workbook: Updated workbook - """ + """ logger.info(f"Hello from {cls.__mapper_args__['polymorphic_identity']} autofill") return input_excel - + @classmethod - def enforce_name(cls, instr:str, data:dict|None={}) -> str: + def enforce_name(cls, instr: str, data: dict | None = {}) -> str: """ Custom naming method for this class. @@ -648,7 +674,7 @@ class BasicSubmission(BaseClass): Returns: str: Updated name. - """ + """ # logger.info(f"Hello from {cls.__mapper_args__['polymorphic_identity']} Enforcer!") # return instr from backend.validators import RSLNamer @@ -668,7 +694,8 @@ class BasicSubmission(BaseClass): outstr = re.sub(rf"RSL-?", rf"RSL-{data['abbreviation']}-", outstr, flags=re.IGNORECASE) try: outstr = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", outstr) - outstr = re.sub(rf"{data['abbreviation']}(\d{6})", rf"{data['abbreviation']}-\1", outstr, flags=re.IGNORECASE).upper() + outstr = re.sub(rf"{data['abbreviation']}(\d{6})", rf"{data['abbreviation']}-\1", outstr, + flags=re.IGNORECASE).upper() except (AttributeError, TypeError) as e: logger.error(f"Error making outstr: {e}, sending to RSLNamer to make new plate name.") outstr = RSLNamer.construct_new_plate_name(data=data) @@ -689,7 +716,7 @@ class BasicSubmission(BaseClass): # return outstr @classmethod - def parse_pcr(cls, xl:pd.DataFrame, rsl_number:str) -> list: + def parse_pcr(cls, xl: pd.DataFrame, rsl_number: str) -> list: """ Perform custom parsing of pcr info. @@ -699,7 +726,7 @@ class BasicSubmission(BaseClass): Returns: list: _description_ - """ + """ logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} PCR parser!") return [] @@ -711,11 +738,11 @@ class BasicSubmission(BaseClass): Returns: str: filename template in jinja friendly format. - """ + """ return "{{ rsl_plate_num }}" @classmethod - def custom_sample_autofill_row(cls, sample, worksheet:Worksheet) -> int: + def custom_sample_autofill_row(cls, sample, worksheet: Worksheet) -> int: """ _summary_ @@ -725,15 +752,15 @@ class BasicSubmission(BaseClass): Returns: int: _description_ - """ - return None + """ + return None @classmethod - def adjust_autofill_samples(cls, samples:List[Any]) -> List[Any]: + def adjust_autofill_samples(cls, samples: List[Any]) -> List[Any]: logger.info(f"Hello from {cls.__mapper_args__['polymorphic_identity']} sampler") return samples - - def adjust_to_dict_samples(self, backup:bool=False) -> List[dict]: + + def adjust_to_dict_samples(self, backup: bool = False) -> List[dict]: """ Updates sample dictionaries with custom values @@ -742,12 +769,12 @@ class BasicSubmission(BaseClass): Returns: List[dict]: Updated dictionaries - """ + """ logger.debug(f"Hello from {self.__class__.__name__} dictionary sample adjuster.") - return [item.to_sub_dict() for item in self.submission_sample_associations] - + return [item.to_sub_dict() for item in self.submission_sample_associations] + @classmethod - def get_details_template(cls, base_dict:dict) -> Tuple[dict, Template]: + def get_details_template(cls, base_dict: dict) -> Tuple[dict, Template]: """ Get the details jinja template for the correct class @@ -756,7 +783,7 @@ class BasicSubmission(BaseClass): Returns: Tuple(dict, Template): (Updated dictionary, Template to be rendered) - """ + """ base_dict['excluded'] = cls.get_default_info('details_ignore') env = jinja_template_loading() temp_name = f"{cls.__name__.lower()}_details.html" @@ -768,28 +795,28 @@ class BasicSubmission(BaseClass): template = env.get_template("basicsubmission_details.html") return base_dict, template -# Query functions + # Query functions @classmethod @setup_lookup - def query(cls, - submission_type:str|SubmissionType|None=None, - id:int|str|None=None, - rsl_number:str|None=None, - start_date:date|str|int|None=None, - end_date:date|str|int|None=None, - reagent:Reagent|str|None=None, - chronologic:bool=False, - limit:int=0, - **kwargs - ) -> BasicSubmission | List[BasicSubmission]: + def query(cls, + submission_type: str | SubmissionType | None = None, + id: int | str | None = None, + rsl_plate_num: str | None = None, + start_date: date | str | int | None = None, + end_date: date | str | int | None = None, + reagent: Reagent | str | None = None, + chronologic: bool = False, + limit: int = 0, + **kwargs + ) -> BasicSubmission | List[BasicSubmission]: """ - Lookup submissions based on a number of parameters. + Lookup submissions based on a number of parameters. Overrides parent. Args: submission_type (str | models.SubmissionType | None, optional): Submission type of interest. Defaults to None. id (int | str | None, optional): Submission id in the database (limits results to 1). Defaults to None. - rsl_number (str | None, optional): Submission name in the database (limits results to 1). Defaults to None. + rsl_plate_num (str | None, optional): Submission name in the database (limits results to 1). Defaults to None. start_date (date | str | int | None, optional): Beginning date to search by. Defaults to None. end_date (date | str | int | None, optional): Ending date to search by. Defaults to None. reagent (models.Reagent | str | None, optional): A reagent used in the submission. Defaults to None. @@ -803,7 +830,7 @@ class BasicSubmission(BaseClass): # NOTE: if you go back to using 'model' change the appropriate cls to model in the query filters if submission_type is not None: # if isinstance(submission_type, SubmissionType): - # model = cls.find_subclasses(submission_type=submission_type.name) + # model = cls.find_subclasses(submission_type=submission_type.name) model = cls.find_polymorphic_subclass(polymorphic_identity=submission_type) # else: # model = cls.find_subclasses(submission_type=submission_type) @@ -815,13 +842,13 @@ class BasicSubmission(BaseClass): else: model = cls query: Query = cls.__database_session__.query(model) - if start_date != None and end_date == None: + if start_date is not None and end_date is None: logger.warning(f"Start date with no end date, using today.") end_date = date.today() - if end_date != None and start_date == None: + if end_date is not None and start_date is None: logger.warning(f"End date with no start date, using Jan 1, 2023") start_date = date(2023, 1, 1) - if start_date != None: + if start_date is not None: logger.debug(f"Querying with start date: {start_date} and end date: {end_date}") match start_date: case date(): @@ -829,7 +856,8 @@ class BasicSubmission(BaseClass): start_date = start_date.strftime("%Y-%m-%d") case int(): # logger.debug(f"Lookup BasicSubmission by ordinal start_date {start_date}") - start_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d") + start_date = datetime.fromordinal( + datetime(1900, 1, 1).toordinal() + start_date - 2).date().strftime("%Y-%m-%d") case _: # logger.debug(f"Lookup BasicSubmission by parsed str start_date {start_date}") start_date = parse(start_date).strftime("%Y-%m-%d") @@ -839,32 +867,35 @@ class BasicSubmission(BaseClass): end_date = end_date.strftime("%Y-%m-%d") case int(): # logger.debug(f"Lookup BasicSubmission by ordinal end_date {end_date}") - end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime("%Y-%m-%d") + end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime( + "%Y-%m-%d") case _: # logger.debug(f"Lookup BasicSubmission by parsed str end_date {end_date}") end_date = parse(end_date).strftime("%Y-%m-%d") # logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}") - logger.debug(f"Start date {start_date} == End date {end_date}: {start_date==end_date}") + logger.debug(f"Start date {start_date} == End date {end_date}: {start_date == end_date}") # logger.debug(f"Compensating for same date by using time") if start_date == end_date: start_date = datetime.strptime(start_date, "%Y-%m-%d").strftime("%Y-%m-%d %H:%M:%S.%f") - query = query.filter(model.submitted_date==start_date) + query = query.filter(model.submitted_date == start_date) else: query = query.filter(model.submitted_date.between(start_date, end_date)) # by reagent (for some reason) match reagent: case str(): # logger.debug(f"Looking up BasicSubmission with reagent: {reagent}") - query = query.join(model.submission_reagent_associations).filter(SubmissionSampleAssociation.reagent.lot==reagent) + query = query.join(model.submission_reagent_associations).filter( + SubmissionSampleAssociation.reagent.lot == reagent) case Reagent(): # logger.debug(f"Looking up BasicSubmission with reagent: {reagent}") - query = query.join(model.submission_reagent_associations).join(SubmissionSampleAssociation.reagent).filter(Reagent.lot==reagent) + query = query.join(model.submission_reagent_associations).join( + SubmissionSampleAssociation.reagent).filter(Reagent.lot == reagent) case _: pass # by rsl number (returns only a single value) - match rsl_number: + match rsl_plate_num: case str(): - query = query.filter(model.rsl_plate_num==rsl_number) + query = query.filter(model.rsl_plate_num == rsl_plate_num) # logger.debug(f"At this point the query gets: {query.all()}") limit = 1 case _: @@ -873,27 +904,31 @@ class BasicSubmission(BaseClass): match id: case int(): # logger.debug(f"Looking up BasicSubmission with id: {id}") - query = query.filter(model.id==id) + query = query.filter(model.id == id) limit = 1 case str(): # logger.debug(f"Looking up BasicSubmission with id: {id}") - query = query.filter(model.id==int(id)) + query = query.filter(model.id == int(id)) limit = 1 case _: pass - for k, v in kwargs.items(): - logger.debug(f"Looking up attribute: {k}") - attr = getattr(model, k) - logger.debug(f"Got attr: {attr}") - query = query.filter(attr==v) + # for k, v in kwargs.items(): + # logger.debug(f"Looking up attribute: {k}") + # attr = getattr(model, k) + # logger.debug(f"Got attr: {attr}") + # query = query.filter(attr==v) # if len(kwargs) > 0: # limit = 1 + # query = cls.query_by_keywords(query=query, model=model, **kwargs) + # if any(x in kwargs.keys() for x in cls.get_default_info('singles')): + # logger.debug(f"There's a singled out item in kwargs") + # limit = 1 if chronologic: query.order_by(cls.submitted_date) - return cls.execute_query(query=query, limit=limit) + return cls.execute_query(query=query, model=model, limit=limit, **kwargs) @classmethod - def query_or_create(cls, submission_type:str|SubmissionType|None=None, **kwargs) -> BasicSubmission: + def query_or_create(cls, submission_type: str | SubmissionType | None = None, **kwargs) -> BasicSubmission: """ Returns object from db if exists, else, creates new. Due to need for user input, doesn't see much use ATM. @@ -906,7 +941,7 @@ class BasicSubmission(BaseClass): Returns: cls: _description_ - """ + """ code = 0 msg = "" disallowed = ["id"] @@ -914,7 +949,8 @@ class BasicSubmission(BaseClass): raise ValueError("Need to narrow down query or the first available instance will be returned.") for key in kwargs.keys(): if key in disallowed: - raise ValueError(f"{key} is not allowed as a query argument as it could lead to creation of duplicate objects. Use .query() instead.") + raise ValueError( + f"{key} is not allowed as a query argument as it could lead to creation of duplicate objects. Use .query() instead.") instance = cls.query(submission_type=submission_type, limit=1, **kwargs) # logger.debug(f"Retrieved instance: {instance}") if instance == None: @@ -934,7 +970,7 @@ class BasicSubmission(BaseClass): msg = "This submission already exists.\nWould you like to overwrite?" return instance, code, msg -# Custom context events for the ui + # Custom context events for the ui def custom_context_events(self) -> dict: """ @@ -942,12 +978,12 @@ class BasicSubmission(BaseClass): Returns: dict: dictionary of functions - """ + """ names = ["Delete", "Details", "Edit", "Add Comment", "Add Equipment", "Export"] funcs = [self.delete, self.show_details, self.edit, self.add_comment, self.add_equipment, self.backup] - dicto = {item[0]:item[1] for item in zip(names, funcs)} + dicto = {item[0]: item[1] for item in zip(names, funcs)} return dicto - + def delete(self, obj=None): """ Performs backup and deletes this instance from database. @@ -957,7 +993,7 @@ class BasicSubmission(BaseClass): Raises: e: _description_ - """ + """ from frontend.widgets.pop_ups import QuestionAsker logger.debug("Hello from delete") fname = self.__backup_path__.joinpath(f"{self.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')})") @@ -978,7 +1014,7 @@ class BasicSubmission(BaseClass): Args: obj (_type_): parent widget - """ + """ logger.debug("Hello from details") from frontend.widgets.submission_details import SubmissionDetails dlg = SubmissionDetails(parent=obj, sub=self) @@ -1000,7 +1036,7 @@ class BasicSubmission(BaseClass): Args: obj (_type_): parent widget - """ + """ from frontend.widgets.submission_details import SubmissionComment dlg = SubmissionComment(parent=obj, submission=self) if dlg.exec(): @@ -1015,7 +1051,7 @@ class BasicSubmission(BaseClass): Args: obj (_type_): parent widget - """ + """ from frontend.widgets.equipment_usage import EquipmentUsage dlg = EquipmentUsage(parent=obj, submission=self) if dlg.exec(): @@ -1029,7 +1065,7 @@ class BasicSubmission(BaseClass): else: pass - def backup(self, obj=None, fname:Path|None=None, full_backup:bool=False): + def backup(self, obj=None, fname: Path | None = None, full_backup: bool = False): """ Exports xlsx and yml info files for this instance. @@ -1037,7 +1073,7 @@ class BasicSubmission(BaseClass): obj (_type_, optional): _description_. Defaults to None. fname (Path | None, optional): Filename of xlsx file. Defaults to None. full_backup (bool, optional): Whether or not to make yaml file. Defaults to False. - """ + """ logger.debug("Hello from backup.") pyd = self.to_pydantic(backup=True) if fname == None: @@ -1059,35 +1095,33 @@ class BasicSubmission(BaseClass): wb = pyd.autofill_equipment(wb) wb.save(filename=fname.with_suffix(".xlsx")) + # Below are the custom submission types class BacterialCulture(BasicSubmission): """ derivative submission type from BasicSubmission - """ + """ id = Column(INTEGER, ForeignKey('_basicsubmission.id'), primary_key=True) - controls = relationship("Control", back_populates="submission", uselist=True) #: A control sample added to submission - __mapper_args__ = dict(polymorphic_identity="Bacterial Culture", - polymorphic_load="inline", + controls = relationship("Control", back_populates="submission", + uselist=True) #: A control sample added to submission + __mapper_args__ = dict(polymorphic_identity="Bacterial Culture", + polymorphic_load="inline", inherit_condition=(id == BasicSubmission.id)) - def to_dict(self, full_data:bool=False, backup:bool=False, report:bool=False) -> dict: + def to_dict(self, full_data: bool = False, backup: bool = False, report: bool = False) -> dict: """ Extends parent class method to add controls to dict Returns: dict: dictionary used in submissions summary - """ + """ output = super().to_dict(full_data=full_data, backup=backup, report=report) if report: return output if full_data: output['controls'] = [item.to_sub_dict() for item in self.controls] return output - - # @classmethod - # def get_default_info(cls) -> dict: - # return dict(abbreviation="BC", submission_type="Bacterial Culture") @classmethod def custom_platemap(cls, xl: pd.ExcelFile, plate_map: pd.DataFrame) -> pd.DataFrame: @@ -1100,20 +1134,20 @@ class BacterialCulture(BasicSubmission): Returns: pd.DataFrame: updated plate map. - """ + """ plate_map = super().custom_platemap(xl, plate_map) - num1 = xl.parse("Sample List").iloc[40,1] - num2 = xl.parse("Sample List").iloc[41,1] - logger.debug(f"Broken: {plate_map.iloc[5,0]}, {plate_map.iloc[6,0]}") + num1 = xl.parse("Sample List").iloc[40, 1] + num2 = xl.parse("Sample List").iloc[41, 1] + logger.debug(f"Broken: {plate_map.iloc[5, 0]}, {plate_map.iloc[6, 0]}") logger.debug(f"Replace: {num1}, {num2}") - if not check_not_nan(plate_map.iloc[5,0]): - plate_map.iloc[5,0] = num1 - if not check_not_nan(plate_map.iloc[6,0]): - plate_map.iloc[6,0] = num2 + if not check_not_nan(plate_map.iloc[5, 0]): + plate_map.iloc[5, 0] = num1 + if not check_not_nan(plate_map.iloc[6, 0]): + plate_map.iloc[6, 0] = num2 return plate_map - + @classmethod - def custom_autofill(cls, input_excel: Workbook, info:dict|None=None, backup:bool=False) -> Workbook: + def custom_autofill(cls, input_excel: Workbook, info: dict | None = None, backup: bool = False) -> Workbook: """ Stupid stopgap solution to there being an issue with the Bacterial Culture plate map. Extends parent. @@ -1122,12 +1156,12 @@ class BacterialCulture(BasicSubmission): Returns: Workbook: Updated openpyxl workbook - """ + """ input_excel = super().custom_autofill(input_excel) sheet = input_excel['Plate Map'] - if sheet.cell(12,2).value == None: + if sheet.cell(12, 2).value == None: sheet.cell(row=12, column=2, value="=IF(ISBLANK('Sample List'!$B42),\"\",'Sample List'!$B42)") - if sheet.cell(13,2).value == None: + if sheet.cell(13, 2).value == None: sheet.cell(row=13, column=2, value="=IF(ISBLANK('Sample List'!$B43),\"\",'Sample List'!$B43)") input_excel["Sample List"].cell(row=15, column=2, value=getuser()) return input_excel @@ -1139,20 +1173,21 @@ class BacterialCulture(BasicSubmission): Returns: str: string for regex construction - """ + """ return "(?PRSL(?:-|_)?BC(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)?\d?([^_0123456789\sA-QS-Z]|$)?R?\d?)?)" - + @classmethod def filename_template(cls): """ extends parent - """ + """ template = super().filename_template() template += "_{{ submitting_lab }}_{{ submitter_plate_num }}" return template - + @classmethod - def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None, plate_map: dict | None = None) -> dict: + def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None, + plate_map: dict | None = None) -> dict: """ Extends parent. Currently finds control sample and adds to reagents. @@ -1164,7 +1199,7 @@ class BacterialCulture(BasicSubmission): Returns: dict: _description_ - """ + """ from . import ControlType input_dict = super().finalize_parse(input_dict, xl, info_map, plate_map) # build regex for all control types that have targets @@ -1176,7 +1211,8 @@ class BacterialCulture(BasicSubmission): logger.debug(f"Control match found: {sample.submitter_id}") new_lot = matched.group() try: - pos_control_reg = [reg for reg in input_dict['reagents'] if reg.type=="Bacterial-Positive Control"][0] + pos_control_reg = \ + [reg for reg in input_dict['reagents'] if reg.type == "Bacterial-Positive Control"][0] except IndexError: logger.error(f"No positive control reagent listed") return input_dict @@ -1188,41 +1224,42 @@ class BacterialCulture(BasicSubmission): def custom_sample_autofill_row(cls, sample, worksheet: Worksheet) -> int: """ Extends parent - """ + """ logger.debug(f"Checking {sample.well}") logger.debug(f"here's the worksheet: {worksheet}") row = super().custom_sample_autofill_row(sample, worksheet) df = pd.DataFrame(list(worksheet.values)) # logger.debug(f"Here's the dataframe: {df}") - idx = df[df[0]==sample.well] + idx = df[df[0] == sample.well] if idx.empty: new = f"{sample.well[0]}{sample.well[1:].zfill(2)}" logger.debug(f"Checking: {new}") - idx = df[df[0]==new] + idx = df[df[0] == new] logger.debug(f"Here is the row: {idx}") row = idx.index.to_list()[0] return row + 1 + class Wastewater(BasicSubmission): """ derivative submission type from BasicSubmission - """ + """ id = Column(INTEGER, ForeignKey('_basicsubmission.id'), primary_key=True) - ext_technician = Column(String(64)) #: Name of technician doing extraction - pcr_technician = Column(String(64)) #: Name of technician doing pcr - pcr_info = Column(JSON)#: unstructured output from pcr table logger or user(Artic) + ext_technician = Column(String(64)) #: Name of technician doing extraction + pcr_technician = Column(String(64)) #: Name of technician doing pcr + pcr_info = Column(JSON) #: unstructured output from pcr table logger or user(Artic) - __mapper_args__ = __mapper_args__ = dict(polymorphic_identity="Wastewater", - polymorphic_load="inline", - inherit_condition=(id == BasicSubmission.id)) + __mapper_args__ = __mapper_args__ = dict(polymorphic_identity="Wastewater", + polymorphic_load="inline", + inherit_condition=(id == BasicSubmission.id)) - def to_dict(self, full_data:bool=False, backup:bool=False, report:bool=False) -> dict: + def to_dict(self, full_data: bool = False, backup: bool = False, report: bool = False) -> dict: """ Extends parent class method to add controls to dict Returns: dict: dictionary used in submissions summary - """ + """ output = super().to_dict(full_data=full_data, backup=backup, report=report) if report: return output @@ -1239,9 +1276,9 @@ class Wastewater(BasicSubmission): else: output['PCR Technician'] = self.pcr_technician return output - + @classmethod - def parse_info(cls, input_dict:dict, xl:pd.ExcelFile|None=None) -> dict: + def parse_info(cls, input_dict: dict, xl: pd.ExcelFile | None = None) -> dict: """ Update submission dictionary with type specific information. Extends parent @@ -1250,25 +1287,28 @@ class Wastewater(BasicSubmission): Returns: dict: Updated sample dictionary - """ + """ input_dict = super().parse_info(input_dict) if xl != None: input_dict['csv'] = xl.parse("Copy to import file") return input_dict - + @classmethod - def parse_pcr(cls, xl: pd.ExcelFile, rsl_number:str) -> list: + def parse_pcr(cls, xl: pd.ExcelFile, rsl_number: str) -> list: """ Parse specific to wastewater samples. - """ + """ samples = super().parse_pcr(xl=xl, rsl_number=rsl_number) df = xl.parse(sheet_name="Results", dtype=object).fillna("") - column_names = ["Well", "Well Position", "Omit","Sample","Target","Task"," Reporter","Quencher","Amp Status","Amp Score","Curve Quality","Result Quality Issues","Cq","Cq Confidence","Cq Mean","Cq SD","Auto Threshold","Threshold", "Auto Baseline", "Baseline Start", "Baseline End"] + column_names = ["Well", "Well Position", "Omit", "Sample", "Target", "Task", " Reporter", "Quencher", + "Amp Status", "Amp Score", "Curve Quality", "Result Quality Issues", "Cq", "Cq Confidence", + "Cq Mean", "Cq SD", "Auto Threshold", "Threshold", "Auto Baseline", "Baseline Start", + "Baseline End"] samples_df = df.iloc[23:][0:] logger.debug(f"Dataframe of PCR results:\n\t{samples_df}") samples_df.columns = column_names logger.debug(f"Samples columns: {samples_df.columns}") - well_call_df = xl.parse(sheet_name="Well Call").iloc[24:][0:].iloc[:,-1:] + well_call_df = xl.parse(sheet_name="Well Call").iloc[24:][0:].iloc[:, -1:] try: samples_df['Assessment'] = well_call_df.values except ValueError: @@ -1276,13 +1316,13 @@ class Wastewater(BasicSubmission): logger.debug(f"Well call df: {well_call_df}") for _, row in samples_df.iterrows(): try: - sample_obj = [sample for sample in samples if sample['sample'] == row[3]][0] + sample_obj = [sample for sample in samples if sample['sample'] == row[3]][0] except IndexError: sample_obj = dict( - sample = row['Sample'], - plate_rsl = rsl_number, + sample=row['Sample'], + plate_rsl=rsl_number, ) - logger.debug(f"Got sample obj: {sample_obj}") + logger.debug(f"Got sample obj: {sample_obj}") if isinstance(row['Cq'], float): sample_obj[f"ct_{row['Target'].lower()}"] = row['Cq'] else: @@ -1293,12 +1333,12 @@ class Wastewater(BasicSubmission): logger.error(f"No assessment for {sample_obj['sample']}") samples.append(sample_obj) return samples - + @classmethod - def enforce_name(cls, instr:str, data:dict|None={}) -> str: + def enforce_name(cls, instr: str, data: dict | None = {}) -> str: """ Extends parent - """ + """ try: # Deal with PCR file. instr = re.sub(r"PCR(-|_)", "", instr) @@ -1314,9 +1354,9 @@ class Wastewater(BasicSubmission): Returns: str: String for regex construction - """ + """ return "(?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)?\d?([^_0123456789\sA-QS-Z]|$)?R?\d?)?)" - + @classmethod def adjust_autofill_samples(cls, samples: List[Any]) -> List[Any]: """ @@ -1330,13 +1370,13 @@ class Wastewater(BasicSubmission): def custom_sample_autofill_row(cls, sample, worksheet: Worksheet) -> int: """ Extends parent - """ + """ logger.debug(f"Checking {sample.well}") logger.debug(f"here's the worksheet: {worksheet}") row = super().custom_sample_autofill_row(sample, worksheet) df = pd.DataFrame(list(worksheet.values)) logger.debug(f"Here's the dataframe: {df}") - idx = df[df[1]==sample.sample_location] + idx = df[df[1] == sample.sample_location] logger.debug(f"Here is the row: {idx}") row = idx.index.to_list()[0] return row + 1 @@ -1358,36 +1398,37 @@ class Wastewater(BasicSubmission): for sample in self.samples: logger.debug(f"Running update on: {sample}") try: - sample_dict = [item for item in parser.samples if item['sample']==sample.rsl_number][0] + sample_dict = [item for item in parser.samples if item['sample'] == sample.rsl_number][0] except IndexError: continue self.update_subsampassoc(sample=sample, input_dict=sample_dict) # self.report.add_result(Result(msg=f"We added PCR info to {sub.rsl_plate_num}.", status='Information')) + class WastewaterArtic(BasicSubmission): """ derivative submission type for artic wastewater - """ + """ id = Column(INTEGER, ForeignKey('_basicsubmission.id'), primary_key=True) - artic_technician = Column(String(64)) #: Name of technician performing artic - dna_core_submission_number = Column(String(64)) #: Number used by core as id - pcr_info = Column(JSON) #: unstructured output from pcr table logger or user(Artic) - gel_image = Column(String(64)) #: file name of gel image in zip file - gel_info = Column(JSON) #: unstructured data from gel. - gel_controls = Column(JSON) #: locations of controls on the gel - source_plates = Column(JSON) #: wastewater plates that samples come from + artic_technician = Column(String(64)) #: Name of technician performing artic + dna_core_submission_number = Column(String(64)) #: Number used by core as id + pcr_info = Column(JSON) #: unstructured output from pcr table logger or user(Artic) + gel_image = Column(String(64)) #: file name of gel image in zip file + gel_info = Column(JSON) #: unstructured data from gel. + gel_controls = Column(JSON) #: locations of controls on the gel + source_plates = Column(JSON) #: wastewater plates that samples come from - __mapper_args__ = dict(polymorphic_identity="Wastewater Artic", - polymorphic_load="inline", + __mapper_args__ = dict(polymorphic_identity="Wastewater Artic", + polymorphic_load="inline", inherit_condition=(id == BasicSubmission.id)) - def to_dict(self, full_data:bool=False, backup:bool=False, report:bool=False) -> dict: + def to_dict(self, full_data: bool = False, backup: bool = False, report: bool = False) -> dict: """ Extends parent class method to add controls to dict Returns: dict: dictionary used in submissions summary - """ + """ output = super().to_dict(full_data=full_data, backup=backup, report=report) if report: return output @@ -1398,7 +1439,7 @@ class WastewaterArtic(BasicSubmission): return output @classmethod - def parse_info(cls, input_dict:dict, xl:pd.ExcelFile|None=None) -> dict: + def parse_info(cls, input_dict: dict, xl: pd.ExcelFile | None = None) -> dict: """ Update submission dictionary with type specific information @@ -1408,23 +1449,26 @@ class WastewaterArtic(BasicSubmission): Returns: dict: Updated sample dictionary - """ + """ input_dict = super().parse_info(input_dict) workbook = load_workbook(xl.io, data_only=True) ws = workbook['Egel results'] - data = [ws.cell(row=ii,column=jj) for jj in range(15,27) for ii in range(10,18)] + data = [ws.cell(row=ii, column=jj) for jj in range(15, 27) for ii in range(10, 18)] data = [cell for cell in data if cell.value is not None and "NTC" in cell.value] - input_dict['gel_controls'] = [dict(sample_id=cell.value, location=f"{row_map[cell.row-9]}{str(cell.column-14).zfill(2)}") for cell in data] + input_dict['gel_controls'] = [ + dict(sample_id=cell.value, location=f"{row_map[cell.row - 9]}{str(cell.column - 14).zfill(2)}") for cell in + data] ws = workbook['First Strand List'] - data = [dict(plate=ws.cell(row=ii, column=3).value, starting_sample=ws.cell(row=ii, column=4).value) for ii in range(8,11)] + data = [dict(plate=ws.cell(row=ii, column=3).value, starting_sample=ws.cell(row=ii, column=4).value) for ii in + range(8, 11)] input_dict['source_plates'] = data return input_dict @classmethod - def enforce_name(cls, instr:str, data:dict={}) -> str: + def enforce_name(cls, instr: str, data: dict = {}) -> str: """ Extends parent - """ + """ try: # Deal with PCR file. instr = re.sub(r"Artic", "", instr, flags=re.IGNORECASE) @@ -1448,18 +1492,18 @@ class WastewaterArtic(BasicSubmission): Returns: dict: Updated sample dictionary - """ + """ input_dict = super().parse_samples(input_dict) input_dict['sample_type'] = "Wastewater Sample" # Because generate_sample_object needs the submitter_id and the artic has the "({origin well})" # at the end, this has to be done here. No moving to sqlalchemy object :( input_dict['submitter_id'] = re.sub(r"\s\(.+\)\s?$", "", str(input_dict['submitter_id'])).strip() - try: + try: input_dict['ww_processing_num'] = input_dict['sample_name_(lims)'] del input_dict['sample_name_(lims)'] except KeyError: logger.error(f"Unable to set ww_processing_num for sample {input_dict['submitter_id']}") - try: + try: input_dict['ww_full_sample_id'] = input_dict['sample_name_(ww)'] del input_dict['sample_name_(ww)'] except KeyError: @@ -1467,9 +1511,9 @@ class WastewaterArtic(BasicSubmission): if "ENC" in input_dict['submitter_id']: input_dict['submitter_id'] = cls.en_adapter(input_str=input_dict['submitter_id']) return input_dict - + @classmethod - def en_adapter(cls, input_str:str) -> str: + def en_adapter(cls, input_str: str) -> str: """ Stopgap solution because WW names their ENs different @@ -1478,7 +1522,7 @@ class WastewaterArtic(BasicSubmission): Returns: str: output name - """ + """ logger.debug(f"input string raw: {input_str}") # Remove letters. processed = re.sub(r"[A-QS-Z]+\d*", "", input_str) @@ -1492,7 +1536,7 @@ class WastewaterArtic(BasicSubmission): en_num = "1" en_num = en_num.strip("-") logger.debug(f"Processed after en-num: {processed}") - try: + try: plate_num = re.search(r"\-\d{1}R?\d?$", processed).group() processed = rreplace(processed, plate_num, "") except AttributeError: @@ -1519,11 +1563,12 @@ class WastewaterArtic(BasicSubmission): Returns: str: string for regex construction. - """ + """ return "(?P(\\d{4}-\\d{2}-\\d{2}(?:-|_)(?:\\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\\d{2}-?\\d{2}-?\\d{2}(?:(_|-)\\d?(\\D|$)R?\\d?)?))" @classmethod - def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None, plate_map: dict | None = None) -> dict: + def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None, + plate_map: dict | None = None) -> dict: """ Performs any final custom parsing of the excel file. Extends parent @@ -1535,7 +1580,7 @@ class WastewaterArtic(BasicSubmission): Returns: dict: Updated parser product. - """ + """ input_dict = super().finalize_parse(input_dict, xl, info_map, plate_map) input_dict['csv'] = xl.parse("hitpicks_csv_to_export") return input_dict @@ -1600,14 +1645,14 @@ class WastewaterArtic(BasicSubmission): with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped: z = zipped.extract(info['gel_image']['value'], Path(TemporaryDirectory().name)) img = OpenpyxlImage(z) - img.height = 400 # insert image height in pixels as float or int (e.g. 305.5) + img.height = 400 # insert image height in pixels as float or int (e.g. 305.5) img.width = 600 img.anchor = 'B9' worksheet.add_image(img) return input_excel @classmethod - def get_details_template(cls, base_dict:dict) -> Tuple[dict, Template]: + def get_details_template(cls, base_dict: dict) -> Tuple[dict, Template]: """ Get the details jinja template for the correct class. Extends parent @@ -1616,9 +1661,10 @@ class WastewaterArtic(BasicSubmission): Returns: Tuple[dict, Template]: (Updated dictionary, Template to be rendered) - """ + """ base_dict, template = super().get_details_template(base_dict=base_dict) - base_dict['excluded'] += ['gel_info', 'gel_image', 'headers', "dna_core_submission_number", "source_plates", "gel_controls"] + base_dict['excluded'] += ['gel_info', 'gel_image', 'headers', "dna_core_submission_number", "source_plates", + "gel_controls"] base_dict['DNA Core ID'] = base_dict['dna_core_submission_number'] check = 'gel_info' in base_dict.keys() and base_dict['gel_info'] != None if check: @@ -1632,7 +1678,7 @@ class WastewaterArtic(BasicSubmission): base_dict['gel_image'] = base64.b64encode(zipped.read(base_dict['gel_image'])).decode('utf-8') return base_dict, template - def adjust_to_dict_samples(self, backup:bool=False) -> List[dict]: + def adjust_to_dict_samples(self, backup: bool = False) -> List[dict]: """ Updates sample dictionaries with custom values @@ -1641,7 +1687,7 @@ class WastewaterArtic(BasicSubmission): Returns: List[dict]: Updated dictionaries - """ + """ logger.debug(f"Hello from {self.__class__.__name__} dictionary sample adjuster.") output = [] for assoc in self.submission_sample_associations: @@ -1662,18 +1708,18 @@ class WastewaterArtic(BasicSubmission): Returns: dict: dictionary of functions - """ + """ events = super().custom_context_events() events['Gel Box'] = self.gel_box return events - + def gel_box(self, obj): """ Creates widget to perform gel viewing operations Args: obj (_type_): parent widget - """ + """ from frontend.widgets.gel_checker import GelBox from frontend.widgets import select_open_file fname = select_open_file(obj=obj, file_extension="jpg") @@ -1697,40 +1743,41 @@ class WastewaterArtic(BasicSubmission): zipf.write(img_path, self.gel_image) self.save() + # Sample Classes class BasicSample(BaseClass): """ Base of basic sample which polymorphs into BCSample and WWSample - """ + """ - id = Column(INTEGER, primary_key=True) #: primary key - submitter_id = Column(String(64), nullable=False, unique=True) #: identification from submitter - sample_type = Column(String(32)) #: subtype of sample + id = Column(INTEGER, primary_key=True) #: primary key + submitter_id = Column(String(64), nullable=False, unique=True) #: identification from submitter + sample_type = Column(String(32)) #: subtype of sample sample_submission_associations = relationship( "SubmissionSampleAssociation", back_populates="sample", cascade="all, delete-orphan", - ) #: associated submissions + ) #: associated submissions __mapper_args__ = { "polymorphic_identity": "Basic Sample", "polymorphic_on": case( - - (sample_type == "Wastewater Sample", "Wastewater Sample"), - (sample_type == "Wastewater Artic Sample", "Wastewater Sample"), - (sample_type == "Bacterial Culture Sample", "Bacterial Culture Sample"), - + + (sample_type == "Wastewater Sample", "Wastewater Sample"), + (sample_type == "Wastewater Artic Sample", "Wastewater Sample"), + (sample_type == "Bacterial Culture Sample", "Bacterial Culture Sample"), + else_="Basic Sample" - ), + ), "with_polymorphic": "*", } - submissions = association_proxy("sample_submission_associations", "submission") #: proxy of associated submissions + submissions = association_proxy("sample_submission_associations", "submission") #: proxy of associated submissions @validates('submitter_id') - def create_id(self, key:str, value:str): + def create_id(self, key: str, value: str): """ Creates a random string as a submitter id. @@ -1740,19 +1787,19 @@ class BasicSample(BaseClass): Returns: str: new (or unchanged) submitter id - """ + """ if value == None: return uuid.uuid4().hex.upper() else: return value - + def __repr__(self) -> str: try: return f"<{self.sample_type.replace('_', ' ').title().replace(' ', '')}({self.submitter_id})>" except AttributeError: return f" dict: + + def to_sub_dict(self, full_data: bool = False) -> dict: """ gui friendly dictionary, extends parent method. @@ -1765,80 +1812,63 @@ class BasicSample(BaseClass): sample['Submitter ID'] = self.submitter_id sample['Sample Type'] = self.sample_type if full_data: - sample['submissions'] = sorted([item.to_sub_dict() for item in self.sample_submission_associations], key=itemgetter('submitted_date')) + sample['submissions'] = sorted([item.to_sub_dict() for item in self.sample_submission_associations], + key=itemgetter('submitted_date')) # logger.debug(f"Done converting {self} after {time()-start}") return sample - def set_attribute(self, name:str, value): + def set_attribute(self, name: str, value): """ Custom attribute setter (depreciated over built-in __setattr__) Args: name (str): name of attribute value (_type_): value to be set to attribute - """ + """ try: setattr(self, name, value) except AttributeError: logger.error(f"Attribute {name} not found") @classmethod - def find_subclasses(cls, attrs:dict|None=None, sample_type:str|None=None) -> BasicSample: - """ - Retrieves subclass of BasicSample based on type or possessed attributes. - - Args: - attrs (dict | None, optional): attributes for query. Defaults to None. - sample_type (str | None, optional): sample type by name. Defaults to None. - - Raises: - AttributeError: Raised if class containing all given attributes cannot be found. - - Returns: - BasicSample: sample type object of interest - """ - if sample_type != None: - return cls.find_polymorphic_subclass(polymorphic_identity=sample_type) - if len(attrs) == 0 or attrs == None: - logger.warning(f"No attr, returning {cls}") - return cls - if any([not hasattr(cls, attr) for attr in attrs]): - logger.debug(f"{cls} is missing attrs. searching for better match.") - # looks for first model that has all included kwargs - try: - model = [subclass for subclass in cls.__subclasses__() if all([hasattr(subclass, attr) for attr in attrs])][0] - except IndexError as e: - raise AttributeError(f"Couldn't find existing class/subclass of {cls} with all attributes:\n{pformat(attrs)}") - else: - # logger.debug(f"{cls} has all necessary attributes, returning") - return cls - # logger.debug(f"Using model: {model}") - return model - - @classmethod - def find_polymorphic_subclass(cls, polymorphic_identity:str|None=None) -> BasicSample: + def find_polymorphic_subclass(cls, polymorphic_identity: str | None = None, + attrs: dict | None = None) -> BasicSample: """ Retrieves subclasses of BasicSample based on type name. Args: + attrs (dict | None, optional): name: value of attributes in the wanted subclass polymorphic_identity (str | None, optional): Name of subclass fed to polymorphic identity. Defaults to None. Returns: BasicSample: Subclass of interest. - """ + """ if isinstance(polymorphic_identity, dict): polymorphic_identity = polymorphic_identity['value'] - if polymorphic_identity == None: - return cls - else: + if polymorphic_identity is not None: try: - return [item for item in cls.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0] + return [item for item in cls.__subclasses__() if + item.__mapper_args__['polymorphic_identity'] == polymorphic_identity][0] except Exception as e: logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}") - return cls + model = cls + else: + model = cls + if attrs is None or len(attrs) == 0: + return model + if any([not hasattr(cls, attr) for attr in attrs.keys()]): + # looks for first model that has all included kwargs + try: + model = [subclass for subclass in cls.__subclasses__() if + all([hasattr(subclass, attr) for attr in attrs.keys()])][0] + except IndexError as e: + raise AttributeError( + f"Couldn't find existing class/subclass of {cls} with all attributes:\n{pformat(attrs.keys())}") + logger.info(f"Recruiting model: {model}") + return model @classmethod - def parse_sample(cls, input_dict:dict) -> dict: + def parse_sample(cls, input_dict: dict) -> dict: f""" Custom sample parser @@ -1847,11 +1877,11 @@ class BasicSample(BaseClass): Returns: dict: Updated parser results. - """ + """ return input_dict - + @classmethod - def get_details_template(cls, base_dict:dict) -> Tuple[dict, Template]: + def get_details_template(cls, base_dict: dict) -> Tuple[dict, Template]: """ Get the details jinja template for the correct class @@ -1860,7 +1890,7 @@ class BasicSample(BaseClass): Returns: Tuple(dict, Template): (Updated dictionary, Template to be rendered) - """ + """ base_dict['excluded'] = ['submissions', 'excluded', 'colour', 'tooltip'] env = jinja_template_loading() temp_name = f"{cls.__name__.lower()}_details.html" @@ -1874,12 +1904,12 @@ class BasicSample(BaseClass): @classmethod @setup_lookup - def query(cls, - submitter_id:str|None=None, - sample_type:str|None=None, - limit:int=0, + def query(cls, + submitter_id: str | None = None, + sample_type: str | None = None, + limit: int = 0, **kwargs - ) -> BasicSample|List[BasicSample]: + ) -> BasicSample | List[BasicSample]: """ Lookup samples in the database by a number of parameters. @@ -1890,11 +1920,12 @@ class BasicSample(BaseClass): Returns: models.BasicSample|List[models.BasicSample]: Sample(s) of interest. - """ - if sample_type == None: - model = cls.find_subclasses(attrs=kwargs) + """ + if sample_type is None: + # model = cls.find_subclasses(attrs=kwargs) + model = cls.find_polymorphic_subclass(attrs=kwargs) else: - model = cls.find_subclasses(sample_type=sample_type) + model = cls.find_polymorphic_subclass(polymorphic_identity=sample_type) logger.debug(f"Length of kwargs: {len(kwargs)}") # model = models.BasicSample.find_subclasses(ctx=ctx, attrs=kwargs) # query: Query = setup_lookup(ctx=ctx, locals=locals()).query(model) @@ -1902,26 +1933,27 @@ class BasicSample(BaseClass): match submitter_id: case str(): # logger.debug(f"Looking up {model} with submitter id: {submitter_id}") - query = query.filter(model.submitter_id==submitter_id) + query = query.filter(model.submitter_id == submitter_id) limit = 1 case _: pass - match sample_type: - case str(): - logger.warning(f"Looking up samples with sample_type is disabled.") - # query = query.filter(models.BasicSample.sample_type==sample_type) - case _: - pass - for k, v in kwargs.items(): - attr = getattr(model, k) - # logger.debug(f"Got attr: {attr}") - query = query.filter(attr==v) - if len(kwargs) > 0: - limit = 1 - return cls.execute_query(query=query, limit=limit) - + # match sample_type: + # case str(): + # logger.warning(f"Looking up samples with sample_type is disabled.") + # # query = query.filter(models.BasicSample.sample_type==sample_type) + # case _: + # pass + # for k, v in kwargs.items(): + # attr = getattr(model, k) + # # logger.debug(f"Got attr: {attr}") + # query = query.filter(attr==v) + # if len(kwargs) > 0: + # limit = 1 + return cls.execute_query(query=query, model=model, limit=limit, **kwargs) + # return cls.execute_query(query=query, limit=limit) + @classmethod - def query_or_create(cls, sample_type:str|None=None, **kwargs) -> BasicSample: + def query_or_create(cls, sample_type: str | None = None, **kwargs) -> BasicSample: """ Queries for a sample, if none found creates a new one. @@ -1934,17 +1966,18 @@ class BasicSample(BaseClass): Returns: _type_: _description_ - """ + """ disallowed = ["id"] if kwargs == {}: raise ValueError("Need to narrow down query or the first available instance will be returned.") for key in kwargs.keys(): if key in disallowed: - raise ValueError(f"{key} is not allowed as a query argument as it could lead to creation of duplicate objects.") + raise ValueError( + f"{key} is not allowed as a query argument as it could lead to creation of duplicate objects.") instance = cls.query(sample_type=sample_type, limit=1, **kwargs) logger.debug(f"Retrieved instance: {instance}") if instance == None: - used_class = cls.find_subclasses(attrs=kwargs, sample_type=sample_type) + used_class = cls.find_polymorphic_subclass(attrs=kwargs, polymorphic_identity=sample_type) instance = used_class(**kwargs) instance.sample_type = sample_type logger.debug(f"Creating instance: {instance}") @@ -1953,6 +1986,7 @@ class BasicSample(BaseClass): def delete(self): raise AttributeError(f"Delete not implemented for {self.__class__}") + #Below are the custom sample types class WastewaterSample(BasicSample): @@ -1960,18 +1994,40 @@ class WastewaterSample(BasicSample): Derivative wastewater sample """ id = Column(INTEGER, ForeignKey('_basicsample.id'), primary_key=True) - ww_processing_num = Column(String(64)) #: wastewater processing number - ww_full_sample_id = Column(String(64)) #: full id given by entrics - rsl_number = Column(String(64)) #: rsl plate identification number - collection_date = Column(TIMESTAMP) #: Date sample collected - received_date = Column(TIMESTAMP) #: Date sample received - notes = Column(String(2000)) #: notes from submission form - sample_location = Column(String(8)) #: location on 24 well plate - __mapper_args__ = dict(polymorphic_identity="Wastewater Sample", - polymorphic_load="inline", + ww_processing_num = Column(String(64)) #: wastewater processing number + ww_full_sample_id = Column(String(64)) #: full id given by entrics + rsl_number = Column(String(64)) #: rsl plate identification number + collection_date = Column(TIMESTAMP) #: Date sample collected + received_date = Column(TIMESTAMP) #: Date sample received + notes = Column(String(2000)) #: notes from submission form + sample_location = Column(String(8)) #: location on 24 well plate + __mapper_args__ = dict(polymorphic_identity="Wastewater Sample", + polymorphic_load="inline", inherit_condition=(id == BasicSample.id)) - def to_sub_dict(self, full_data:bool=False) -> dict: + @classmethod + def get_default_info(cls, *args): + dicto = super().get_default_info(*args) + match dicto: + case dict(): + dicto['singles'] += ['ww_processing_num'] + output = {} + for k, v in dicto.items(): + if len(args) > 0 and k not in args: + # logger.debug(f"Don't want {k}") + continue + else: + output[k] = v + if len(args) == 1: + return output[args[0]] + case list(): + if "singles" in args: + dicto += ['ww_processing_num'] + return dicto + case _: + pass + + def to_sub_dict(self, full_data: bool = False) -> dict: """ gui friendly dictionary, extends parent method. @@ -1984,7 +2040,7 @@ class WastewaterSample(BasicSample): sample['Received Date'] = self.received_date sample['Collection Date'] = self.collection_date return sample - + @classmethod def parse_sample(cls, input_dict: dict) -> dict: """ @@ -1995,7 +2051,7 @@ class WastewaterSample(BasicSample): Returns: dict: Updated parser results. - """ + """ output_dict = super().parse_sample(input_dict) if output_dict['rsl_number'] == None: output_dict['rsl_number'] = output_dict['submitter_id'] @@ -2008,7 +2064,7 @@ class WastewaterSample(BasicSample): output_dict['collection_date'] = parse(output_dict['collection_date']).date() except ParserError: logger.error(f"Problem parsing collection_date: {output_dict['collection_date']}") - output_dict['collection_date'] = date(1970,1,1) + output_dict['collection_date'] = date(1970, 1, 1) case datetime(): output_dict['collection_date'] = output_dict['collection_date'].date() case date(): @@ -2016,15 +2072,16 @@ class WastewaterSample(BasicSample): case _: del output_dict['collection_date'] return output_dict - - def get_previous_ww_submission(self, current_artic_submission:WastewaterArtic): + + def get_previous_ww_submission(self, current_artic_submission: WastewaterArtic): # assocs = [assoc for assoc in self.sample_submission_associations if assoc.submission.submission_type_name=="Wastewater"] # subs = self.submissions[:self.submissions.index(current_artic_submission)] try: plates = [item['plate'] for item in current_artic_submission.source_plates] except TypeError as e: logger.error(f"source_plates must not be present") - plates = [item.rsl_plate_num for item in self.submissions[:self.submissions.index(current_artic_submission)]] + plates = [item.rsl_plate_num for item in + self.submissions[:self.submissions.index(current_artic_submission)]] subs = [sub for sub in self.submissions if sub.rsl_plate_num in plates] logger.debug(f"Submissions: {subs}") try: @@ -2032,19 +2089,20 @@ class WastewaterSample(BasicSample): except IndexError: return None + class BacterialCultureSample(BasicSample): """ base of bacterial culture sample """ id = Column(INTEGER, ForeignKey('_basicsample.id'), primary_key=True) - organism = Column(String(64)) #: bacterial specimen - concentration = Column(String(16)) #: sample concentration + organism = Column(String(64)) #: bacterial specimen + concentration = Column(String(16)) #: sample concentration control = relationship("Control", back_populates="sample", uselist=False) - __mapper_args__ = dict(polymorphic_identity="Bacterial Culture Sample", - polymorphic_load="inline", + __mapper_args__ = dict(polymorphic_identity="Bacterial Culture Sample", + polymorphic_load="inline", inherit_condition=(id == BasicSample.id)) - def to_sub_dict(self, full_data:bool=False) -> dict: + def to_sub_dict(self, full_data: bool = False) -> dict: """ gui friendly dictionary, extends parent method. @@ -2057,33 +2115,35 @@ class BacterialCultureSample(BasicSample): sample['Organism'] = self.organism sample['Concentration'] = self.concentration if self.control != None: - sample['colour'] = [0,128,0] + sample['colour'] = [0, 128, 0] sample['tooltip'] = f"Control: {self.control.controltype.name} - {self.control.controltype.targets}" # logger.debug(f"Done converting to {self} to dict after {time()-start}") return sample + # Submission to Sample Associations class SubmissionSampleAssociation(BaseClass): """ table containing submission/sample associations DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html - """ - - id = Column(INTEGER, unique=True, nullable=False) #: id to be used for inheriting purposes - sample_id = Column(INTEGER, ForeignKey("_basicsample.id"), nullable=False) #: id of associated sample - submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id"), primary_key=True) #: id of associated submission - row = Column(INTEGER, primary_key=True) #: row on the 96 well plate - column = Column(INTEGER, primary_key=True) #: column on the 96 well plate + """ + + id = Column(INTEGER, unique=True, nullable=False) #: id to be used for inheriting purposes + sample_id = Column(INTEGER, ForeignKey("_basicsample.id"), nullable=False) #: id of associated sample + submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id"), primary_key=True) #: id of associated submission + row = Column(INTEGER, primary_key=True) #: row on the 96 well plate + column = Column(INTEGER, primary_key=True) #: column on the 96 well plate # reference to the Submission object - submission = relationship(BasicSubmission, back_populates="submission_sample_associations") #: associated submission + submission = relationship(BasicSubmission, + back_populates="submission_sample_associations") #: associated submission # reference to the Sample object - sample = relationship(BasicSample, back_populates="sample_submission_associations") #: associated sample + sample = relationship(BasicSample, back_populates="sample_submission_associations") #: associated sample + + base_sub_type = Column(String) #: string of subtype name - base_sub_type = Column(String) #: string of subtype name - # Refers to the type of parent. # Hooooooo boy, polymorphic association type, now we're getting into the weeds! __mapper_args__ = { @@ -2092,7 +2152,8 @@ class SubmissionSampleAssociation(BaseClass): "with_polymorphic": "*", } - def __init__(self, submission:BasicSubmission=None, sample:BasicSample=None, row:int=1, column:int=1, id:int|None=None): + def __init__(self, submission: BasicSubmission = None, sample: BasicSample = None, row: int = 1, column: int = 1, + id: int | None = None): self.submission = submission self.sample = sample self.row = row @@ -2109,14 +2170,14 @@ class SubmissionSampleAssociation(BaseClass): except AttributeError as e: logger.error(f"Unable to construct __repr__ due to: {e}") return super().__repr__() - + def to_sub_dict(self) -> dict: """ Returns a sample dictionary updated with instance information Returns: dict: Updated dictionary with row, column and well updated - """ + """ # Get sample info logger.debug(f"Running {self.__repr__()}") sample = self.sample.to_sub_dict() @@ -2133,14 +2194,14 @@ class SubmissionSampleAssociation(BaseClass): sample['positive'] = False sample['submitted_date'] = self.submission.submitted_date return sample - - def to_hitpick(self) -> dict|None: + + def to_hitpick(self) -> dict | None: """ Outputs a dictionary usable for html plate maps. Returns: dict: dictionary of sample id, row and column in elution plate - """ + """ # Since there is no PCR, negliable result is necessary. sample = self.to_sub_dict() logger.debug(f"Sample dict to hitpick: {sample}") @@ -2161,7 +2222,7 @@ class SubmissionSampleAssociation(BaseClass): Returns: int: incremented id - """ + """ try: return max([item.id for item in cls.query()]) + 1 except ValueError as e: @@ -2169,7 +2230,7 @@ class SubmissionSampleAssociation(BaseClass): return 1 @classmethod - def find_polymorphic_subclass(cls, polymorphic_identity:str|None=None) -> SubmissionSampleAssociation: + def find_polymorphic_subclass(cls, polymorphic_identity: str | None = None) -> SubmissionSampleAssociation: """ Retrieves subclasses of SubmissionSampleAssociation based on type name. @@ -2178,32 +2239,34 @@ class SubmissionSampleAssociation(BaseClass): Returns: SubmissionSampleAssociation: Subclass of interest. - """ + """ if isinstance(polymorphic_identity, dict): polymorphic_identity = polymorphic_identity['value'] if polymorphic_identity == None: output = cls else: try: - output = [item for item in cls.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0] + output = [item for item in cls.__subclasses__() if + item.__mapper_args__['polymorphic_identity'] == polymorphic_identity][0] except Exception as e: logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}") output = cls logger.debug(f"Using SubmissionSampleAssociation subclass: {output}") return output - + @classmethod @setup_lookup - def query(cls, - submission:BasicSubmission|str|None=None, - exclude_submission_type:str|None=None, - sample:BasicSample|str|None=None, - row:int=0, - column:int=0, - limit:int=0, - chronologic:bool=False, - reverse:bool=False, - ) -> SubmissionSampleAssociation|List[SubmissionSampleAssociation]: + def query(cls, + submission: BasicSubmission | str | None = None, + exclude_submission_type: str | None = None, + sample: BasicSample | str | None = None, + row: int = 0, + column: int = 0, + limit: int = 0, + chronologic: bool = False, + reverse: bool = False, + **kwargs + ) -> SubmissionSampleAssociation | List[SubmissionSampleAssociation]: """ Lookup junction of Submission and Sample in the database @@ -2217,34 +2280,35 @@ class SubmissionSampleAssociation(BaseClass): Returns: models.SubmissionSampleAssociation|List[models.SubmissionSampleAssociation]: Junction(s) of interest - """ + """ query: Query = cls.__database_session__.query(cls) match submission: case BasicSubmission(): # logger.debug(f"Lookup SampleSubmissionAssociation with submission BasicSubmission {submission}") - query = query.filter(cls.submission==submission) + query = query.filter(cls.submission == submission) case str(): # logger.debug(f"Lookup SampleSubmissionAssociation with submission str {submission}") - query = query.join(BasicSubmission).filter(BasicSubmission.rsl_plate_num==submission) + query = query.join(BasicSubmission).filter(BasicSubmission.rsl_plate_num == submission) case _: pass match sample: case BasicSample(): # logger.debug(f"Lookup SampleSubmissionAssociation with sample BasicSample {sample}") - query = query.filter(cls.sample==sample) + query = query.filter(cls.sample == sample) case str(): # logger.debug(f"Lookup SampleSubmissionAssociation with sample str {sample}") - query = query.join(BasicSample).filter(BasicSample.submitter_id==sample) + query = query.join(BasicSample).filter(BasicSample.submitter_id == sample) case _: pass if row > 0: - query = query.filter(cls.row==row) + query = query.filter(cls.row == row) if column > 0: - query = query.filter(cls.column==column) + query = query.filter(cls.column == column) match exclude_submission_type: case str(): # logger.debug(f"filter SampleSubmissionAssociation to exclude submission type {exclude_submission_type}") - query = query.join(BasicSubmission).filter(BasicSubmission.submission_type_name != exclude_submission_type) + query = query.join(BasicSubmission).filter( + BasicSubmission.submission_type_name != exclude_submission_type) case _: pass # logger.debug(f"Query count: {query.count()}") @@ -2255,15 +2319,15 @@ class SubmissionSampleAssociation(BaseClass): query = query.order_by(BasicSubmission.submitted_date.desc()) else: query = query.order_by(BasicSubmission.submitted_date) - return cls.execute_query(query=query, limit=limit) - + return cls.execute_query(query=query, limit=limit, **kwargs) + @classmethod def query_or_create(cls, - association_type:str="Basic Association", - submission:BasicSubmission|str|None=None, - sample:BasicSample|str|None=None, - id:int|None=None, - **kwargs) -> SubmissionSampleAssociation: + association_type: str = "Basic Association", + submission: BasicSubmission | str | None = None, + sample: BasicSample | str | None = None, + # id:int|None=None, + **kwargs) -> SubmissionSampleAssociation: """ Queries for an association, if none exists creates a new one. @@ -2275,13 +2339,13 @@ class SubmissionSampleAssociation(BaseClass): Returns: SubmissionSampleAssociation: Queried or new association. - """ + """ logger.debug(f"Attempting create or query with {kwargs}") match submission: case BasicSubmission(): pass case str(): - submission = BasicSubmission.query(rsl_number=submission) + submission = BasicSubmission.query(rsl_plate_num=submission) case _: raise ValueError() match sample: @@ -2305,32 +2369,33 @@ class SubmissionSampleAssociation(BaseClass): instance = None if instance == None: used_cls = cls.find_polymorphic_subclass(polymorphic_identity=association_type) - instance = used_cls(submission=submission, sample=sample, id=id, **kwargs) + # instance = used_cls(submission=submission, sample=sample, id=id, **kwargs) + instance = used_cls(submission=submission, sample=sample, **kwargs) return instance def delete(self): raise AttributeError(f"Delete not implemented for {self.__class__}") -class WastewaterAssociation(SubmissionSampleAssociation): - - id = Column(INTEGER, ForeignKey("_submissionsampleassociation.id"), primary_key=True) - ct_n1 = Column(FLOAT(2)) #: AKA ct for N1 - ct_n2 = Column(FLOAT(2)) #: AKA ct for N2 - n1_status = Column(String(32)) #: positive or negative for N1 - n2_status = Column(String(32)) #: positive or negative for N2 - pcr_results = Column(JSON) #: imported PCR status from QuantStudio - __mapper_args__ = dict(polymorphic_identity="Wastewater Association", - polymorphic_load="inline", - inherit_condition=(id==SubmissionSampleAssociation.id)) - +class WastewaterAssociation(SubmissionSampleAssociation): + id = Column(INTEGER, ForeignKey("_submissionsampleassociation.id"), primary_key=True) + ct_n1 = Column(FLOAT(2)) #: AKA ct for N1 + ct_n2 = Column(FLOAT(2)) #: AKA ct for N2 + n1_status = Column(String(32)) #: positive or negative for N1 + n2_status = Column(String(32)) #: positive or negative for N2 + pcr_results = Column(JSON) #: imported PCR status from QuantStudio + + __mapper_args__ = dict(polymorphic_identity="Wastewater Association", + polymorphic_load="inline", + inherit_condition=(id == SubmissionSampleAssociation.id)) + def to_sub_dict(self) -> dict: """ Returns a sample dictionary updated with instance information. Extends parent Returns: dict: Updated dictionary with row, column and well updated - """ + """ sample = super().to_sub_dict() sample['ct'] = f"({self.ct_n1}, {self.ct_n2})" @@ -2346,10 +2411,11 @@ class WastewaterAssociation(SubmissionSampleAssociation): Returns: dict: dictionary of sample id, row and column in elution plate - """ + """ sample = super().to_hitpick() try: - sample['tooltip'] += f"
- ct N1: {'{:.2f}'.format(self.ct_n1)} ({self.n1_status})
- ct N2: {'{:.2f}'.format(self.ct_n2)} ({self.n2_status})" + sample[ + 'tooltip'] += f"
- ct N1: {'{:.2f}'.format(self.ct_n1)} ({self.n1_status})
- ct N2: {'{:.2f}'.format(self.ct_n2)} ({self.n2_status})" except (TypeError, AttributeError) as e: logger.error(f"Couldn't set tooltip for {self.sample.rsl_number}. Looks like there isn't PCR data.") return sample @@ -2361,9 +2427,9 @@ class WastewaterAssociation(SubmissionSampleAssociation): Returns: int: incremented id - """ + """ try: - parent = [base for base in cls.__bases__ if base.__name__=="SubmissionSampleAssociation"][0] + parent = [base for base in cls.__bases__ if base.__name__ == "SubmissionSampleAssociation"][0] return max([item.id for item in parent.query()]) + 1 except ValueError as e: logger.error(f"Problem incrementing id: {e}") diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index b810c7b..e043c83 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -100,7 +100,7 @@ class SheetParser(object): Enforce that the parser has an extraction kit """ from frontend.widgets.pop_ups import ObjectSelector - if not check_not_nan(self.sub['extraction_kit']['value']): + if 'extraction_kit' not in self.sub.keys() or not check_not_nan(self.sub['extraction_kit']['value']): dlg = ObjectSelector(title="Kit Needed", message="At minimum a kit is needed. Please select one.", obj_type=KitType) if dlg.exec(): self.sub['extraction_kit'] = dict(value=dlg.parse_form(), missing=True) @@ -192,13 +192,18 @@ class InfoParser(object): for k, v in self.map.items(): # exclude from generic parsing if k in exclude_from_generic: + logger.warning(f"Key {k} is excluded due to parser_ignore") continue # If the value is hardcoded put it in the dictionary directly. if isinstance(v, str): dicto[k] = dict(value=v, missing=False) continue logger.debug(f"Looking for {k} in self.map") - if sheet in self.map[k]['sheets']: + try: + check = sheet in self.map[k]['sheets'] + except TypeError: + continue + if check: relevant[k] = v logger.debug(f"relevant map for {sheet}: {pformat(relevant)}") if relevant == {}: @@ -592,7 +597,7 @@ class PCRParser(object): self.plate_num = namer.parsed_name self.submission_type = namer.submission_type logger.debug(f"Set plate number to {self.plate_num} and type to {self.submission_type}") - parser = BasicSubmission.find_polymorphic_subclass(self.submission_type) + parser = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type) self.samples = parser.parse_pcr(xl=self.xl, rsl_number=self.plate_num) def parse_general(self, sheet_name:str): diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py index 4343ab4..e0112fa 100644 --- a/src/submissions/backend/validators/pydant.py +++ b/src/submissions/backend/validators/pydant.py @@ -576,6 +576,7 @@ class PydSubmission(BaseModel, extra='allow'): association.save() logger.debug(f"Equipment association SQL object to be added to submission: {association.__dict__}") instance.submission_equipment_associations.append(association) + # TODO: case item if item in instance.jsons() case _: try: instance.set_attribute(key=key, value=value) diff --git a/src/submissions/frontend/widgets/submission_details.py b/src/submissions/frontend/widgets/submission_details.py index 2a1b7db..a349b8a 100644 --- a/src/submissions/frontend/widgets/submission_details.py +++ b/src/submissions/frontend/widgets/submission_details.py @@ -82,7 +82,8 @@ class SubmissionDetails(QDialog): """ logger.debug(f"Details for: {submission}") if isinstance(submission, str): - submission = BasicSubmission.query(rsl_number=submission) + # submission = BasicSubmission.query(rsl_number=submission) + submission = BasicSubmission.query(rsl_plate_num=submission) self.base_dict = submission.to_dict(full_data=True) logger.debug(f"Submission details data:\n{pformat({k:v for k,v in self.base_dict.items() if k != 'samples'})}") # don't want id @@ -103,7 +104,8 @@ class SubmissionDetails(QDialog): def sign_off(self, submission:str|BasicSubmission): logger.debug(f"Signing off on {submission} - ({getuser()})") if isinstance(submission, str): - submission = BasicSubmission.query(rsl_number=submission) + # submission = BasicSubmission.query(rsl_number=submission) + submission = BasicSubmission.query(rsl_plate_number=submission) submission.signed_by = getuser() submission.save() self.submission_details(submission=self.rsl_plate_num) diff --git a/src/submissions/frontend/widgets/submission_table.py b/src/submissions/frontend/widgets/submission_table.py index 891b8b1..0c45b6e 100644 --- a/src/submissions/frontend/widgets/submission_table.py +++ b/src/submissions/frontend/widgets/submission_table.py @@ -166,9 +166,8 @@ class SubmissionsSheet(QTableView): for ii in range(6, len(run)): new_run[f"column{str(ii-5)}_vol"] = run[ii] # Lookup imported submissions - # sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num']) - # sub = lookup_submissions(ctx=obj.ctx, rsl_number=new_run['rsl_plate_num']) - sub = BasicSubmission.query(rsl_number=new_run['rsl_plate_num']) + # sub = BasicSubmission.query(rsl_number=new_run['rsl_plate_num']) + sub = BasicSubmission.query(rsl_plate_num=new_run['rsl_plate_num']) # If no such submission exists, move onto the next run if sub == None: continue