From bbcbd35127d03de8fde4960302e58719411b8e9b Mon Sep 17 00:00:00 2001 From: lwark Date: Thu, 16 May 2024 14:07:18 -0500 Subject: [PATCH] Sanity checking. --- CHANGELOG.md | 6 + alembic.ini | 6 +- src/submissions/backend/db/models/__init__.py | 28 +- src/submissions/backend/db/models/controls.py | 24 +- src/submissions/backend/db/models/kits.py | 717 ++++++++++-------- src/submissions/tools.py | 50 +- 6 files changed, 466 insertions(+), 365 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 644f7bd..dc2d0bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 202405.03 + +- Settings can now pull values from the db. +- Improved generic and WW specific PCR parsers. +- Various bug fixes. + ## 202405.01 - New Excel writers diff --git a/alembic.ini b/alembic.ini index 62b4ad9..0cddafa 100644 --- a/alembic.ini +++ b/alembic.ini @@ -55,10 +55,10 @@ version_path_separator = os # Use os.pathsep. Default configuration used for ne # are written from script.py.mako # output_encoding = utf-8 -sqlalchemy.url = sqlite:///L:\Robotics Laboratory Support\Submissions\submissions.db -;sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\Archives\Submissions_app_backups\DB_backups\submissions-demo.db +; sqlalchemy.url = sqlite:///L:\Robotics Laboratory Support\Submissions\submissions.db +; sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\Archives\Submissions_app_backups\DB_backups\submissions-demo.db ;sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\Archives\Submissions_app_backups\DB_backups\submissions-prototypes.db -;sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\python\submissions\mytests\test_assets\submissions-test.db +sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\python\submissions\mytests\test_assets\submissions-test.db [post_write_hooks] diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py index f799190..a6d3bba 100644 --- a/src/submissions/backend/db/models/__init__.py +++ b/src/submissions/backend/db/models/__init__.py @@ -3,6 +3,8 @@ Contains all models for sqlalchemy ''' from __future__ import annotations import sys, logging + +from sqlalchemy import Column, INTEGER, String, JSON from sqlalchemy.orm import DeclarativeMeta, declarative_base, Query, Session from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.exc import ArgumentError @@ -11,8 +13,6 @@ from pathlib import Path # Load testing environment if 'pytest' in sys.modules: - from pathlib import Path - sys.path.append(Path(__file__).parents[4].absolute().joinpath("tests").__str__()) Base: DeclarativeMeta = declarative_base() @@ -46,7 +46,7 @@ class BaseClass(Base): Returns: Session: DB session from ctx settings. """ - if not 'pytest' in sys.modules: + if 'pytest' not in sys.modules: from tools import ctx else: from test_settings import ctx @@ -60,7 +60,7 @@ class BaseClass(Base): Returns: Path: Location of the Submissions directory in Settings object """ - if not 'pytest' in sys.modules: + if 'pytest' not in sys.modules: from tools import ctx else: from test_settings import ctx @@ -74,7 +74,7 @@ class BaseClass(Base): Returns: Path: Location of the Submissions backup directory in Settings object """ - if not 'pytest' in sys.modules: + if 'pytest' not in sys.modules: from tools import ctx else: from test_settings import ctx @@ -104,8 +104,9 @@ class BaseClass(Base): Execute sqlalchemy query. Args: - query (Query): input query object - limit (int): Maximum number of results. (0 = all) + model (Any, optional): model to be queried. Defaults to None + query (Query, optional): input query object. Defaults to None + limit (int): Maximum number of results. (0 = all). Defaults to 0 Returns: Any | List[Any]: Single result if limit = 1 or List if other. @@ -153,6 +154,19 @@ class BaseClass(Base): self.__database_session__.rollback() +class ConfigItem(BaseClass): + id = Column(INTEGER, primary_key=True) + key = Column(String(32)) + value = Column(JSON) + + def __repr__(self): + return f"ConfigItem({self.key} : {self.value})" + + @classmethod + def get_config_items(cls): + return cls.__database_session__.query(cls).all() + + from .controls import * # import order must go: orgs, kit, subs due to circular import issues from .organizations import * diff --git a/src/submissions/backend/db/models/controls.py b/src/submissions/backend/db/models/controls.py index f5afca0..6b2a491 100644 --- a/src/submissions/backend/db/models/controls.py +++ b/src/submissions/backend/db/models/controls.py @@ -43,7 +43,7 @@ class ControlType(BaseClass): Returns: ControlType | List[ControlType]: Single result if the limit = 1, else a list. - """ + """ query = cls.__database_session__.query(cls) match name: case str(): @@ -83,7 +83,7 @@ class ControlType(BaseClass): Returns: List[ControlType]: Control types that have targets - """ + """ return [item for item in cls.query() if item.targets != []] @classmethod @@ -93,7 +93,7 @@ class ControlType(BaseClass): Returns: Pattern: Constructed pattern - """ + """ strings = list(set([item.name.split("-")[0] for item in cls.get_positive_control_types()])) return re.compile(rf"(^{'|^'.join(strings)})-.*", flags=re.IGNORECASE) @@ -175,19 +175,19 @@ class Control(BaseClass): output = [] # logger.debug("load json string for mode (i.e. contains, matches, kraken2)") try: - # data = json.loads(getattr(self, mode)) data = self.__getattribute__(mode) except TypeError: data = {} logger.debug(f"Length of data: {len(data)}") # logger.debug("dict keys are genera of bacteria, e.g. 'Streptococcus'") for genus in data: - _dict = {} - _dict['name'] = self.name - _dict['submitted_date'] = self.submitted_date - _dict['genus'] = genus + _dict = dict( + name=self.name, + submitted_date=self.submitted_date, + genus=genus, + target='Target' if genus.strip("*") in self.controltype.targets else "Off-target" + ) # logger.debug("get Target or Off-target of genus") - _dict['target'] = 'Target' if genus.strip("*") in self.controltype.targets else "Off-target" # logger.debug("set 'contains_hashes', etc for genus") for key in data[genus]: _dict[key] = data[genus][key] @@ -247,13 +247,13 @@ class Control(BaseClass): case _: pass # by date range - if start_date != None and end_date == None: + if start_date is not None and end_date is None: logger.warning(f"Start date with no end date, using today.") end_date = date.today() - if end_date != None and start_date == None: + if end_date is not None and start_date is None: logger.warning(f"End date with no start date, using Jan 1, 2023") start_date = date(2023, 1, 1) - if start_date != None: + if start_date is not None: match start_date: case date(): # logger.debug(f"Lookup control by start date({start_date})") diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index 65a0d69..94038dc 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -1,10 +1,8 @@ -''' +""" All kit and reagent related models -''' +""" from __future__ import annotations - from copy import copy - from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB from sqlalchemy.orm import relationship, validates, Query from sqlalchemy.ext.associationproxy import association_proxy @@ -20,12 +18,12 @@ logger = logging.getLogger(f'submissions.{__name__}') # logger.debug("Table for ReagentType/Reagent relations") reagenttypes_reagents = Table( - "_reagenttypes_reagents", - Base.metadata, - Column("reagent_id", INTEGER, ForeignKey("_reagent.id")), - Column("reagenttype_id", INTEGER, ForeignKey("_reagenttype.id")), - extend_existing = True - ) + "_reagenttypes_reagents", + Base.metadata, + Column("reagent_id", INTEGER, ForeignKey("_reagent.id")), + Column("reagenttype_id", INTEGER, ForeignKey("_reagenttype.id")), + extend_existing=True +) # logger.debug("Table for EquipmentRole/Equipment relations") equipmentroles_equipment = Table( @@ -72,16 +70,18 @@ kittypes_processes = Table( extend_existing=True ) + class KitType(BaseClass): """ Base of kits used in submission processing - """ - - id = Column(INTEGER, primary_key=True) #: primary key - name = Column(String(64), unique=True) #: name of kit - submissions = relationship("BasicSubmission", back_populates="extraction_kit") #: submissions this kit was used for - processes = relationship("Process", back_populates="kit_types", secondary=kittypes_processes) #: equipment processes used by this kit - + """ + + id = Column(INTEGER, primary_key=True) #: primary key + name = Column(String(64), unique=True) #: name of kit + submissions = relationship("BasicSubmission", back_populates="extraction_kit") #: submissions this kit was used for + processes = relationship("Process", back_populates="kit_types", + secondary=kittypes_processes) #: equipment processes used by this kit + kit_reagenttype_associations = relationship( "KitTypeReagentTypeAssociation", back_populates="kit_type", @@ -89,24 +89,27 @@ class KitType(BaseClass): ) # creator function: https://stackoverflow.com/questions/11091491/keyerror-when-adding-objects-to-sqlalchemy-association-object/11116291#11116291 - reagent_types = association_proxy("kit_reagenttype_associations", "reagent_type", creator=lambda RT: KitTypeReagentTypeAssociation(reagent_type=RT)) #: Association proxy to KitTypeReagentTypeAssociation + reagent_types = association_proxy("kit_reagenttype_associations", "reagent_type", + creator=lambda RT: KitTypeReagentTypeAssociation( + reagent_type=RT)) #: Association proxy to KitTypeReagentTypeAssociation kit_submissiontype_associations = relationship( "SubmissionTypeKitTypeAssociation", back_populates="kit_type", cascade="all, delete-orphan", - ) #: Relation to SubmissionType + ) #: Relation to SubmissionType - used_for = association_proxy("kit_submissiontype_associations", "submission_type") #: Association proxy to SubmissionTypeKitTypeAssociation + used_for = association_proxy("kit_submissiontype_associations", + "submission_type") #: Association proxy to SubmissionTypeKitTypeAssociation def __repr__(self) -> str: """ Returns: str: A representation of the object. - """ + """ return f"" - - def get_reagents(self, required:bool=False, submission_type:str|SubmissionType|None=None) -> List[ReagentType]: + + def get_reagents(self, required: bool = False, submission_type: str | SubmissionType | None = None) -> List[ReagentType]: """ Return ReagentTypes linked to kit through KitTypeReagentTypeAssociation. @@ -116,14 +119,16 @@ class KitType(BaseClass): Returns: list: List of reagent types - """ + """ match submission_type: case SubmissionType(): # logger.debug(f"Getting reagents by SubmissionType {submission_type}") - relevant_associations = [item for item in self.kit_reagenttype_associations if item.submission_type==submission_type] + relevant_associations = [item for item in self.kit_reagenttype_associations if + item.submission_type == submission_type] case str(): # logger.debug(f"Getting reagents by str {submission_type}") - relevant_associations = [item for item in self.kit_reagenttype_associations if item.submission_type.name==submission_type] + relevant_associations = [item for item in self.kit_reagenttype_associations if + item.submission_type.name == submission_type] case _: # logger.debug(f"Getting reagents") relevant_associations = [item for item in self.kit_reagenttype_associations] @@ -134,51 +139,46 @@ class KitType(BaseClass): return [item.reagent_type for item in relevant_associations] # TODO: Move to BasicSubmission? - def construct_xl_map_for_use(self, submission_type:str|SubmissionType) -> dict: + def construct_xl_map_for_use(self, submission_type: str | SubmissionType) -> dict: """ Creates map of locations in excel workbook for a SubmissionType Args: - use (str | SubmissionType): Submissiontype.name + submission_type (str | SubmissionType): Submissiontype.name Returns: dict: Dictionary containing information locations. - """ - map = {} + """ + info_map = {} # Account for submission_type variable type. match submission_type: case str(): # logger.debug(f"Constructing xl map with str {submission_type}") - assocs = [item for item in self.kit_reagenttype_associations if item.submission_type.name==submission_type] + assocs = [item for item in self.kit_reagenttype_associations if + item.submission_type.name == submission_type] st_assoc = [item for item in self.used_for if submission_type == item.name][0] case SubmissionType(): # logger.debug(f"Constructing xl map with SubmissionType {submission_type}") - assocs = [item for item in self.kit_reagenttype_associations if item.submission_type==submission_type] + assocs = [item for item in self.kit_reagenttype_associations if item.submission_type == submission_type] st_assoc = submission_type case _: raise ValueError(f"Wrong variable type: {type(submission_type)} used!") # logger.debug("Get all KitTypeReagentTypeAssociation for SubmissionType") for assoc in assocs: try: - map[assoc.reagent_type.name] = assoc.uses + info_map[assoc.reagent_type.name] = assoc.uses except TypeError: continue - # # Get SubmissionType info map - # try: - # # map['info'] = st_assoc.info_map - # map['info'] = st_assoc.construct_info_map(mode="write") - # except IndexError as e: - # map['info'] = {} - return map + return info_map @classmethod @setup_lookup - def query(cls, - name:str=None, - used_for:str|SubmissionType|None=None, - id:int|None=None, - limit:int=0 - ) -> KitType|List[KitType]: + def query(cls, + name: str = None, + used_for: str | SubmissionType | None = None, + id: int | None = None, + limit: int = 0 + ) -> KitType | List[KitType]: """ Lookup a list of or single KitType. @@ -190,7 +190,7 @@ class KitType(BaseClass): Returns: models.KitType|List[models.KitType]: KitType(s) of interest. - """ + """ query: Query = cls.__database_session__.query(cls) match used_for: case str(): @@ -204,61 +204,65 @@ class KitType(BaseClass): match name: case str(): # logger.debug(f"Looking up kit type by name str: {name}") - query = query.filter(cls.name==name) + query = query.filter(cls.name == name) limit = 1 case _: pass match id: case int(): # logger.debug(f"Looking up kit type by id int: {id}") - query = query.filter(cls.id==id) + query = query.filter(cls.id == id) limit = 1 case str(): # logger.debug(f"Looking up kit type by id str: {id}") - query = query.filter(cls.id==int(id)) + query = query.filter(cls.id == int(id)) limit = 1 case _: pass return cls.execute_query(query=query, limit=limit) - + @check_authorization def save(self): super().save() + class ReagentType(BaseClass): """ Base of reagent type abstract - """ - - id = Column(INTEGER, primary_key=True) #: primary key - name = Column(String(64)) #: name of reagent type - instances = relationship("Reagent", back_populates="type", secondary=reagenttypes_reagents) #: concrete instances of this reagent type - eol_ext = Column(Interval()) #: extension of life interval - + """ + + id = Column(INTEGER, primary_key=True) #: primary key + name = Column(String(64)) #: name of reagent type + instances = relationship("Reagent", back_populates="type", + secondary=reagenttypes_reagents) #: concrete instances of this reagent type + eol_ext = Column(Interval()) #: extension of life interval + reagenttype_kit_associations = relationship( "KitTypeReagentTypeAssociation", back_populates="reagent_type", cascade="all, delete-orphan", - ) #: Relation to KitTypeReagentTypeAssociation + ) #: Relation to KitTypeReagentTypeAssociation # creator function: https://stackoverflow.com/questions/11091491/keyerror-when-adding-objects-to-sqlalchemy-association-object/11116291#11116291 - kit_types = association_proxy("reagenttype_kit_associations", "kit_type", creator=lambda kit: KitTypeReagentTypeAssociation(kit_type=kit)) #: Association proxy to KitTypeReagentTypeAssociation + kit_types = association_proxy("reagenttype_kit_associations", "kit_type", + creator=lambda kit: KitTypeReagentTypeAssociation( + kit_type=kit)) #: Association proxy to KitTypeReagentTypeAssociation def __repr__(self) -> str: """ Returns: str: Representation of object - """ + """ return f"" - + @classmethod @setup_lookup - def query(cls, - name: str|None=None, - kit_type: KitType|str|None=None, - reagent: Reagent|str|None=None, - limit:int=0, - ) -> ReagentType|List[ReagentType]: + def query(cls, + name: str | None = None, + kit_type: KitType | str | None = None, + reagent: Reagent | str | None = None, + limit: int = 0, + ) -> ReagentType | List[ReagentType]: """ Lookup reagent types in the database. @@ -273,7 +277,7 @@ class ReagentType(BaseClass): Returns: ReagentType|List[ReagentType]: ReagentType or list of ReagentTypes matching filter. - """ + """ query: Query = cls.__database_session__.query(cls) if (kit_type != None and reagent == None) or (reagent != None and kit_type == None): raise ValueError("Cannot filter without both reagent and kit type.") @@ -304,53 +308,57 @@ class ReagentType(BaseClass): match name: case str(): # logger.debug(f"Looking up reagent type by name str: {name}") - query = query.filter(cls.name==name) + query = query.filter(cls.name == name) limit = 1 case _: pass return cls.execute_query(query=query, limit=limit) - + def to_pydantic(self) -> "PydReagent": """ Create default PydReagent from this object Returns: PydReagent: PydReagent representation of this object. - """ + """ from backend.validators.pydant import PydReagent return PydReagent(lot=None, type=self.name, name=self.name, expiry=date.today()) - + @check_authorization def save(self): super().save() + class Reagent(BaseClass): """ Concrete reagent instance """ - - id = Column(INTEGER, primary_key=True) #: primary key - type = relationship("ReagentType", back_populates="instances", secondary=reagenttypes_reagents) #: joined parent reagent type - type_id = Column(INTEGER, ForeignKey("_reagenttype.id", ondelete='SET NULL', name="fk_reagent_type_id")) #: id of parent reagent type - name = Column(String(64)) #: reagent name - lot = Column(String(64)) #: lot number of reagent - expiry = Column(TIMESTAMP) #: expiry date - extended by eol_ext of parent programmatically + + id = Column(INTEGER, primary_key=True) #: primary key + type = relationship("ReagentType", back_populates="instances", + secondary=reagenttypes_reagents) #: joined parent reagent type + type_id = Column(INTEGER, ForeignKey("_reagenttype.id", ondelete='SET NULL', + name="fk_reagent_type_id")) #: id of parent reagent type + name = Column(String(64)) #: reagent name + lot = Column(String(64)) #: lot number of reagent + expiry = Column(TIMESTAMP) #: expiry date - extended by eol_ext of parent programmatically reagent_submission_associations = relationship( "SubmissionReagentAssociation", back_populates="reagent", cascade="all, delete-orphan", - ) #: Relation to SubmissionSampleAssociation - - submissions = association_proxy("reagent_submission_associations", "submission") #: Association proxy to SubmissionSampleAssociation.samples + ) #: Relation to SubmissionSampleAssociation + + submissions = association_proxy("reagent_submission_associations", + "submission") #: Association proxy to SubmissionSampleAssociation.samples def __repr__(self): if self.name != None: return f"" else: return f"" - - def to_sub_dict(self, extraction_kit:KitType=None) -> dict: + + def to_sub_dict(self, extraction_kit: KitType = None) -> dict: """ dictionary containing values necessary for gui @@ -359,7 +367,7 @@ class Reagent(BaseClass): Returns: dict: representation of the reagent's attributes - """ + """ if extraction_kit != None: # Get the intersection of this reagent's ReagentType and all ReagentTypes in KitType try: @@ -390,8 +398,8 @@ class Reagent(BaseClass): expiry=place_holder, missing=False ) - - def update_last_used(self, kit:KitType) -> Report: + + def update_last_used(self, kit: KitType) -> Report: """ Updates last used reagent lot for ReagentType/KitType @@ -400,7 +408,7 @@ class Reagent(BaseClass): Returns: Report: Result of operation - """ + """ report = Report() logger.debug(f"Attempting update of reagent type at intersection of ({self}), ({kit})") rt = ReagentType.query(kit_type=kit, reagent=self, limit=1) @@ -416,16 +424,16 @@ class Reagent(BaseClass): return report report.add_result(Result(msg=f"Updating last used {rt} was not performed.", status="Information")) return report - + @classmethod @setup_lookup - def query(cls, - id:int|None=None, - reagent_type:str|ReagentType|None=None, - lot_number:str|None=None, - name:str|None=None, - limit:int=0 - ) -> Reagent|List[Reagent]: + def query(cls, + id: int | None = None, + reagent_type: str | ReagentType | None = None, + lot_number: str | None = None, + name: str | None = None, + limit: int = 0 + ) -> Reagent | List[Reagent]: """ Lookup a list of reagents from the database. @@ -437,18 +445,18 @@ class Reagent(BaseClass): Returns: models.Reagent | List[models.Reagent]: reagent or list of reagents matching filter. - """ + """ query: Query = cls.__database_session__.query(cls) match id: case int(): - query = query.filter(cls.id==id) + query = query.filter(cls.id == id) limit = 1 case _: pass match reagent_type: case str(): # logger.debug(f"Looking up reagents by reagent type str: {reagent_type}") - query = query.join(cls.type).filter(ReagentType.name==reagent_type) + query = query.join(cls.type).filter(ReagentType.name == reagent_type) case ReagentType(): # logger.debug(f"Looking up reagents by reagent type ReagentType: {reagent_type}") query = query.filter(cls.type.contains(reagent_type)) @@ -458,45 +466,47 @@ class Reagent(BaseClass): case str(): # logger.debug(f"Looking up reagent by name str: {name}") # Not limited due to multiple reagents having same name. - query = query.filter(cls.name==name) + query = query.filter(cls.name == name) case _: pass match lot_number: case str(): # logger.debug(f"Looking up reagent by lot number str: {lot_number}") - query = query.filter(cls.lot==lot_number) + query = query.filter(cls.lot == lot_number) # In this case limit number returned. limit = 1 case _: pass return cls.execute_query(query=query, limit=limit) - + + class Discount(BaseClass): """ Relationship table for client labs for certain kits. """ - - id = Column(INTEGER, primary_key=True) #: primary key - kit = relationship("KitType") #: joined parent reagent type - kit_id = Column(INTEGER, ForeignKey("_kittype.id", ondelete='SET NULL', name="fk_kit_type_id")) #: id of joined kit - client = relationship("Organization") #: joined client lab - client_id = Column(INTEGER, ForeignKey("_organization.id", ondelete='SET NULL', name="fk_org_id")) #: id of joined client - name = Column(String(128)) #: Short description - amount = Column(FLOAT(2)) #: Dollar amount of discount + + id = Column(INTEGER, primary_key=True) #: primary key + kit = relationship("KitType") #: joined parent reagent type + kit_id = Column(INTEGER, ForeignKey("_kittype.id", ondelete='SET NULL', name="fk_kit_type_id")) #: id of joined kit + client = relationship("Organization") #: joined client lab + client_id = Column(INTEGER, + ForeignKey("_organization.id", ondelete='SET NULL', name="fk_org_id")) #: id of joined client + name = Column(String(128)) #: Short description + amount = Column(FLOAT(2)) #: Dollar amount of discount def __repr__(self) -> str: """ Returns: str: Representation of this object - """ + """ return f"" - + @classmethod @setup_lookup - def query(cls, - organization:Organization|str|int|None=None, - kit_type:KitType|str|int|None=None, - ) -> Discount|List[Discount]: + def query(cls, + organization: Organization | str | int | None = None, + kit_type: KitType | str | int | None = None, + ) -> Discount | List[Discount]: """ Lookup discount objects (union of kit and organization) @@ -510,93 +520,96 @@ class Discount(BaseClass): Returns: models.Discount|List[models.Discount]: Discount(s) of interest. - """ + """ query: Query = cls.__database_session__.query(cls) match organization: case Organization(): # logger.debug(f"Looking up discount with organization Organization: {organization}") - query = query.filter(cls.client==Organization) + query = query.filter(cls.client == Organization) case str(): # logger.debug(f"Looking up discount with organization str: {organization}") - query = query.join(Organization).filter(Organization.name==organization) + query = query.join(Organization).filter(Organization.name == organization) case int(): # logger.debug(f"Looking up discount with organization id: {organization}") - query = query.join(Organization).filter(Organization.id==organization) + query = query.join(Organization).filter(Organization.id == organization) case _: # raise ValueError(f"Invalid value for organization: {organization}") pass match kit_type: case KitType(): # logger.debug(f"Looking up discount with kit type KitType: {kit_type}") - query = query.filter(cls.kit==kit_type) + query = query.filter(cls.kit == kit_type) case str(): # logger.debug(f"Looking up discount with kit type str: {kit_type}") - query = query.join(KitType).filter(KitType.name==kit_type) + query = query.join(KitType).filter(KitType.name == kit_type) case int(): # logger.debug(f"Looking up discount with kit type id: {kit_type}") - query = query.join(KitType).filter(KitType.id==kit_type) + query = query.join(KitType).filter(KitType.id == kit_type) case _: # raise ValueError(f"Invalid value for kit type: {kit_type}") pass return cls.execute_query(query=query) - + @check_authorization def save(self): super().save() - + + class SubmissionType(BaseClass): """ Abstract of types of submissions. - """ - - id = Column(INTEGER, primary_key=True) #: primary key - name = Column(String(128), unique=True) #: name of submission type - info_map = Column(JSON) #: Where basic information is found in the excel workbook corresponding to this type. - defaults = Column(JSON) #: Basic information about this submission type - instances = relationship("BasicSubmission", backref="submission_type") #: Concrete instances of this type. - template_file = Column(BLOB) #: Blank form for this type stored as binary. - processes = relationship("Process", back_populates="submission_types", secondary=submissiontypes_processes) #: Relation to equipment processes used for this type. - sample_map = Column(JSON) #: Where sample information is found in the excel sheet corresponding to this type. + """ + + id = Column(INTEGER, primary_key=True) #: primary key + name = Column(String(128), unique=True) #: name of submission type + info_map = Column(JSON) #: Where basic information is found in the excel workbook corresponding to this type. + defaults = Column(JSON) #: Basic information about this submission type + instances = relationship("BasicSubmission", backref="submission_type") #: Concrete instances of this type. + template_file = Column(BLOB) #: Blank form for this type stored as binary. + processes = relationship("Process", back_populates="submission_types", + secondary=submissiontypes_processes) #: Relation to equipment processes used for this type. + sample_map = Column(JSON) #: Where sample information is found in the excel sheet corresponding to this type. submissiontype_kit_associations = relationship( "SubmissionTypeKitTypeAssociation", back_populates="submission_type", cascade="all, delete-orphan", - ) #: Association of kittypes + ) #: Association of kittypes - kit_types = association_proxy("submissiontype_kit_associations", "kit_type") #: Proxy of kittype association + kit_types = association_proxy("submissiontype_kit_associations", "kit_type") #: Proxy of kittype association submissiontype_equipmentrole_associations = relationship( "SubmissionTypeEquipmentRoleAssociation", back_populates="submission_type", cascade="all, delete-orphan" - ) #: Association of equipmentroles + ) #: Association of equipmentroles - equipment = association_proxy("submissiontype_equipmentrole_associations", "equipment_role") #: Proxy of equipmentrole associations + equipment = association_proxy("submissiontype_equipmentrole_associations", + "equipment_role") #: Proxy of equipmentrole associations submissiontype_kit_rt_associations = relationship( "KitTypeReagentTypeAssociation", back_populates="submission_type", cascade="all, delete-orphan" - ) #: triple association of KitTypes, ReagentTypes, SubmissionTypes + ) #: triple association of KitTypes, ReagentTypes, SubmissionTypes def __repr__(self) -> str: """ Returns: str: Representation of this object. - """ + """ return f"" - + def get_template_file_sheets(self) -> List[str]: """ Gets names of sheet in the stored blank form. Returns: List[str]: List of sheet names - """ + """ return ExcelFile(self.template_file).sheet_names - def set_template_file(self, filepath:Path|str): + def set_template_file(self, filepath: Path | str): """ Sets the binary store to an excel file. @@ -606,30 +619,30 @@ class SubmissionType(BaseClass): Raises: ValueError: Raised if file is not excel file. - """ + """ if isinstance(filepath, str): filepath = Path(filepath) try: xl = ExcelFile(filepath) except ValueError: raise ValueError(f"File {filepath} is not of appropriate type.") - with open (filepath, "rb") as f: + with open(filepath, "rb") as f: data = f.read() self.template_file = data - self.save() + self.save() - def construct_info_map(self, mode:Literal['read', 'write']) -> dict: + def construct_info_map(self, mode: Literal['read', 'write']) -> dict: info = self.info_map logger.debug(f"Info map: {info}") output = {} # for k,v in info.items(): - # info[k]['write'] += info[k]['read'] + # info[k]['write'] += info[k]['read'] match mode: case "read": - output = {k:v[mode] for k,v in info.items() if v[mode]} + output = {k: v[mode] for k, v in info.items() if v[mode]} case "write": - output = {k:v[mode] + v['read'] for k,v in info.items() if v[mode] or v['read']} - output = {k:v for k, v in output.items() if all([isinstance(item, dict) for item in v])} + output = {k: v[mode] + v['read'] for k, v in info.items() if v[mode] or v['read']} + output = {k: v for k, v in output.items() if all([isinstance(item, dict) for item in v])} return output def construct_sample_map(self): @@ -655,16 +668,16 @@ class SubmissionType(BaseClass): # output.append(map) return output - def get_equipment(self, extraction_kit:str|KitType|None=None) -> List['PydEquipmentRole']: + def get_equipment(self, extraction_kit: str | KitType | None = None) -> List['PydEquipmentRole']: """ Returns PydEquipmentRole of all equipment associated with this SubmissionType Returns: List['PydEquipmentRole']: List of equipment roles - """ + """ return [item.to_pydantic(submission_type=self, extraction_kit=extraction_kit) for item in self.equipment] - - def get_processes_for_role(self, equipment_role:str|EquipmentRole, kit:str|KitType|None=None) -> list: + + def get_processes_for_role(self, equipment_role: str | EquipmentRole, kit: str | KitType | None = None) -> list: """ Get processes associated with this SubmissionType for an EquipmentRole @@ -677,17 +690,19 @@ class SubmissionType(BaseClass): Returns: list: list of associated processes - """ + """ match equipment_role: case str(): # logger.debug(f"Getting processes for equipmentrole str {equipment_role}") - relevant = [item.get_all_processes(kit) for item in self.submissiontype_equipmentrole_associations if item.equipment_role.name==equipment_role] + relevant = [item.get_all_processes(kit) for item in self.submissiontype_equipmentrole_associations if + item.equipment_role.name == equipment_role] case EquipmentRole(): # logger.debug(f"Getting processes for equipmentrole EquipmentRole {equipment_role}") - relevant = [item.get_all_processes(kit) for item in self.submissiontype_equipmentrole_associations if item.equipment_role==equipment_role] + relevant = [item.get_all_processes(kit) for item in self.submissiontype_equipmentrole_associations if + item.equipment_role == equipment_role] case _: raise TypeError(f"Type {type(equipment_role)} is not allowed") - return list(set([item for items in relevant for item in items if item != None ])) + return list(set([item for items in relevant for item in items if item != None])) def get_submission_class(self): from .submissions import BasicSubmission @@ -695,11 +710,11 @@ class SubmissionType(BaseClass): @classmethod @setup_lookup - def query(cls, - name:str|None=None, - key:str|None=None, - limit:int=0 - ) -> SubmissionType|List[SubmissionType]: + def query(cls, + name: str | None = None, + key: str | None = None, + limit: int = 0 + ) -> SubmissionType | List[SubmissionType]: """ Lookup submission type in the database by a number of parameters @@ -710,45 +725,50 @@ class SubmissionType(BaseClass): Returns: models.SubmissionType|List[models.SubmissionType]: SubmissionType(s) of interest. - """ + """ query: Query = cls.__database_session__.query(cls) match name: case str(): # logger.debug(f"Looking up submission type by name str: {name}") - query = query.filter(cls.name==name) + query = query.filter(cls.name == name) limit = 1 case _: pass match key: case str(): # logger.debug(f"Looking up submission type by info-map key str: {key}") - query = query.filter(cls.info_map.op('->')(key)!=None) + query = query.filter(cls.info_map.op('->')(key) != None) case _: pass return cls.execute_query(query=query, limit=limit) - + @check_authorization def save(self): """ Adds this instances to the database and commits. - """ + """ super().save() - + + class SubmissionTypeKitTypeAssociation(BaseClass): """ Abstract of relationship between kits and their submission type. - """ - - submission_types_id = Column(INTEGER, ForeignKey("_submissiontype.id"), primary_key=True) #: id of joined submission type - kits_id = Column(INTEGER, ForeignKey("_kittype.id"), primary_key=True) #: id of joined kit - mutable_cost_column = Column(FLOAT(2)) #: dollar amount per 96 well plate that can change with number of columns (reagents, tips, etc) - mutable_cost_sample = Column(FLOAT(2)) #: dollar amount that can change with number of samples (reagents, tips, etc) - constant_cost = Column(FLOAT(2)) #: dollar amount per plate that will remain constant (plates, man hours, etc) + """ - kit_type = relationship(KitType, back_populates="kit_submissiontype_associations") #: joined kittype + submission_types_id = Column(INTEGER, ForeignKey("_submissiontype.id"), + primary_key=True) #: id of joined submission type + kits_id = Column(INTEGER, ForeignKey("_kittype.id"), primary_key=True) #: id of joined kit + mutable_cost_column = Column( + FLOAT(2)) #: dollar amount per 96 well plate that can change with number of columns (reagents, tips, etc) + mutable_cost_sample = Column( + FLOAT(2)) #: dollar amount that can change with number of samples (reagents, tips, etc) + constant_cost = Column(FLOAT(2)) #: dollar amount per plate that will remain constant (plates, man hours, etc) + + kit_type = relationship(KitType, back_populates="kit_submissiontype_associations") #: joined kittype # reference to the "SubmissionType" object - submission_type = relationship(SubmissionType, back_populates="submissiontype_kit_associations") #: joined submission type + submission_type = relationship(SubmissionType, + back_populates="submissiontype_kit_associations") #: joined submission type def __init__(self, kit_type=None, submission_type=None): self.kit_type = kit_type @@ -761,16 +781,16 @@ class SubmissionTypeKitTypeAssociation(BaseClass): """ Returns: str: Representation of this object - """ + """ return f"" @classmethod @setup_lookup - def query(cls, - submission_type:SubmissionType|str|int|None=None, - kit_type:KitType|str|int|None=None, - limit:int=0 - ) -> SubmissionTypeKitTypeAssociation|List[SubmissionTypeKitTypeAssociation]: + def query(cls, + submission_type: SubmissionType | str | int | None = None, + kit_type: KitType | str | int | None = None, + limit: int = 0 + ) -> SubmissionTypeKitTypeAssociation | List[SubmissionTypeKitTypeAssociation]: """ Lookup SubmissionTypeKitTypeAssociations of interest. @@ -781,50 +801,55 @@ class SubmissionTypeKitTypeAssociation(BaseClass): Returns: SubmissionTypeKitTypeAssociation|List[SubmissionTypeKitTypeAssociation]: SubmissionTypeKitTypeAssociation(s) of interest - """ + """ query: Query = cls.__database_session__.query(cls) match submission_type: case SubmissionType(): # logger.debug(f"Looking up {cls.__name__} by SubmissionType {submission_type}") - query = query.filter(cls.submission_type==submission_type) + query = query.filter(cls.submission_type == submission_type) case str(): # logger.debug(f"Looking up {cls.__name__} by name {submission_type}") - query = query.join(SubmissionType).filter(SubmissionType.name==submission_type) + query = query.join(SubmissionType).filter(SubmissionType.name == submission_type) case int(): # logger.debug(f"Looking up {cls.__name__} by id {submission_type}") - query = query.join(SubmissionType).filter(SubmissionType.id==submission_type) + query = query.join(SubmissionType).filter(SubmissionType.id == submission_type) match kit_type: case KitType(): # logger.debug(f"Looking up {cls.__name__} by KitType {kit_type}") - query = query.filter(cls.kit_type==kit_type) + query = query.filter(cls.kit_type == kit_type) case str(): # logger.debug(f"Looking up {cls.__name__} by name {kit_type}") - query = query.join(KitType).filter(KitType.name==kit_type) + query = query.join(KitType).filter(KitType.name == kit_type) case int(): # logger.debug(f"Looking up {cls.__name__} by id {kit_type}") - query = query.join(KitType).filter(KitType.id==kit_type) + query = query.join(KitType).filter(KitType.id == kit_type) limit = query.count() return cls.execute_query(query=query, limit=limit) + class KitTypeReagentTypeAssociation(BaseClass): """ table containing reagenttype/kittype associations DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html - """ - - reagent_types_id = Column(INTEGER, ForeignKey("_reagenttype.id"), primary_key=True) #: id of associated reagent type - kits_id = Column(INTEGER, ForeignKey("_kittype.id"), primary_key=True) #: id of associated reagent type - submission_type_id = Column(INTEGER, ForeignKey("_submissiontype.id"), primary_key=True) - uses = Column(JSON) #: map to location on excel sheets of different submission types - required = Column(INTEGER) #: whether the reagent type is required for the kit (Boolean 1 or 0) - last_used = Column(String(32)) #: last used lot number of this type of reagent + """ - kit_type = relationship(KitType, back_populates="kit_reagenttype_associations") #: relationship to associated KitType + reagent_types_id = Column(INTEGER, ForeignKey("_reagenttype.id"), + primary_key=True) #: id of associated reagent type + kits_id = Column(INTEGER, ForeignKey("_kittype.id"), primary_key=True) #: id of associated reagent type + submission_type_id = Column(INTEGER, ForeignKey("_submissiontype.id"), primary_key=True) + uses = Column(JSON) #: map to location on excel sheets of different submission types + required = Column(INTEGER) #: whether the reagent type is required for the kit (Boolean 1 or 0) + last_used = Column(String(32)) #: last used lot number of this type of reagent + + kit_type = relationship(KitType, + back_populates="kit_reagenttype_associations") #: relationship to associated KitType # reference to the "ReagentType" object - reagent_type = relationship(ReagentType, back_populates="reagenttype_kit_associations") #: relationship to associated ReagentType + reagent_type = relationship(ReagentType, + back_populates="reagenttype_kit_associations") #: relationship to associated ReagentType - submission_type = relationship(SubmissionType, back_populates="submissiontype_kit_rt_associations") #: relationship to associated SubmissionType + submission_type = relationship(SubmissionType, + back_populates="submissiontype_kit_rt_associations") #: relationship to associated SubmissionType def __init__(self, kit_type=None, reagent_type=None, uses=None, required=1): self.kit_type = kit_type @@ -849,11 +874,11 @@ class KitTypeReagentTypeAssociation(BaseClass): Returns: _type_: value - """ + """ if not 0 <= value < 2: raise ValueError(f'Invalid required value {value}. Must be 0 or 1.') return value - + @validates('reagenttype') def validate_reagenttype(self, key, value): """ @@ -868,18 +893,18 @@ class KitTypeReagentTypeAssociation(BaseClass): Returns: _type_: ReagentType - """ + """ if not isinstance(value, ReagentType): raise ValueError(f'{value} is not a reagenttype') return value - + @classmethod @setup_lookup def query(cls, - kit_type:KitType|str|None=None, - reagent_type:ReagentType|str|None=None, - limit:int=0 - ) -> KitTypeReagentTypeAssociation|List[KitTypeReagentTypeAssociation]: + kit_type: KitType | str | None = None, + reagent_type: ReagentType | str | None = None, + limit: int = 0 + ) -> KitTypeReagentTypeAssociation | List[KitTypeReagentTypeAssociation]: """ Lookup junction of ReagentType and KitType @@ -890,49 +915,51 @@ class KitTypeReagentTypeAssociation(BaseClass): Returns: models.KitTypeReagentTypeAssociation|List[models.KitTypeReagentTypeAssociation]: Junction of interest. - """ + """ query: Query = cls.__database_session__.query(cls) match kit_type: case KitType(): # logger.debug(f"Lookup KitTypeReagentTypeAssociation by kit_type KitType {kit_type}") - query = query.filter(cls.kit_type==kit_type) + query = query.filter(cls.kit_type == kit_type) case str(): # logger.debug(f"Lookup KitTypeReagentTypeAssociation by kit_type str {kit_type}") - query = query.join(KitType).filter(KitType.name==kit_type) + query = query.join(KitType).filter(KitType.name == kit_type) case _: pass match reagent_type: case ReagentType(): # logger.debug(f"Lookup KitTypeReagentTypeAssociation by reagent_type ReagentType {reagent_type}") - query = query.filter(cls.reagent_type==reagent_type) + query = query.filter(cls.reagent_type == reagent_type) case str(): # logger.debug(f"Lookup KitTypeReagentTypeAssociation by reagent_type ReagentType {reagent_type}") - query = query.join(ReagentType).filter(ReagentType.name==reagent_type) + query = query.join(ReagentType).filter(ReagentType.name == reagent_type) case _: pass if kit_type != None and reagent_type != None: limit = 1 return cls.execute_query(query=query, limit=limit) + class SubmissionReagentAssociation(BaseClass): """ table containing submission/reagent associations DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html - """ - - reagent_id = Column(INTEGER, ForeignKey("_reagent.id"), primary_key=True) #: id of associated reagent - submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id"), primary_key=True) #: id of associated submission - comments = Column(String(1024)) #: Comments about reagents + """ - submission = relationship("BasicSubmission", back_populates="submission_reagent_associations") #: associated submission + reagent_id = Column(INTEGER, ForeignKey("_reagent.id"), primary_key=True) #: id of associated reagent + submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id"), primary_key=True) #: id of associated submission + comments = Column(String(1024)) #: Comments about reagents - reagent = relationship(Reagent, back_populates="reagent_submission_associations") #: associated reagent + submission = relationship("BasicSubmission", + back_populates="submission_reagent_associations") #: associated submission + + reagent = relationship(Reagent, back_populates="reagent_submission_associations") #: associated reagent def __repr__(self) -> str: """ Returns: str: Representation of this SubmissionReagentAssociation - """ + """ return f"<{self.submission.rsl_plate_num}&{self.reagent.lot}>" def __init__(self, reagent=None, submission=None): @@ -945,10 +972,10 @@ class SubmissionReagentAssociation(BaseClass): @classmethod @setup_lookup - def query(cls, - submission:"BasicSubmission"|str|int|None=None, - reagent:Reagent|str|None=None, - limit:int=0) -> SubmissionReagentAssociation|List[SubmissionReagentAssociation]: + def query(cls, + submission: "BasicSubmission" | str | int | None = None, + reagent: Reagent | str | None = None, + limit: int = 0) -> SubmissionReagentAssociation | List[SubmissionReagentAssociation]: """ Lookup SubmissionReagentAssociations of interest. @@ -959,7 +986,7 @@ class SubmissionReagentAssociation(BaseClass): Returns: SubmissionReagentAssociation|List[SubmissionReagentAssociation]: SubmissionReagentAssociation(s) of interest - """ + """ from . import BasicSubmission query: Query = cls.__database_session__.query(cls) match reagent: @@ -967,7 +994,7 @@ class SubmissionReagentAssociation(BaseClass): # logger.debug(f"Lookup SubmissionReagentAssociation by reagent Reagent {reagent}") if isinstance(reagent, str): reagent = Reagent.query(lot_number=reagent) - query = query.filter(cls.reagent==reagent) + query = query.filter(cls.reagent == reagent) case _: pass match submission: @@ -976,11 +1003,11 @@ class SubmissionReagentAssociation(BaseClass): # submission = BasicSubmission.query(rsl_number=submission) submission = BasicSubmission.query(rsl_plate_num=submission) # logger.debug(f"Lookup SubmissionReagentAssociation by submission BasicSubmission {submission}") - query = query.filter(cls.submission==submission) + query = query.filter(cls.submission == submission) case int(): # logger.debug(f"Lookup SubmissionReagentAssociation by submission id {submission}") - submission = BasicSubmission.query(id=submission) - query = query.join(BasicSubmission).filter(BasicSubmission.id==submission) + submission = BasicSubmission.query(id=submission) + query = query.join(BasicSubmission).filter(BasicSubmission.id == submission) case _: pass return cls.execute_query(query=query, limit=limit) @@ -994,39 +1021,43 @@ class SubmissionReagentAssociation(BaseClass): Returns: dict: This SubmissionReagentAssociation as dict - """ + """ output = self.reagent.to_sub_dict(extraction_kit) output['comments'] = self.comments return output + class Equipment(BaseClass): """ A concrete instance of equipment """ - - id = Column(INTEGER, primary_key=True) #: id, primary key - name = Column(String(64)) #: equipment name - nickname = Column(String(64)) #: equipment nickname - asset_number = Column(String(16)) #: Given asset number (corpo nickname if you will) - roles = relationship("EquipmentRole", back_populates="instances", secondary=equipmentroles_equipment) #: relation to EquipmentRoles - processes = relationship("Process", back_populates="equipment", secondary=equipment_processes) #: relation to Processes + + id = Column(INTEGER, primary_key=True) #: id, primary key + name = Column(String(64)) #: equipment name + nickname = Column(String(64)) #: equipment nickname + asset_number = Column(String(16)) #: Given asset number (corpo nickname if you will) + roles = relationship("EquipmentRole", back_populates="instances", + secondary=equipmentroles_equipment) #: relation to EquipmentRoles + processes = relationship("Process", back_populates="equipment", + secondary=equipment_processes) #: relation to Processes equipment_submission_associations = relationship( "SubmissionEquipmentAssociation", back_populates="equipment", cascade="all, delete-orphan", - ) #: Association with BasicSubmission + ) #: Association with BasicSubmission - submissions = association_proxy("equipment_submission_associations", "submission") #: proxy to equipment_submission_associations.submission + submissions = association_proxy("equipment_submission_associations", + "submission") #: proxy to equipment_submission_associations.submission def __repr__(self) -> str: """ Returns: str: represenation of this Equipment - """ + """ return f"" - - def to_dict(self, processes:bool=False) -> dict: + + def to_dict(self, processes: bool = False) -> dict: """ This Equipment as a dictionary @@ -1035,13 +1066,13 @@ class Equipment(BaseClass): Returns: dict: _description_ - """ + """ if not processes: - return {k:v for k,v in self.__dict__.items() if k != 'processes'} + return {k: v for k, v in self.__dict__.items() if k != 'processes'} else: - return {k:v for k,v in self.__dict__.items()} + return {k: v for k, v in self.__dict__.items()} - def get_processes(self, submission_type:SubmissionType, extraction_kit:str|KitType|None=None) -> List[str]: + def get_processes(self, submission_type: SubmissionType, extraction_kit: str | KitType | None = None) -> List[str]: """ Get all processes associated with this Equipment for a given SubmissionType @@ -1051,12 +1082,13 @@ class Equipment(BaseClass): Returns: List[Process]: List of process names - """ + """ processes = [process for process in self.processes if submission_type in process.submission_types] match extraction_kit: case str(): # logger.debug(f"Filtering processes by extraction_kit str {extraction_kit}") - processes = [process for process in processes if extraction_kit in [kit.name for kit in process.kit_types]] + processes = [process for process in processes if + extraction_kit in [kit.name for kit in process.kit_types]] case KitType(): # logger.debug(f"Filtering processes by extraction_kit KitType {extraction_kit}") processes = [process for process in processes if extraction_kit in process.kit_types] @@ -1070,12 +1102,12 @@ class Equipment(BaseClass): @classmethod @setup_lookup - def query(cls, - name:str|None=None, - nickname:str|None=None, - asset_number:str|None=None, - limit:int=0 - ) -> Equipment|List[Equipment]: + def query(cls, + name: str | None = None, + nickname: str | None = None, + asset_number: str | None = None, + limit: int = 0 + ) -> Equipment | List[Equipment]: """ Lookup a list of or single Equipment. @@ -1087,32 +1119,33 @@ class Equipment(BaseClass): Returns: Equipment|List[Equipment]: Equipment or list of equipment matching query parameters. - """ + """ query = cls.__database_session__.query(cls) match name: case str(): # logger.debug(f"Lookup Equipment by name str {name}") - query = query.filter(cls.name==name) + query = query.filter(cls.name == name) limit = 1 case _: pass match nickname: case str(): # logger.debug(f"Lookup Equipment by nickname str {nickname}") - query = query.filter(cls.nickname==nickname) + query = query.filter(cls.nickname == nickname) limit = 1 case _: pass match asset_number: case str(): # logger.debug(f"Lookup Equipment by asset_number str {asset_number}") - query = query.filter(cls.asset_number==asset_number) + query = query.filter(cls.asset_number == asset_number) limit = 1 case _: pass return cls.execute_query(query=query, limit=limit) - - def to_pydantic(self, submission_type:SubmissionType, extraction_kit:str|KitType|None=None) -> "PydEquipment": + + def to_pydantic(self, submission_type: SubmissionType, + extraction_kit: str | KitType | None = None) -> "PydEquipment": """ Creates PydEquipment of this Equipment @@ -1124,7 +1157,9 @@ class Equipment(BaseClass): PydEquipment: _description_ """ from backend.validators.pydant import PydEquipment - return PydEquipment(processes=self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit), role=None, **self.to_dict(processes=False)) + return PydEquipment( + processes=self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit), role=None, + **self.to_dict(processes=False)) @classmethod def get_regex(cls) -> re.Pattern: @@ -1133,48 +1168,52 @@ class Equipment(BaseClass): Returns: re.Pattern: regex - """ + """ return re.compile(r""" (?P50\d{5}$)| (?PHC-\d{6}$)| (?P[^\d][A-Z0-9]{6}$)| (?P[A-Z]{3}-\d{2}-[A-Z]-[A-Z]$)| - (?P\d{4}-\d{3}-\d{3}-\d$)""", + (?P\d{4}-\d{3}-\d{3}-\d$)""", re.VERBOSE) + class EquipmentRole(BaseClass): """ Abstract roles for equipment """ - - id = Column(INTEGER, primary_key=True) #: Role id, primary key - name = Column(String(32)) #: Common name - instances = relationship("Equipment", back_populates="roles", secondary=equipmentroles_equipment) #: Concrete instances (Equipment) of role - processes = relationship("Process", back_populates='equipment_roles', secondary=equipmentroles_processes) #: Associated Processes + + id = Column(INTEGER, primary_key=True) #: Role id, primary key + name = Column(String(32)) #: Common name + instances = relationship("Equipment", back_populates="roles", + secondary=equipmentroles_equipment) #: Concrete instances (Equipment) of role + processes = relationship("Process", back_populates='equipment_roles', + secondary=equipmentroles_processes) #: Associated Processes equipmentrole_submissiontype_associations = relationship( "SubmissionTypeEquipmentRoleAssociation", back_populates="equipment_role", cascade="all, delete-orphan", - ) #: relation to SubmissionTypes + ) #: relation to SubmissionTypes - submission_types = association_proxy("equipmentrole_submissiontype_associations", "submission_type") #: proxy to equipmentrole_submissiontype_associations.submission_type + submission_types = association_proxy("equipmentrole_submissiontype_associations", + "submission_type") #: proxy to equipmentrole_submissiontype_associations.submission_type def __repr__(self) -> str: """ Returns: str: Representation of this EquipmentRole - """ + """ return f"" - + def to_dict(self) -> dict: """ This EquipmentRole as a dictionary Returns: dict: This EquipmentRole dict - """ + """ output = {} for key, value in self.__dict__.items(): match key: @@ -1185,7 +1224,8 @@ class EquipmentRole(BaseClass): output[key] = value return output - def to_pydantic(self, submission_type:SubmissionType, extraction_kit:str|KitType|None=None) -> "PydEquipmentRole": + def to_pydantic(self, submission_type: SubmissionType, + extraction_kit: str | KitType | None = None) -> "PydEquipmentRole": """ Creates a PydEquipmentRole of this EquipmentRole @@ -1195,18 +1235,20 @@ class EquipmentRole(BaseClass): Returns: PydEquipmentRole: This EquipmentRole as PydEquipmentRole - """ + """ from backend.validators.pydant import PydEquipmentRole # logger.debug("Creating list of PydEquipment in this role") - equipment = [item.to_pydantic(submission_type=submission_type, extraction_kit=extraction_kit) for item in self.instances] + equipment = [item.to_pydantic(submission_type=submission_type, extraction_kit=extraction_kit) for item in + self.instances] pyd_dict = self.to_dict() # logger.debug("Creating list of Processes in this role") pyd_dict['processes'] = self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit) return PydEquipmentRole(equipment=equipment, **pyd_dict) - + @classmethod @setup_lookup - def query(cls, name:str|None=None, id:int|None=None, limit:int=0) -> EquipmentRole|List[EquipmentRole]: + def query(cls, name: str | None = None, id: int | None = None, limit: int = 0) -> EquipmentRole | List[ + EquipmentRole]: """ Lookup Equipment roles. @@ -1217,25 +1259,26 @@ class EquipmentRole(BaseClass): Returns: EquipmentRole|List[EquipmentRole]: List of EquipmentRoles matching criteria - """ + """ query = cls.__database_session__.query(cls) match id: case int(): # logger.debug(f"Lookup EquipmentRole by id {id}") - query = query.filter(cls.id==id) + query = query.filter(cls.id == id) limit = 1 case _: pass match name: case str(): # logger.debug(f"Lookup EquipmentRole by name str {name}") - query = query.filter(cls.name==name) + query = query.filter(cls.name == name) limit = 1 case _: pass return cls.execute_query(query=query, limit=limit) - - def get_processes(self, submission_type:str|SubmissionType|None, extraction_kit:str|KitType|None=None) -> List[Process]: + + def get_processes(self, submission_type: str | SubmissionType | None, + extraction_kit: str | KitType | None = None) -> List[Process]: """ Get processes used by this EquipmentRole @@ -1245,7 +1288,7 @@ class EquipmentRole(BaseClass): Returns: List[Process]: _description_ - """ + """ if isinstance(submission_type, str): # logger.debug(f"Checking if str {submission_type} exists") submission_type = SubmissionType.query(name=submission_type) @@ -1269,27 +1312,30 @@ class EquipmentRole(BaseClass): else: return output + class SubmissionEquipmentAssociation(BaseClass): """ Abstract association between BasicSubmission and Equipment """ - - equipment_id = Column(INTEGER, ForeignKey("_equipment.id"), primary_key=True) #: id of associated equipment - submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id"), primary_key=True) #: id of associated submission - role = Column(String(64), primary_key=True) #: name of the role the equipment fills - process_id = Column(INTEGER, ForeignKey("_process.id",ondelete="SET NULL", name="SEA_Process_id")) #: Foreign key of process id - start_time = Column(TIMESTAMP) #: start time of equipment use - end_time = Column(TIMESTAMP) #: end time of equipment use - comments = Column(String(1024)) #: comments about equipment - - submission = relationship("BasicSubmission", back_populates="submission_equipment_associations") #: associated submission - equipment = relationship(Equipment, back_populates="equipment_submission_associations") #: associated equipment + equipment_id = Column(INTEGER, ForeignKey("_equipment.id"), primary_key=True) #: id of associated equipment + submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id"), primary_key=True) #: id of associated submission + role = Column(String(64), primary_key=True) #: name of the role the equipment fills + process_id = Column(INTEGER, ForeignKey("_process.id", ondelete="SET NULL", + name="SEA_Process_id")) #: Foreign key of process id + start_time = Column(TIMESTAMP) #: start time of equipment use + end_time = Column(TIMESTAMP) #: end time of equipment use + comments = Column(String(1024)) #: comments about equipment + + submission = relationship("BasicSubmission", + back_populates="submission_equipment_associations") #: associated submission + + equipment = relationship(Equipment, back_populates="equipment_submission_associations") #: associated equipment def __repr__(self): return f"" - def __init__(self, submission, equipment, role:str="None"): + def __init__(self, submission, equipment, role: str = "None"): self.submission = submission self.equipment = equipment self.role = role @@ -1300,26 +1346,32 @@ class SubmissionEquipmentAssociation(BaseClass): Returns: dict: This SubmissionEquipmentAssociation as a dictionary - """ + """ try: process = self.process.name except AttributeError: process = "No process found" - output = dict(name=self.equipment.name, asset_number=self.equipment.asset_number, comment=self.comments, processes=[process], role=self.role, nickname=self.equipment.nickname) + output = dict(name=self.equipment.name, asset_number=self.equipment.asset_number, comment=self.comments, + processes=[process], role=self.role, nickname=self.equipment.nickname) return output + class SubmissionTypeEquipmentRoleAssociation(BaseClass): """ Abstract association between SubmissionType and EquipmentRole """ - equipmentrole_id = Column(INTEGER, ForeignKey("_equipmentrole.id"), primary_key=True) #: id of associated equipment - submissiontype_id = Column(INTEGER, ForeignKey("_submissiontype.id"), primary_key=True) #: id of associated submission - uses = Column(JSON) #: locations of equipment on the submission type excel sheet. - static = Column(INTEGER, default=1) #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list? + equipmentrole_id = Column(INTEGER, ForeignKey("_equipmentrole.id"), primary_key=True) #: id of associated equipment + submissiontype_id = Column(INTEGER, ForeignKey("_submissiontype.id"), + primary_key=True) #: id of associated submission + uses = Column(JSON) #: locations of equipment on the submission type excel sheet. + static = Column(INTEGER, + default=1) #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list? - submission_type = relationship(SubmissionType, back_populates="submissiontype_equipmentrole_associations") #: associated submission + submission_type = relationship(SubmissionType, + back_populates="submissiontype_equipmentrole_associations") #: associated submission - equipment_role = relationship(EquipmentRole, back_populates="equipmentrole_submissiontype_associations") #: associated equipment + equipment_role = relationship(EquipmentRole, + back_populates="equipmentrole_submissiontype_associations") #: associated equipment @validates('static') def validate_age(self, key, value): @@ -1335,12 +1387,12 @@ class SubmissionTypeEquipmentRoleAssociation(BaseClass): Returns: _type_: value - """ + """ if not 0 <= value < 2: raise ValueError(f'Invalid required value {value}. Must be 0 or 1.') return value - - def get_all_processes(self, extraction_kit:KitType|str|None=None) -> List[Process]: + + def get_all_processes(self, extraction_kit: KitType | str | None = None) -> List[Process]: """ Get all processes associated with this SubmissionTypeEquipmentRole @@ -1349,10 +1401,10 @@ class SubmissionTypeEquipmentRoleAssociation(BaseClass): Returns: List[Process]: All associated processes - """ + """ processes = [equipment.get_processes(self.submission_type) for equipment in self.equipment_role.instances] # flatten list - processes = [item for items in processes for item in items if item != None ] + processes = [item for items in processes for item in items if item != None] match extraction_kit: case str(): # logger.debug(f"Filtering Processes by extraction_kit str {extraction_kit}") @@ -1368,29 +1420,35 @@ class SubmissionTypeEquipmentRoleAssociation(BaseClass): def save(self): super().save() + class Process(BaseClass): """ A Process is a method used by a piece of equipment. - """ - - id = Column(INTEGER, primary_key=True) #: Process id, primary key - name = Column(String(64)) #: Process name - submission_types = relationship("SubmissionType", back_populates='processes', secondary=submissiontypes_processes) #: relation to SubmissionType - equipment = relationship("Equipment", back_populates='processes', secondary=equipment_processes) #: relation to Equipment - equipment_roles = relationship("EquipmentRole", back_populates='processes', secondary=equipmentroles_processes) #: relation to EquipmentRoles - submissions = relationship("SubmissionEquipmentAssociation", backref='process') #: relation to SubmissionEquipmentAssociation - kit_types = relationship("KitType", back_populates='processes', secondary=kittypes_processes) #: relation to KitType + """ + + id = Column(INTEGER, primary_key=True) #: Process id, primary key + name = Column(String(64)) #: Process name + submission_types = relationship("SubmissionType", back_populates='processes', + secondary=submissiontypes_processes) #: relation to SubmissionType + equipment = relationship("Equipment", back_populates='processes', + secondary=equipment_processes) #: relation to Equipment + equipment_roles = relationship("EquipmentRole", back_populates='processes', + secondary=equipmentroles_processes) #: relation to EquipmentRoles + submissions = relationship("SubmissionEquipmentAssociation", + backref='process') #: relation to SubmissionEquipmentAssociation + kit_types = relationship("KitType", back_populates='processes', + secondary=kittypes_processes) #: relation to KitType def __repr__(self) -> str: """ Returns: str: Representation of this Process - """ + """ return f"" - + @classmethod @setup_lookup - def query(cls, name:str|None=None, limit:int=0) -> Process|List[Process]: + def query(cls, name: str | None = None, limit: int = 0) -> Process | List[Process]: """ Lookup Processes @@ -1400,14 +1458,13 @@ class Process(BaseClass): Returns: Process|List[Process]: Process(es) matching criteria - """ + """ query = cls.__database_session__.query(cls) match name: case str(): # logger.debug(f"Lookup Process with name str {name}") - query = query.filter(cls.name==name) + query = query.filter(cls.name == name) limit = 1 case _: pass return cls.execute_query(query=query, limit=limit) - diff --git a/src/submissions/tools.py b/src/submissions/tools.py index 38188b4..f03296d 100644 --- a/src/submissions/tools.py +++ b/src/submissions/tools.py @@ -2,6 +2,8 @@ Contains miscellaenous functions used by both frontend and backend. ''' from __future__ import annotations + +import json from pathlib import Path import numpy as np import logging, re, yaml, sys, os, stat, platform, getpass, inspect, csv @@ -10,7 +12,7 @@ from jinja2 import Environment, FileSystemLoader from logging import handlers from pathlib import Path from sqlalchemy.orm import Session -from sqlalchemy import create_engine +from sqlalchemy import create_engine, text from pydantic import field_validator, BaseModel, Field from pydantic_settings import BaseSettings, SettingsConfigDict from typing import Any, Tuple, Literal, List @@ -137,7 +139,7 @@ def get_first_blank_df_row(df:pd.DataFrame) -> int: # Settings -class Settings(BaseSettings): +class Settings(BaseSettings, extra="allow"): """ Pydantic model to hold settings @@ -147,21 +149,26 @@ class Settings(BaseSettings): """ directory_path: Path database_path: Path|str|None = None - backup_path: Path - super_users: list|None = None - power_users: list|None = None - rerun_regex: str + backup_path: Path|str|None = None + # super_users: list|None = None + # power_users: list|None = None + # rerun_regex: str submission_types: dict|None = None database_session: Session|None = None package: Any|None = None model_config = SettingsConfigDict(env_file_encoding='utf-8') - @field_validator('backup_path') + @field_validator('backup_path', mode="before") @classmethod - def set_backup_path(cls, value): - if isinstance(value, str): - value = Path(value) + def set_backup_path(cls, value, values): + match value: + case str(): + value = Path(value) + case None: + value = values.data['directory_path'].joinpath("Database backups") + if not value.exists(): + value.mkdir(parents=True) # metadata.backup_path = value return value @@ -177,11 +184,14 @@ class Settings(BaseSettings): @field_validator('database_path', mode="before") @classmethod - def ensure_database_exists(cls, value): + def ensure_database_exists(cls, value, values): if value == ":memory:": return value - if isinstance(value, str): - value = Path(value) + match value: + case str(): + value = Path(value) + case None: + value = values.data['directory_path'].joinpath("submissions.db") if value.exists(): return value else: @@ -225,6 +235,20 @@ class Settings(BaseSettings): if value == None: return package + def __init__(self, *args, **kwargs): + + super().__init__(*args, **kwargs) + # self.set_from_db(db_path=kwargs['database_path']) + + def set_from_db(self, db_path:Path): + session = Session(create_engine(f"sqlite:///{db_path}")) + config_items = session.execute(text("SELECT * FROM _configitem")).all() + session.close() + config_items = {item[1]:json.loads(item[2]) for item in config_items} + for k, v in config_items.items(): + if not hasattr(self, k): + self.__setattr__(k, v) + def get_config(settings_path: Path|str|None=None) -> Settings: """ Get configuration settings from path or default if blank.