2317 lines
89 KiB
Python
2317 lines
89 KiB
Python
"""
|
|
All kittype and reagent related models
|
|
"""
|
|
from __future__ import annotations
|
|
import zipfile, logging, re, numpy as np
|
|
from operator import itemgetter
|
|
from pprint import pformat
|
|
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, func
|
|
from sqlalchemy.ext.hybrid import hybrid_property
|
|
from sqlalchemy.orm import relationship, validates, Query, declared_attr
|
|
from sqlalchemy.ext.associationproxy import association_proxy
|
|
from datetime import date, datetime, timedelta
|
|
from tools import check_authorization, setup_lookup, Report, Result, check_regex_match, timezone, \
|
|
jinja_template_loading, flatten_list
|
|
from typing import List, Literal, Generator, Any, Tuple, TYPE_CHECKING
|
|
from . import BaseClass, ClientLab, LogMixin
|
|
from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError
|
|
from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError
|
|
|
|
if TYPE_CHECKING:
|
|
from backend.db.models.submissions import Run, ProcedureSampleAssociation
|
|
from backend.validators.pydant import PydSample
|
|
|
|
logger = logging.getLogger(f'submissions.{__name__}')

# NOTE: Many-to-many link tables. All are registered on the metadata of the shared declarative base.

# NOTE: Links Reagent <-> ReagentRole.
reagentrole_reagent = Table(
    "_reagentrole_reagent",
    BaseClass.__base__.metadata,
    Column("reagent_id", INTEGER, ForeignKey("_reagent.id")),
    Column("reagentrole_id", INTEGER, ForeignKey("_reagentrole.id")),
    extend_existing=True,
)

# NOTE: Links Equipment <-> Process.
equipment_process = Table(
    "_equipment_process",
    BaseClass.__base__.metadata,
    Column("process_id", INTEGER, ForeignKey("_process.id")),
    Column("equipment_id", INTEGER, ForeignKey("_equipment.id")),
    extend_existing=True,
)

# NOTE: Links Process <-> Tips.
process_tips = Table(
    "_process_tips",
    BaseClass.__base__.metadata,
    Column("process_id", INTEGER, ForeignKey("_process.id")),
    Column("tips_id", INTEGER, ForeignKey("_tips.id")),
    extend_existing=True,
)

# NOTE: Links SubmissionType <-> ProcedureType.
submissiontype_proceduretype = Table(
    "_submissiontype_proceduretype",
    BaseClass.__base__.metadata,
    Column("submissiontype_id", INTEGER, ForeignKey("_submissiontype.id")),
    Column("proceduretype_id", INTEGER, ForeignKey("_proceduretype.id")),
    extend_existing=True,
)
|
|
|
|
|
|
class ReagentRole(BaseClass):
|
|
"""
|
|
Base of reagent type abstract
|
|
"""
|
|
|
|
skip_on_edit = False
|
|
id = Column(INTEGER, primary_key=True) #: primary key
|
|
name = Column(String(64)) #: name of reagentrole reagent plays
|
|
reagent = relationship("Reagent", back_populates="reagentrole",
|
|
secondary=reagentrole_reagent) #: concrete control of this reagent type
|
|
reagentroleproceduretypeassociation = relationship(
|
|
"ProcedureTypeReagentRoleAssociation",
|
|
back_populates="reagentrole",
|
|
cascade="all, delete-orphan",
|
|
) #: Relation to KitTypeReagentTypeAssociation
|
|
# creator function: https://stackoverflow.com/questions/11091491/keyerror-when-adding-objects-to-sqlalchemy-association-object/11116291#11116291
|
|
proceduretype = association_proxy("reagentroleproceduretypeassociation", "proceduretype",
|
|
creator=lambda proceduretype: ProcedureTypeReagentRoleAssociation(
|
|
proceduretype=proceduretype)) #: Association proxy to KitTypeReagentRoleAssociation
|
|
|
|
@classmethod
|
|
def query_or_create(cls, **kwargs) -> Tuple[ReagentRole, bool]:
|
|
new = False
|
|
disallowed = ['expiry']
|
|
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
|
|
instance = cls.query(**sanitized_kwargs)
|
|
if not instance or isinstance(instance, list):
|
|
instance = cls()
|
|
new = True
|
|
for k, v in sanitized_kwargs.items():
|
|
setattr(instance, k, v)
|
|
# logger.info(f"Instance from query or create: {instance}")
|
|
return instance, new
|
|
|
|
@classmethod
|
|
@setup_lookup
|
|
def query(cls,
|
|
name: str | None = None,
|
|
proceduretype: ProcedureType | str | None = None,
|
|
reagent: Reagent | str | None = None,
|
|
id: int | None = None,
|
|
limit: int = 0,
|
|
**kwargs
|
|
) -> ReagentRole | List[ReagentRole]:
|
|
"""
|
|
Lookup reagent types in the database.
|
|
|
|
Args:
|
|
id (id | None, optional): Id of the object. Defaults to None.
|
|
name (str | None, optional): Reagent type name. Defaults to None.
|
|
proceduretype (ProcedureType | None, optional): Procedure the type of interest belongs to. Defaults to None.
|
|
reagent (Reagent | str | None, optional): Concrete instance of the type of interest. Defaults to None.
|
|
limit (int, optional): maxmimum number of results to return (0 = all). Defaults to 0.
|
|
|
|
Raises:
|
|
ValueError: Raised if only kittype or reagent, not both, given.
|
|
|
|
Returns:
|
|
ReagentRole|List[ReagentRole]: ReagentRole or list of ReagentRoles matching filter.
|
|
"""
|
|
query: Query = cls.__database_session__.query(cls)
|
|
if (proceduretype is not None and reagent is None) or (reagent is not None and proceduretype is None):
|
|
raise ValueError("Cannot filter without both reagent and kittype type.")
|
|
elif proceduretype is None and reagent is None:
|
|
pass
|
|
else:
|
|
match proceduretype:
|
|
case str():
|
|
proceduretype = ProcedureType.query(name=proceduretype)
|
|
case _:
|
|
pass
|
|
match reagent:
|
|
case str():
|
|
reagent = Reagent.query(lot=reagent)
|
|
case _:
|
|
pass
|
|
assert reagent.role
|
|
# NOTE: Get all roles common to the reagent and the kittype.
|
|
result = set(proceduretype.reagentrole).intersection(reagent.role)
|
|
return next((item for item in result), None)
|
|
match name:
|
|
case str():
|
|
query = query.filter(cls.name == name)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
match id:
|
|
case int():
|
|
query = query.filter(cls.id == id)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
return cls.execute_query(query=query, limit=limit)
|
|
|
|
def to_pydantic(self) -> "PydReagent":
|
|
"""
|
|
Create default PydReagent from this object
|
|
|
|
Returns:
|
|
PydReagent: PydReagent representation of this object.
|
|
"""
|
|
from backend.validators.pydant import PydReagent
|
|
return PydReagent(lot=None, reagentrole=self.name, name=self.name, expiry=date.today())
|
|
|
|
@check_authorization
|
|
def save(self):
|
|
super().save()
|
|
|
|
def to_omni(self, expand: bool = False):
|
|
from backend.validators.omni_gui_objects import OmniReagentRole
|
|
return OmniReagentRole(instance_object=self, name=self.name, eol_ext=self.eol_ext)
|
|
|
|
def get_reagents(self, proceduretype: str | ProcedureType | None = None):
|
|
if not proceduretype:
|
|
return [reagent.to_pydantic() for reagent in self.reagent]
|
|
if isinstance(proceduretype, str):
|
|
proceduretype = ProcedureType.query(name=proceduretype)
|
|
assoc = next((item for item in self.reagentroleproceduretypeassociation if item.proceduretype == proceduretype),
|
|
None)
|
|
reagents = [reagent for reagent in self.reagent]
|
|
if assoc:
|
|
last_used = Reagent.query(name=assoc.last_used)
|
|
if isinstance(last_used, list):
|
|
last_used = None
|
|
if last_used:
|
|
reagents.insert(0, reagents.pop(reagents.index(last_used)))
|
|
return [reagent.to_pydantic(reagentrole=self.name) for reagent in reagents]
|
|
|
|
# def details_dict(self, **kwargs):
|
|
# output = super().details_dict(**kwargs)
|
|
# return output
|
|
|
|
|
|
class Reagent(BaseClass, LogMixin):
|
|
"""
|
|
Concrete reagent instance
|
|
"""
|
|
|
|
skip_on_edit = False
|
|
id = Column(INTEGER, primary_key=True) #: primary key
|
|
reagentrole = relationship("ReagentRole", back_populates="reagent",
|
|
secondary=reagentrole_reagent) #: joined parent ReagentRole
|
|
reagentrole_id = Column(INTEGER, ForeignKey("_reagentrole.id", ondelete='SET NULL',
|
|
name="fk_REG_reagent_role_id")) #: id of parent ReagentRole
|
|
eol_ext = Column(Interval()) #: extension of life interval
|
|
name = Column(String(64)) #: reagent name
|
|
cost_per_ml = Column(FLOAT(2)) #: amount a millilitre of reagent costs
|
|
reagentlot = relationship("ReagentLot", back_populates="reagent")
|
|
|
|
def __repr__(self):
|
|
if self.name:
|
|
name = f"<Reagent({self.name})>"
|
|
else:
|
|
name = f"<Reagent({self.reagentrole.name})>"
|
|
return name
|
|
|
|
def __init__(self, name: str, eol_ext: timedelta = timedelta(0), *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.name = name
|
|
self.eol_ext = eol_ext
|
|
|
|
@classmethod
|
|
@declared_attr
|
|
def searchables(cls):
|
|
return [dict(label="Lot", field="lot")]
|
|
|
|
def update_last_used(self, proceduretype: ProcedureType) -> Report:
|
|
"""
|
|
Updates last used reagent lot for ReagentType/KitType
|
|
|
|
Args:
|
|
proceduretype (ProcedureType): ProcedureType this instance is used in.
|
|
|
|
Returns:
|
|
Report: Result of operation
|
|
"""
|
|
report = Report()
|
|
rt = ReagentRole.query(proceduretype=proceduretype, reagent=self, limit=1)
|
|
if rt is not None:
|
|
assoc = ProcedureTypeReagentRoleAssociation.query(proceduretype=proceduretype, reagentrole=rt)
|
|
if assoc is not None:
|
|
if assoc.last_used != self.lot:
|
|
assoc.last_used = self.lot
|
|
result = assoc.save()
|
|
report.add_result(result)
|
|
return report
|
|
report.add_result(Result(msg=f"Updating last used {rt} was not performed.", status="Information"))
|
|
return report
|
|
|
|
@classmethod
|
|
@setup_lookup
|
|
def query(cls,
|
|
id: int | None = None,
|
|
reagentrole: str | ReagentRole | None = None,
|
|
lot: str | None = None,
|
|
name: str | None = None,
|
|
limit: int = 0,
|
|
**kwargs
|
|
) -> Reagent | List[Reagent]:
|
|
"""
|
|
Lookup a list of reagents from the database.
|
|
|
|
Args:
|
|
id (int | None, optional): reagent id number
|
|
reagentrole (str | models.ReagentType | None, optional): Reagent type. Defaults to None.
|
|
lot (str | None, optional): Reagent lot number. Defaults to None.
|
|
name (str | None, optional): Reagent name. Defaults to None.
|
|
limit (int, optional): limit of results returned. Defaults to 0.
|
|
|
|
Returns:
|
|
models.Reagent | List[models.Reagent]: reagent or list of reagents matching filter.
|
|
"""
|
|
query: Query = cls.__database_session__.query(cls)
|
|
match id:
|
|
case int():
|
|
query = query.filter(cls.id == id)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
match reagentrole:
|
|
case str():
|
|
query = query.join(cls.reagentrole).filter(ReagentRole.name == reagentrole)
|
|
case ReagentRole():
|
|
query = query.filter(cls.reagentrole.contains(reagentrole))
|
|
case _:
|
|
pass
|
|
match name:
|
|
case str():
|
|
# NOTE: Not limited due to multiple reagents having same name.
|
|
query = query.filter(cls.name == name)
|
|
case _:
|
|
pass
|
|
match lot:
|
|
case str():
|
|
query = query.filter(cls.lot == lot)
|
|
# NOTE: In this case limit number returned.
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
return cls.execute_query(query=query, limit=limit, **kwargs)
|
|
|
|
def set_attribute(self, key, value):
|
|
match key:
|
|
case "lot":
|
|
value = value.upper()
|
|
case "reagentrole":
|
|
match value:
|
|
case ReagentRole():
|
|
role = value
|
|
case str():
|
|
role = ReagentRole.query(name=value, limit=1)
|
|
case _:
|
|
return
|
|
if role and role not in self.reagentrole:
|
|
self.reagentrole.append(role)
|
|
return
|
|
case "comment":
|
|
return
|
|
case _:
|
|
pass
|
|
try:
|
|
self.__setattr__(key, value)
|
|
except AttributeError as e:
|
|
logger.error(f"Could not set {key} due to {e}")
|
|
|
|
|
|
|
|
@classmethod
|
|
@declared_attr
|
|
def add_edit_tooltips(self):
|
|
return dict(
|
|
expiry="Use exact date on reagent.\nEOL will be calculated from kittype automatically"
|
|
)
|
|
|
|
def details_dict(self, reagentrole: str | None = None, **kwargs):
|
|
output = super().details_dict()
|
|
if reagentrole:
|
|
output['reagentrole'] = reagentrole
|
|
else:
|
|
output['reagentrole'] = self.reagentrole[0].name
|
|
return output
|
|
|
|
@property
|
|
def lot_dicts(self):
|
|
return [dict(name=self.name, lot=lot.lot, expiry=lot.expiry + self.eol_ext) for lot in self.reagentlot]
|
|
|
|
|
|
class ReagentLot(BaseClass):
|
|
|
|
pyd_model_name = "Reagent"
|
|
|
|
id = Column(INTEGER, primary_key=True) #: primary key
|
|
lot = Column(String(64), unique=True) #: lot number of reagent
|
|
expiry = Column(TIMESTAMP) #: expiry date - extended by eol_ext of parent programmatically
|
|
active = Column(INTEGER, default=1)
|
|
reagent_id = Column(INTEGER, ForeignKey("_reagent.id", ondelete='SET NULL',
|
|
name="fk_REGLOT_reagent_id")) #: id of parent reagent type
|
|
reagent = relationship("Reagent", back_populates="reagentlot") #: joined parent reagent type
|
|
|
|
reagentlotprocedureassociation = relationship(
|
|
"ProcedureReagentLotAssociation",
|
|
back_populates="reagentlot",
|
|
cascade="all, delete-orphan",
|
|
) #: Relation to ClientSubmissionSampleAssociation
|
|
|
|
procedures = association_proxy("reagentlotprocedureassociation", "procedure",
|
|
creator=lambda procedure: ProcedureReagentLotAssociation(
|
|
procedure=procedure)) #: Association proxy to ClientSubmissionSampleAssociation.sample
|
|
|
|
@hybrid_property
|
|
def name(self):
|
|
return self.lot
|
|
|
|
@classmethod
|
|
def query(cls,
|
|
lot: str | None = None,
|
|
name: str | None = None,
|
|
limit: int = 0,
|
|
**kwargs) -> ReagentLot | List[ReagentLot]:
|
|
"""
|
|
|
|
Args:
|
|
lot ( str | None, optional): Lot number of this reagent instance. Defaults to None.
|
|
name ( str | None, optional): Name of this reagent instance. Defaults to None.
|
|
limit ( int ): Limit of number of query results.
|
|
**kwargs ():
|
|
|
|
Returns:
|
|
ReagentLot | List[ReagentLot]
|
|
|
|
"""
|
|
query: Query = cls.__database_session__.query(cls)
|
|
match lot:
|
|
case str():
|
|
query = query.filter(cls.lot == lot)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
match name:
|
|
case str():
|
|
query = query.join(Reagent).filter(Reagent.name == name)
|
|
case _:
|
|
pass
|
|
return cls.execute_query(query=query, limit=limit)
|
|
|
|
def __repr__(self):
|
|
return f"<Lot({self.lot}-{self.expiry}>"
|
|
|
|
def set_attribute(self, key, value):
|
|
match key:
|
|
case "expiry":
|
|
if isinstance(value, str):
|
|
value = date(year=1970, month=1, day=1)
|
|
# NOTE: if min time is used, any reagent set to expire today (Bac postive control, eg) will have expired at midnight and therefore be flagged.
|
|
# NOTE: Make expiry at date given, plus maximum time = end of day
|
|
value = datetime.combine(value, datetime.max.time())
|
|
value = value.replace(tzinfo=timezone)
|
|
case _:
|
|
pass
|
|
setattr(self, key, value)
|
|
|
|
@check_authorization
|
|
def edit_from_search(self, obj, **kwargs):
|
|
from frontend.widgets.omni_add_edit import AddEdit
|
|
from backend.validators.pydant import PydElastic
|
|
dlg = AddEdit(parent=None, instance=self, disabled=['reagent'])
|
|
if dlg.exec():
|
|
pyd = dlg.parse_form()
|
|
logger.debug(f"Pydantic returned: {type(pyd)} {pyd.model_fields}")
|
|
fields = pyd.model_fields
|
|
if isinstance(pyd, PydElastic):
|
|
fields.update(pyd.model_extra)
|
|
for field in fields:
|
|
if field in ['instance']:
|
|
continue
|
|
field_value = pyd.__getattribute__(field)
|
|
logger.debug(f"Setting {field} in Reagent Lot to {field_value}")
|
|
self.set_attribute(field, field_value)
|
|
self.save()
|
|
|
|
def details_dict(self, **kwargs) -> dict:
|
|
output = super().details_dict(**kwargs)
|
|
output['excluded'] += ["reagentlotprocedureassociation", "procedures"]
|
|
output['reagent'] = output['reagent'].name
|
|
return output
|
|
|
|
class Discount(BaseClass):
    """
    Discount granted to a client lab for a given procedure type.
    """

    skip_on_edit = True

    id = Column(INTEGER, primary_key=True)  #: primary key
    proceduretype = relationship("ProcedureType")  #: joined parent proceduretype
    proceduretype_id = Column(
        INTEGER,
        ForeignKey("_proceduretype.id", ondelete='SET NULL', name="fk_DIS_procedure_type_id"),
    )  #: id of joined proceduretype
    clientlab = relationship("ClientLab")  #: joined client lab
    clientlab_id = Column(
        INTEGER,
        ForeignKey("_clientlab.id", ondelete='SET NULL', name="fk_DIS_org_id"),
    )  #: id of joined client
    name = Column(String(128))  #: Short description
    amount = Column(FLOAT(2))  #: Dollar amount of discount

    def __repr__(self) -> str:
        """
        Returns:
            str: Representation of this object
        """
        return f"<Discount({self.name})>"

    @classmethod
    @setup_lookup
    def query(cls,
              clientlab: ClientLab | str | int | None = None,
              proceduretype: ProcedureType | str | int | None = None,
              ) -> Discount | List[Discount]:
        """
        Lookup discount objects, optionally narrowed by client lab and/or procedure type.

        Args:
            clientlab (ClientLab | str | int): ClientLab receiving discount (object, name or id).
            proceduretype (ProcedureType | str | int): ProcedureType discount is received on (object, name or id).

        Returns:
            Discount|List[Discount]: Discount(s) of interest.
        """
        lookup: Query = cls.__database_session__.query(cls)
        # NOTE: Objects filter on the relationship; names and ids filter through a join.
        if isinstance(clientlab, ClientLab):
            lookup = lookup.filter(cls.clientlab == clientlab)
        elif isinstance(clientlab, str):
            lookup = lookup.join(ClientLab).filter(ClientLab.name == clientlab)
        elif isinstance(clientlab, int):
            lookup = lookup.join(ClientLab).filter(ClientLab.id == clientlab)
        if isinstance(proceduretype, ProcedureType):
            lookup = lookup.filter(cls.proceduretype == proceduretype)
        elif isinstance(proceduretype, str):
            lookup = lookup.join(ProcedureType).filter(ProcedureType.name == proceduretype)
        elif isinstance(proceduretype, int):
            lookup = lookup.join(ProcedureType).filter(ProcedureType.id == proceduretype)
        return cls.execute_query(query=lookup)

    @check_authorization
    def save(self):
        """
        Save this object via the parent class (wrapped by check_authorization).
        """
        super().save()
|
|
|
|
|
|
class SubmissionType(BaseClass):
|
|
"""
|
|
Abstract of types of procedure.
|
|
"""
|
|
|
|
id = Column(INTEGER, primary_key=True) #: primary key
|
|
name = Column(String(128), unique=True) #: name of procedure type
|
|
defaults = Column(JSON) #: Basic information about this procedure type
|
|
clientsubmission = relationship("ClientSubmission",
|
|
back_populates="submissiontype") #: Instances of this submission type
|
|
proceduretype = relationship("ProcedureType", back_populates="submissiontype",
|
|
secondary=submissiontype_proceduretype) #: Procedures associated with this submission type
|
|
|
|
def __repr__(self) -> str:
|
|
"""
|
|
Returns:
|
|
str: Representation of this object.
|
|
"""
|
|
return f"<SubmissionType({self.name})>"
|
|
|
|
@classmethod
|
|
@declared_attr
|
|
def aliases(cls) -> List[str]:
|
|
"""
|
|
Gets other names the sql object of this class might go by.
|
|
|
|
Returns:
|
|
List[str]: List of names
|
|
"""
|
|
return super().aliases + ["submissiontypes"]
|
|
|
|
# def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]:
|
|
# """
|
|
# Make a map of all locations for tips or equipment.
|
|
#
|
|
# Args:
|
|
# field (Literal['equipment', 'tip']): the field to construct a map for
|
|
#
|
|
# Returns:
|
|
# Generator[(str, dict), None, None]: Generator composing key, locations for each item in the map
|
|
# """
|
|
# for item in self.__getattribute__(f"submissiontype_{field}role_associations"):
|
|
# fmap = item.uses
|
|
# if fmap is None:
|
|
# fmap = {}
|
|
# yield getattr(item, f"{field}_role").name, fmap
|
|
|
|
@classmethod
|
|
def query_or_create(cls, **kwargs) -> Tuple[SubmissionType, bool]:
|
|
new = False
|
|
disallowed = ['expiry']
|
|
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
|
|
instance = cls.query(**sanitized_kwargs)
|
|
if not instance or isinstance(instance, list):
|
|
instance = cls()
|
|
new = True
|
|
for k, v in sanitized_kwargs.items():
|
|
setattr(instance, k, v)
|
|
logger.info(f"Instance from proceduretype query or create: {instance}")
|
|
return instance, new
|
|
|
|
@classmethod
|
|
@setup_lookup
|
|
def query(cls,
|
|
name: str | None = None,
|
|
key: str | None = None,
|
|
limit: int = 0,
|
|
**kwargs
|
|
) -> SubmissionType | List[SubmissionType]:
|
|
"""
|
|
Lookup procedure type in the database by a number of parameters
|
|
|
|
Args:
|
|
name (str | None, optional): Name of procedure type. Defaults to None.
|
|
key (str | None, optional): A key present in the info-map to lookup. Defaults to None.
|
|
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
|
|
|
|
Returns:
|
|
models.SubmissionType|List[models.SubmissionType]: SubmissionType(s) of interest.
|
|
"""
|
|
query: Query = cls.__database_session__.query(cls)
|
|
match name:
|
|
case str():
|
|
logger.debug(f"querying with {name}")
|
|
query = query.filter(cls.name == name)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
match key:
|
|
case str():
|
|
query = query.filter(cls.info_map.op('->')(key) is not None)
|
|
case _:
|
|
pass
|
|
return cls.execute_query(query=query, limit=limit)
|
|
|
|
@check_authorization
|
|
def save(self):
|
|
"""
|
|
Adds this control to the database and commits.
|
|
"""
|
|
super().save()
|
|
|
|
def to_omni(self, expand: bool = False):
|
|
from backend.validators.omni_gui_objects import OmniSubmissionType
|
|
try:
|
|
template_file = self.template_file
|
|
except AttributeError:
|
|
template_file = bytes()
|
|
return OmniSubmissionType(
|
|
instance_object=self,
|
|
name=self.name,
|
|
info_map=self.info_map,
|
|
defaults=self.defaults,
|
|
template_file=template_file,
|
|
sample_map=self.sample_map
|
|
)
|
|
|
|
@classmethod
|
|
@declared_attr
|
|
def info_map_json_edit_fields(cls):
|
|
dicto = dict()
|
|
return dicto
|
|
|
|
@classmethod
|
|
@declared_attr
|
|
def regex(cls) -> re.Pattern:
|
|
"""
|
|
Constructs catchall regex.
|
|
|
|
Returns:
|
|
re.Pattern: Regular expression pattern to discriminate between procedure types.
|
|
"""
|
|
res = [st.defaults['regex'] for st in cls.query() if st.defaults]
|
|
rstring = rf'{"|".join(res)}'
|
|
regex = re.compile(rstring, flags=re.IGNORECASE | re.VERBOSE)
|
|
return regex
|
|
|
|
@classmethod
|
|
def get_regex(cls, submission_type: SubmissionType | str | None = None) -> re.Pattern:
|
|
"""
|
|
Gets the regex string for identifying a certain class of procedure.
|
|
|
|
Args:
|
|
submission_type (SubmissionType | str | None, optional): procedure type of interest. Defaults to None.
|
|
|
|
Returns:
|
|
str: String from which regex will be compiled.
|
|
"""
|
|
if not isinstance(submission_type, SubmissionType):
|
|
submission_type = cls.query(name=submission_type['name'])
|
|
if isinstance(submission_type, list):
|
|
if len(submission_type) > 1:
|
|
regex = "|".join([item.defaults['regex'] for item in submission_type])
|
|
else:
|
|
regex = submission_type[0].defaults['regex']
|
|
else:
|
|
try:
|
|
regex = submission_type.defaults['regex']
|
|
except AttributeError as e:
|
|
logger.error(f"Couldn't get submission type for {submission_type.name}")
|
|
regex = None
|
|
try:
|
|
regex = re.compile(rf"{regex}", flags=re.IGNORECASE | re.VERBOSE)
|
|
except re.error as e:
|
|
regex = None
|
|
return regex
|
|
|
|
|
|
class ProcedureType(BaseClass):
|
|
id = Column(INTEGER, primary_key=True)
|
|
name = Column(String(64))
|
|
plate_columns = Column(INTEGER, default=0)
|
|
plate_rows = Column(INTEGER, default=0)
|
|
allowed_result_methods = Column(JSON)
|
|
plate_cost = Column(FLOAT(2))
|
|
|
|
procedure = relationship("Procedure",
|
|
back_populates="proceduretype") #: Concrete control of this type.
|
|
|
|
submissiontype = relationship("SubmissionType", back_populates="proceduretype",
|
|
secondary=submissiontype_proceduretype) #: run this kittype was used for
|
|
|
|
proceduretypeequipmentroleassociation = relationship(
|
|
"ProcedureTypeEquipmentRoleAssociation",
|
|
back_populates="proceduretype",
|
|
cascade="all, delete-orphan"
|
|
) #: Association of equipmentroles
|
|
|
|
equipmentrole = association_proxy("proceduretypeequipmentroleassociation", "equipmentrole",
|
|
creator=lambda eq: ProcedureTypeEquipmentRoleAssociation(
|
|
equipmentrole=eq)) #: Proxy of equipmentrole associations
|
|
|
|
proceduretypereagentroleassociation = relationship(
|
|
"ProcedureTypeReagentRoleAssociation",
|
|
back_populates="proceduretype",
|
|
cascade="all, delete-orphan"
|
|
) #: triple association of KitTypes, ReagentTypes, SubmissionTypes
|
|
|
|
reagentrole = association_proxy("proceduretypereagentroleassociation", "reagentrole",
|
|
creator=lambda reagentrole: ProcedureTypeReagentRoleAssociation(
|
|
reagentrole=reagentrole)) #: Proxy of equipmentrole associations
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.allowed_result_methods = dict()
|
|
|
|
def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]:
|
|
"""
|
|
Make a map of all locations for tips or equipment.
|
|
|
|
Args:
|
|
field (Literal['equipment', 'tip']): the field to construct a map for
|
|
|
|
Returns:
|
|
Generator[(str, dict), None, None]: Generator composing key, locations for each item in the map
|
|
"""
|
|
for item in self.__getattribute__(f"proceduretype{field}role_associations"):
|
|
fmap = item.uses
|
|
if fmap is None:
|
|
fmap = {}
|
|
yield getattr(item, f"{field}_role").name, fmap
|
|
|
|
def get_equipment(self) -> Generator['PydEquipmentRole', None, None]:
|
|
"""
|
|
Returns PydEquipmentRole of all equipment associated with this SubmissionType
|
|
|
|
Returns:
|
|
Generator['PydEquipmentRole', None, None]: List of equipment roles
|
|
"""
|
|
return (item.to_pydantic(proceduretype=self) for item in self.equipment)
|
|
|
|
def get_processes_for_role(self, equipmentrole: str | EquipmentRole) -> list:
|
|
"""
|
|
Get process associated with this SubmissionType for an EquipmentRole
|
|
|
|
Args:
|
|
equipmentrole (str | EquipmentRole): EquipmentRole of interest
|
|
kittype (str | KitType | None, optional): Kit of interest. Defaults to None.
|
|
|
|
Raises:
|
|
TypeError: Raised if wrong type given for equipmentrole
|
|
|
|
Returns:
|
|
list: list of associated process
|
|
"""
|
|
match equipmentrole:
|
|
case str():
|
|
relevant = [item.get_all_processes() for item in self.proceduretypeequipmentroleassociation if
|
|
item.equipmentrole.name == equipmentrole]
|
|
case EquipmentRole():
|
|
relevant = [item.get_all_processes() for item in self.proceduretypeequipmentroleassociation if
|
|
item.equipmentrole == equipmentrole]
|
|
case _:
|
|
raise TypeError(f"Type {type(equipmentrole)} is not allowed")
|
|
return list(set([item for items in relevant for item in items if item is not None]))
|
|
|
|
def details_dict(self, **kwargs):
|
|
output = super().details_dict(**kwargs)
|
|
output['reagentrole'] = [item.details_dict() for item in output['reagentrole']]
|
|
output['equipment'] = [item.details_dict(proceduretype=self) for item in output['equipmentrole']]
|
|
return output
|
|
|
|
def construct_dummy_procedure(self, run: Run | None = None):
|
|
from backend.validators.pydant import PydProcedure
|
|
if run:
|
|
samples = run.constuct_sample_dicts_for_proceduretype(proceduretype=self)
|
|
else:
|
|
samples = []
|
|
output = dict(
|
|
proceduretype=self,
|
|
repeat=False,
|
|
run=run,
|
|
sample=samples
|
|
)
|
|
return PydProcedure(**output)
|
|
|
|
def construct_plate_map(self, sample_dicts: List["PydSample"], creation:bool=True, vw_modifier:float=1.0) -> str:
|
|
"""
|
|
Constructs an html based plate map for procedure details.
|
|
|
|
Args:
|
|
sample_list (list): List of procedure sample
|
|
plate_rows (int, optional): Number of rows in the plate. Defaults to 8.
|
|
plate_columns (int, optional): Number of columns in the plate. Defaults to 12.
|
|
|
|
Returns:
|
|
str: html output string.
|
|
"""
|
|
if self.plate_rows == 0 or self.plate_columns == 0:
|
|
return "<br/>"
|
|
sample_dicts = self.pad_sample_dicts(sample_dicts=sample_dicts)
|
|
vw = round((-0.07 * len(sample_dicts)) + (12.2 * vw_modifier), 1)
|
|
# NOTE: An overly complicated list comprehension create a list of sample locations
|
|
# NOTE: next will return a blank cell if no value found for row/column
|
|
env = jinja_template_loading()
|
|
template = env.get_template("support/plate_map.html")
|
|
html = template.render(plate_rows=self.plate_rows, plate_columns=self.plate_columns, samples=sample_dicts,
|
|
vw=vw, creation=creation)
|
|
return html + "<br/>"
|
|
|
|
def pad_sample_dicts(self, sample_dicts: List["PydSample"]):
|
|
from backend.validators.pydant import PydSample
|
|
output = []
|
|
for row, column in self.ranked_plate.values():
|
|
sample = next((sample for sample in sample_dicts if sample.row == row and sample.column == column),
|
|
PydSample(**dict(sample_id="", row=row, column=column, enabled=False)))
|
|
sample.background_color = "#6ffe1d" if sample.enabled else "#ffffff"
|
|
output.append(sample)
|
|
return output
|
|
|
|
@property
|
|
def ranked_plate(self):
|
|
matrix = np.array([[0 for yyy in range(1, self.plate_rows + 1)] for xxx in range(1, self.plate_columns + 1)])
|
|
return {iii: (item[0][1] + 1, item[0][0] + 1) for iii, item in enumerate(np.ndenumerate(matrix), start=1)}
|
|
|
|
@property
|
|
def total_wells(self):
|
|
return self.plate_rows * self.plate_columns
|
|
|
|
|
|
class Procedure(BaseClass):
|
|
id = Column(INTEGER, primary_key=True) #: Primary key
|
|
name = Column(String, unique=True) #: Name of the procedure (RSL number)
|
|
repeat_of_id = Column(INTEGER, ForeignKey("_procedure.id", name="fk_repeat_id"))
|
|
repeat_of = relationship("Procedure", remote_side=[id])
|
|
started_date = Column(TIMESTAMP)
|
|
completed_date = Column(TIMESTAMP)
|
|
technician = Column(String(64)) #: name of processing tech(s)
|
|
results = relationship("Results", back_populates="procedure", uselist=True)
|
|
proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id", ondelete="SET NULL",
|
|
name="fk_PRO_proceduretype_id")) #: client lab id from _organizations))
|
|
proceduretype = relationship("ProcedureType", back_populates="procedure")
|
|
run_id = Column(INTEGER, ForeignKey("_run.id", ondelete="SET NULL",
|
|
name="fk_PRO_basicrun_id")) #: client lab id from _organizations))
|
|
run = relationship("Run", back_populates="procedure")
|
|
control = relationship("Control", back_populates="procedure", uselist=True) #: A control sample added to procedure
|
|
|
|
proceduresampleassociation = relationship(
|
|
"ProcedureSampleAssociation",
|
|
back_populates="procedure",
|
|
cascade="all, delete-orphan",
|
|
)
|
|
|
|
sample = association_proxy("proceduresampleassociation",
|
|
"sample", creator=lambda sample: ProcedureSampleAssociation(sample=sample)
|
|
)
|
|
|
|
procedurereagentlotassociation = relationship(
|
|
"ProcedureReagentLotAssociation",
|
|
back_populates="procedure",
|
|
cascade="all, delete-orphan",
|
|
) #: Relation to ProcedureReagentAssociation
|
|
|
|
reagentlot = association_proxy("procedurereagentlotassociation",
|
|
"reagentlot", creator=lambda reg: ProcedureReagentLotAssociation(
|
|
reagent=reg)) #: Association proxy to RunReagentAssociation.reagent
|
|
|
|
procedureequipmentassociation = relationship(
|
|
"ProcedureEquipmentAssociation",
|
|
back_populates="procedure",
|
|
cascade="all, delete-orphan"
|
|
) #: Relation to Equipment
|
|
|
|
equipment = association_proxy("procedureequipmentassociation",
|
|
"equipment") #: Association proxy to RunEquipmentAssociation.equipment
|
|
|
|
@hybrid_property
|
|
def repeat(self) -> bool:
|
|
return self.repeat_of is not None
|
|
|
|
@classmethod
@setup_lookup
def query(cls, id: int | None = None, name: str | None = None,
          start_date: date | datetime | str | int | None = None,
          end_date: date | datetime | str | int | None = None, limit: int = 0,
          **kwargs) -> Procedure | List[Procedure]:
    """
    Lookup procedures by id, name and/or a started_date range.

    Args:
        id (int | None, optional): Procedure id; forces limit to 1. Defaults to None.
        name (str | None, optional): Procedure name; forces limit to 1. Defaults to None.
        start_date (date | datetime | str | int | None, optional): Start of the date range. Defaults to None.
        end_date (date | datetime | str | int | None, optional): End of the date range. Defaults to None.
        limit (int, optional): Maximum number of results passed to execute_query. Defaults to 0.

    Returns:
        Procedure | List[Procedure]: Whatever execute_query returns for the built query.
    """
    session = cls.__database_session__
    query: Query = session.query(cls)
    start_given = start_date is not None
    end_given = end_date is not None
    if start_given and not end_given:
        logger.warning(f"Start date with no end date, using today.")
        end_date = date.today()
    elif end_given and not start_given:
        # NOTE: this query returns a tuple of (object, datetime), need to get only datetime.
        # NOTE(review): `submitted_date` is not declared in the visible part of this class,
        # while the range filter below uses `started_date` - confirm the column exists.
        earliest = session.query(cls, func.min(cls.submitted_date)).first()
        start_date = earliest[1]
        logger.warning(f"End date with no start date, using first procedure date: {start_date}")
    if start_date is not None:
        start_date = cls.rectify_query_date(start_date)
        end_date = cls.rectify_query_date(end_date, eod=True)
        query = query.filter(cls.started_date.between(start_date, end_date))
    if isinstance(id, int):
        query = query.filter(cls.id == id)
        limit = 1
    if isinstance(name, str):
        query = query.filter(cls.name == name)
        limit = 1
    return cls.execute_query(query=query, limit=limit)
|
|
|
|
@property
def custom_context_events(self) -> dict:
    """
    Creates dictionary of str:function to be passed to context menu

    Each menu label is lower-cased and its spaces replaced with underscores to get the name
    of the attribute looked up on this object.

    Returns:
        dict: dictionary of functions keyed by menu label
    """
    labels = ("Add Results", "Add Equipment", "Edit", "Add Comment", "Show Details", "Delete")
    events = {}
    for label in labels:
        attribute_name = label.lower().replace(" ", "_")
        events[label] = self.__getattribute__(attribute_name)
    return events
|
|
|
|
def add_results(self, obj, resultstype_name: str):
    """
    Runs the results manager named `<resultstype_name>Manager` and saves what it produces.

    Args:
        obj (_type_): parent widget handed to the manager
        resultstype_name (str): prefix of the manager class looked up in backend.managers.results
    """
    logger.info(f"Add Results! {resultstype_name}")
    from backend.managers import results
    manager_class = getattr(results, f"{resultstype_name}Manager")
    manager = manager_class(procedure=self, parent=obj)
    pyd_procedure = manager.procedure_to_pydantic()
    pyd_samples = manager.samples_to_pydantic()
    # NOTE: the procedure is saved before any of the samples.
    pyd_procedure.to_sql().save()
    for pyd_sample in pyd_samples:
        pyd_sample.to_sql().save()
|
|
|
|
def add_equipment(self, obj):
    """
    Opens the equipment usage dialog for this procedure and saves it if the dialog is accepted.

    Args:
        obj (_type_): parent widget
    """
    logger.info(f"Add equipment")
    from frontend.widgets.equipment_usage import EquipmentUsage
    pyd_procedure = self.to_pydantic()
    dialog = EquipmentUsage(parent=obj, procedure=pyd_procedure)
    if not dialog.exec():
        return
    dialog.save_procedure()
|
|
|
|
def edit(self, obj):
    """
    Opens the procedure creation dialog in edit mode and saves the result if the dialog is accepted.

    Args:
        obj (_type_): parent widget
    """
    from frontend.widgets.procedure_creation import ProcedureCreation
    logger.debug("Edit!")
    pyd_procedure = self.to_pydantic()
    dialog = ProcedureCreation(parent=obj, procedure=pyd_procedure, edit=True)
    if not dialog.exec():
        return
    # NOTE: return_sql gives back a pair; only the first element is saved.
    edited_sql, _ = dialog.return_sql()
    edited_sql.save()
|
|
|
|
def add_comment(self, obj):
    """
    Context menu handler for "Add Comment". Currently only writes a debug log entry.

    Args:
        obj (_type_): parent widget (unused)
    """
    message = "Add Comment!"
    logger.debug(message)
|
|
|
|
def delete(self, obj):
    """
    Context menu handler for "Delete". Currently only writes a debug log entry; nothing is removed.

    Args:
        obj (_type_): parent widget (unused)
    """
    message = "Delete!"
    logger.debug(message)
|
|
|
|
def details_dict(self, **kwargs):
    """
    Builds the details dictionary for this procedure.

    Samples are split into "active" (associated with this procedure and present in the run)
    and "inactive" (present in the run only); both groups end up in output['sample'].

    Args:
        **kwargs: accepted for signature compatibility; not used here.

    Returns:
        dict: details of this procedure
    """
    output = super().details_dict()
    output['proceduretype'] = output['proceduretype'].details_dict()['name']
    output['results'] = [result.details_dict() for result in output['results']]
    run_samples = list(self.run.sample)
    # NOTE: membership sets are built once instead of being rebuilt for every element.
    run_sample_ids = {sample.sample_id for sample in run_samples}
    active_samples = [assoc.details_dict() for assoc in output['proceduresampleassociation']
                      if assoc.sample.sample_id in run_sample_ids]
    for sample in active_samples:
        sample['active'] = True
    active_sample_ids = {sample['sample_id'] for sample in active_samples}
    # NOTE(review): run samples are matched on `name` against the active `sample_id` values - confirm intended.
    inactive_samples = [sample.details_dict() for sample in run_samples
                        if sample.name not in active_sample_ids]
    for sample in inactive_samples:
        sample['active'] = False
    output['sample'] = active_samples + inactive_samples
    output['reagent'] = [reagent.details_dict() for reagent in output['procedurereagentlotassociation']]
    output['equipment'] = [equipment.details_dict() for equipment in output['procedureequipmentassociation']]
    logger.debug(f"equipment: {pformat(output['equipment'])}")
    output['repeat'] = self.repeat
    output['run'] = self.run.name
    output['excluded'] += self.get_default_info("details_ignore")
    output['sample_count'] = len(active_samples)
    output['clientlab'] = self.run.clientsubmission.clientlab.name
    output['cost'] = 0.00
    output['platemap'] = self.make_procedure_platemap()
    return output
|
|
|
|
def to_pydantic(self, **kwargs):
    """
    Converts this procedure to its pydantic model and fills in samples, reagents and results.

    Args:
        **kwargs: accepted for signature compatibility; not used here.

    Returns:
        The pydantic object returned by the parent to_pydantic, with sample, reagent, result
        and sample_results replaced.
    """
    from backend.validators.pydant import PydReagent
    output = super().to_pydantic()
    output.sample = [assoc.to_pydantic() for assoc in output.proceduresampleassociation]
    pyd_reagents = []
    for entry in output.reagent:
        if isinstance(entry, dict):
            pyd_reagents.append(PydReagent(**entry))
        elif isinstance(entry, PydReagent):
            pyd_reagents.append(entry)
        # NOTE: anything that is neither a dict nor a PydReagent is dropped.
    output.reagent = pyd_reagents
    output.result = [result.to_pydantic() for result in self.results]
    per_sample_results = [[result.to_pydantic() for result in assoc.results]
                          for assoc in self.proceduresampleassociation]
    output.sample_results = flatten_list(per_sample_results)
    return output
|
|
|
|
def create_proceduresampleassociations(self, sample):
    """
    Creates an association between this procedure and a sample.

    Args:
        sample: sample to associate with this procedure

    Returns:
        ProcedureSampleAssociation: the new association object
    """
    # NOTE: imported here because the name is only available under TYPE_CHECKING at module level.
    from backend.db.models import ProcedureSampleAssociation
    association = ProcedureSampleAssociation(procedure=self, sample=sample)
    return association
|
|
|
|
@classmethod
def get_default_info(cls, *args) -> dict | list | str:
    """
    Gets default info for this class, extending the parent's defaults with ignore/recover lists.

    Args:
        *args: keys of interest. No keys returns everything, one key returns that value,
            several keys return a dictionary restricted to those keys.

    Returns:
        dict | list | str: the requested default info
    """
    defaults = super().get_default_info()
    recover = ['filepath', 'sample', 'csv', 'comment', 'equipment']
    details_ignore = ['excluded', 'reagents', 'sample', 'extraction_info', 'comment', 'barcode',
                      'platemap', 'export_map', 'equipment', 'tips', 'custom', 'reagentlot', 'reagent_lot',
                      "results", "proceduresampleassociation", "sample",
                      "procedurereagentlotassociation",
                      "procedureequipmentassociation", "proceduretipsassociation", "reagent", "equipment",
                      "tips", "control"]
    # NOTE: Fields not placed in ui form
    form_ignore = ['reagents', 'ctx', 'id', 'cost', 'extraction_info', 'signed_by', 'comment', 'namer',
                   'submission_object', "tips", 'contact_phone', 'custom', 'cost_centre', 'completed_date',
                   'control', "origin_plate"] + recover
    defaults.update(
        details_ignore=details_ignore,
        form_ignore=form_ignore,
        # NOTE: Fields not placed in ui form to be moved to pydantic
        form_recover=recover,
    )
    if not args:
        return {key: value for key, value in defaults.items()}
    if len(args) == 1:
        return defaults[args[0]]
    return {key: value for key, value in defaults.items() if key in args}
|
|
|
|
def make_procedure_platemap(self):
    """
    Builds the plate map for this procedure from its sample associations.

    Returns:
        Whatever the proceduretype's construct_plate_map returns for the converted samples.
    """
    pyd_samples = []
    for assoc in self.proceduresampleassociation:
        pyd_samples.append(assoc.to_pydantic())
    return self.proceduretype.construct_plate_map(sample_dicts=pyd_samples, creation=False, vw_modifier=1.15)
|
|
|
|
|
|
|
|
class ProcedureTypeReagentRoleAssociation(BaseClass):
|
|
"""
|
|
table containing reagenttype/kittype associations
|
|
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
|
|
"""
|
|
|
|
reagentrole_id = Column(INTEGER, ForeignKey("_reagentrole.id"),
|
|
primary_key=True) #: id of associated reagentrole
|
|
proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"),
|
|
primary_key=True) #: id of associated proceduretype
|
|
uses = Column(JSON) #: map to location on excel sheets of different procedure types
|
|
required = Column(INTEGER) #: whether the reagent type is required for the kittype (Boolean 1 or 0)
|
|
last_used = Column(String(32)) #: last used lot number of this type of reagent
|
|
ml_used_per_sample = Column(FLOAT(2)) #: amount of reagent used in the procedure
|
|
|
|
# NOTE: reference to the "ReagentType" object
|
|
reagentrole = relationship(ReagentRole,
|
|
back_populates="reagentroleproceduretypeassociation") #: relationship to associated ReagentType
|
|
|
|
# NOTE: reference to the "SubmissionType" object
|
|
proceduretype = relationship(ProcedureType,
|
|
back_populates="proceduretypereagentroleassociation") #: relationship to associated SubmissionType
|
|
|
|
def __init__(self, proceduretype=None, reagentrole=None, uses=None, required=1):
|
|
self.proceduretype = proceduretype
|
|
self.reagentrole = reagentrole
|
|
self.uses = uses
|
|
self.required = required
|
|
|
|
def __repr__(self) -> str:
|
|
return f"<ProcedureTypeReagentRoleAssociation({self.proceduretype} & {self.reagentrole})>"
|
|
|
|
@property
|
|
def name(self):
|
|
try:
|
|
return f"{self.proceduretype.name} -> {self.reagentrole.name}"
|
|
except AttributeError:
|
|
return "Blank ProcedureTypeReagentRole"
|
|
|
|
@validates('required')
|
|
def validate_required(self, key, value):
|
|
"""
|
|
Ensures only 1 & 0 used in 'required'
|
|
|
|
Args:
|
|
key (str): name of attribute
|
|
value (_type_): value of attribute
|
|
|
|
Raises:
|
|
ValueError: Raised if bad value given
|
|
|
|
Returns:
|
|
_type_: value
|
|
"""
|
|
if isinstance(value, bool):
|
|
value = int(value)
|
|
if not 0 <= value < 2:
|
|
raise ValueError(f'Invalid required value {value}. Must be 0 or 1.')
|
|
return value
|
|
|
|
@validates('reagentrole')
|
|
def validate_reagentrole(self, key, value):
|
|
"""
|
|
Ensures reagenttype is an actual ReagentType
|
|
|
|
Args:
|
|
key (str)): name of attribute
|
|
value (_type_): value of attribute
|
|
|
|
Raises:
|
|
ValueError: raised if reagenttype is not a ReagentType
|
|
|
|
Returns:
|
|
_type_: ReagentType
|
|
"""
|
|
if not isinstance(value, ReagentRole):
|
|
raise ValueError(f'{value} is not a reagentrole')
|
|
return value
|
|
|
|
@classmethod
|
|
def query_or_create(cls, **kwargs) -> Tuple[ProcedureTypeReagentRoleAssociation, bool]:
|
|
new = False
|
|
disallowed = ['expiry']
|
|
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
|
|
instance = cls.query(**sanitized_kwargs)
|
|
if not instance or isinstance(instance, list):
|
|
instance = cls()
|
|
new = True
|
|
for k, v in sanitized_kwargs.items():
|
|
logger.debug(f"Key: {k} has value: {v}")
|
|
match k:
|
|
case "proceduretype":
|
|
if isinstance(v, str):
|
|
v = SubmissionType.query(name=v)
|
|
else:
|
|
v = v.instance_object
|
|
case "reagentrole":
|
|
if isinstance(v, str):
|
|
v = ReagentRole.query(name=v)
|
|
else:
|
|
v = v.instance_object
|
|
case _:
|
|
pass
|
|
setattr(instance, k, v)
|
|
# logger.info(f"Instance from query or create: {instance.__dict__}\nis new: {new}")
|
|
return instance, new
|
|
|
|
@classmethod
|
|
@setup_lookup
|
|
def query(cls,
|
|
reagentrole: ReagentRole | str | None = None,
|
|
proceduretype: ProcedureType | str | None = None,
|
|
limit: int = 0,
|
|
**kwargs
|
|
) -> ProcedureTypeReagentRoleAssociation | List[ProcedureTypeReagentRoleAssociation]:
|
|
"""
|
|
Lookup junction of ReagentType and KitType
|
|
|
|
Args:
|
|
proceduretype (models.ProcedureType | str | None, optional): KitType of interest. Defaults to None.
|
|
reagentrole (models.ReagentRole | str | None, optional): ReagentRole of interest. Defaults to None.
|
|
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
|
|
|
|
Returns:
|
|
models.KitTypeReagentTypeAssociation|List[models.KitTypeReagentTypeAssociation]: Junction of interest.
|
|
"""
|
|
query: Query = cls.__database_session__.query(cls)
|
|
match reagentrole:
|
|
case ReagentRole():
|
|
query = query.filter(cls.reagent_role == reagentrole)
|
|
case str():
|
|
query = query.join(ReagentRole).filter(ReagentRole.name == reagentrole)
|
|
case _:
|
|
pass
|
|
match proceduretype:
|
|
case ProcedureType():
|
|
query = query.filter(cls.proceduretype == proceduretype)
|
|
case str():
|
|
query = query.join(ProcedureType).filter(ProcedureType.name == proceduretype)
|
|
case _:
|
|
pass
|
|
pass
|
|
return cls.execute_query(query=query, limit=limit)
|
|
|
|
def get_all_relevant_reagents(self) -> Generator[Reagent, None, None]:
|
|
"""
|
|
Creates a generator that will resolve in to a list filling the reagentrole associated with this object.
|
|
|
|
Returns:
|
|
Generator: Generates of reagents.
|
|
"""
|
|
reagents = self.reagentrole.control
|
|
try:
|
|
regex = self.uses['exclude_regex']
|
|
except KeyError:
|
|
regex = "^$"
|
|
relevant_reagents = [reagent for reagent in reagents if
|
|
not check_regex_match(pattern=regex, check=str(reagent.lot))]
|
|
for rel_reagent in relevant_reagents:
|
|
yield rel_reagent
|
|
|
|
@property
|
|
def omnigui_instance_dict(self) -> dict:
|
|
dicto = super().omnigui_instance_dict
|
|
dicto['required']['instance_attr'] = bool(dicto['required']['instance_attr'])
|
|
return dicto
|
|
|
|
@classmethod
|
|
@declared_attr
|
|
def json_edit_fields(cls) -> dict:
|
|
dicto = dict(
|
|
sheet="str",
|
|
expiry=dict(column="int", row="int"),
|
|
lot=dict(column="int", row="int"),
|
|
name=dict(column="int", row="int")
|
|
)
|
|
return dicto
|
|
|
|
def to_omni(self, expand: bool = False) -> "OmniReagentRole":
|
|
from backend.validators.omni_gui_objects import OmniKitTypeReagentRoleAssociation
|
|
try:
|
|
eol_ext = self.reagentrole.eol_ext
|
|
except AttributeError:
|
|
eol_ext = timedelta(days=0)
|
|
if expand:
|
|
try:
|
|
submission_type = self.proceduretype.to_omni()
|
|
except AttributeError:
|
|
submission_type = ""
|
|
try:
|
|
kit_type = self.kittype.to_omni()
|
|
except AttributeError:
|
|
kit_type = ""
|
|
try:
|
|
reagent_role = self.reagentrole.to_omni()
|
|
except AttributeError:
|
|
reagent_role = ""
|
|
else:
|
|
submission_type = self.proceduretype.name
|
|
kit_type = self.kittype.name
|
|
reagent_role = self.reagentrole.name
|
|
return OmniKitTypeReagentRoleAssociation(
|
|
instance_object=self,
|
|
reagent_role=reagent_role,
|
|
eol_ext=eol_ext,
|
|
required=self.required,
|
|
submission_type=submission_type,
|
|
kit_type=kit_type,
|
|
uses=self.uses
|
|
)
|
|
|
|
|
|
class ProcedureReagentLotAssociation(BaseClass):
|
|
"""
|
|
table containing procedure/reagent associations
|
|
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
|
|
"""
|
|
|
|
skip_on_edit = True
|
|
|
|
reagentlot_id = Column(INTEGER, ForeignKey("_reagentlot.id"), primary_key=True) #: id of associated reagent
|
|
procedure_id = Column(INTEGER, ForeignKey("_procedure.id"), primary_key=True) #: id of associated procedure
|
|
reagentrole = Column(String(64)) #: Name of associated reagentrole (for some reason can't be relationship).
|
|
comments = Column(String(1024)) #: Comments about reagents
|
|
|
|
procedure = relationship("Procedure",
|
|
back_populates="procedurereagentlotassociation") #: associated procedure
|
|
|
|
reagentlot = relationship(ReagentLot, back_populates="reagentlotprocedureassociation") #: associated reagent
|
|
|
|
def __repr__(self) -> str:
|
|
"""
|
|
Returns:
|
|
str: Representation of this RunReagentAssociation
|
|
"""
|
|
try:
|
|
return f"<ProcedureReagentLotAssociation({self.procedure.name} & {self.reagent.lot})>"
|
|
except AttributeError:
|
|
try:
|
|
logger.error(f"Reagent {self.reagent.lot} procedure association {self.reagent_id} has no procedure!")
|
|
except AttributeError:
|
|
return "<ProcedureReagentAssociation(Unknown Submission & Unknown Reagent)>"
|
|
return f"<ProcedureReagentAssociation(Unknown Submission & {self.reagent.lot})>"
|
|
|
|
def __init__(self, reagentlot=None, procedure=None, reagentrole=""):
|
|
if isinstance(reagentlot, list):
|
|
logger.warning(f"Got list for reagent. Likely no lot was provided. Using {reagentlot[0]}")
|
|
reagentlot = reagentlot[0]
|
|
self.reagentlot = reagentlot
|
|
self.procedure = procedure
|
|
self.reagentrole = reagentrole
|
|
self.comments = ""
|
|
|
|
@classmethod
|
|
@setup_lookup
|
|
def query(cls,
|
|
procedure: Procedure | str | int | None = None,
|
|
reagentlot: Reagent | str | None = None,
|
|
reagentrole: str | None = None,
|
|
limit: int = 0) -> ProcedureReagentLotAssociation | List[ProcedureReagentLotAssociation]:
|
|
"""
|
|
Lookup SubmissionReagentAssociations of interest.
|
|
|
|
Args:
|
|
procedure (Procedure | str | int | None, optional): Identifier of joined procedure. Defaults to None.
|
|
reagentlot (ReagentLot | str | None, optional): Identifier of joined reagent. Defaults to None.
|
|
reagent (Reagent | str | None, optional): Identifier of joined reagent. Defaults to None.
|
|
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
|
|
|
|
Returns:
|
|
RunReagentAssociation|List[RunReagentAssociation]: SubmissionReagentAssociation(s) of interest
|
|
"""
|
|
query: Query = cls.__database_session__.query(cls)
|
|
match reagentlot:
|
|
case ReagentLot() | str():
|
|
if isinstance(reagentlot, str):
|
|
reagentlot = ReagentLot.query(lot=reagentlot)
|
|
query = query.filter(cls.reagentlot == reagentlot)
|
|
case _:
|
|
pass
|
|
match procedure:
|
|
case Procedure() | str():
|
|
if isinstance(procedure, str):
|
|
procedure = Procedure.query(name=procedure)
|
|
query = query.filter(cls.procedure == procedure)
|
|
case int():
|
|
query = query.join(Procedure).filter(Procedure.id == procedure)
|
|
case _:
|
|
pass
|
|
if reagentrole:
|
|
query = query.filter(cls.reagentrole == reagentrole)
|
|
return cls.execute_query(query=query, limit=limit)
|
|
|
|
def to_pydantic(self):
|
|
from backend.validators import PydReagent
|
|
return PydReagent(**self.details_dict())
|
|
|
|
def details_dict(self, **kwargs):
|
|
output = super().details_dict()
|
|
# NOTE: Figure out how to merge the misc_info if doing .update instead.
|
|
relevant = {k: v for k, v in output.items() if k not in ['reagent']}
|
|
output = output['reagentlot'].details_dict()
|
|
output['reagent_name'] = self.reagentlot.reagent.name
|
|
misc = output['misc_info']
|
|
output.update(relevant)
|
|
output['reagentrole'] = self.reagentrole
|
|
output['misc_info'] = misc
|
|
# logger.debug(f"Output: {pformat(output)}")
|
|
return output
|
|
|
|
def delete(self, **kwargs):
|
|
self.__database_session__.delete(self)
|
|
try:
|
|
self.__database_session__.commit()
|
|
except (SQLIntegrityError, SQLOperationalError, AlcIntegrityError, AlcOperationalError) as e:
|
|
self.__database_session__.rollback()
|
|
raise e
|
|
|
|
|
|
class EquipmentRole(BaseClass):
|
|
"""
|
|
Abstract roles for equipment
|
|
"""
|
|
|
|
id = Column(INTEGER, primary_key=True) #: Role id, primary key
|
|
name = Column(String(32)) #: Common name
|
|
|
|
equipmentroleproceduretypeassociation = relationship(
|
|
"ProcedureTypeEquipmentRoleAssociation",
|
|
back_populates="equipmentrole",
|
|
cascade="all, delete-orphan",
|
|
) #: relation to SubmissionTypes
|
|
|
|
proceduretype = association_proxy("equipmentroleproceduretypeassociation",
|
|
"proceduretype") #: proxy to equipmentroleproceduretypeassociation.proceduretype
|
|
|
|
equipmentroleequipmentassociation = relationship(
|
|
"EquipmentRoleEquipmentAssociation",
|
|
back_populates="equipmentrole",
|
|
cascade="all, delete-orphan",
|
|
)
|
|
|
|
equipment = association_proxy("equipmentroleequipmentassociation",
|
|
"equipmentrole", creator=lambda equipment: EquipmentRoleEquipmentAssociation(
|
|
equipment=equipment))
|
|
|
|
def to_dict(self) -> dict:
|
|
"""
|
|
This EquipmentRole as a dictionary
|
|
|
|
Returns:
|
|
dict: This EquipmentRole dict
|
|
"""
|
|
return {key: value for key, value in self.__dict__.items() if key != "process" and key != "equipment"}
|
|
|
|
def to_pydantic(self, proceduretype: ProcedureType) -> "PydEquipmentRole":
|
|
"""
|
|
Creates a PydEquipmentRole of this EquipmentRole
|
|
|
|
Args:
|
|
proceduretype (SubmissionType): SubmissionType of interest
|
|
kittype (str | KitType | None, optional): KitType of interest. Defaults to None.
|
|
|
|
Returns:
|
|
PydEquipmentRole: This EquipmentRole as PydEquipmentRole
|
|
"""
|
|
from backend.validators.pydant import PydEquipmentRole
|
|
equipment = [item.to_pydantic(proceduretype=proceduretype, equipmentrole=self) for item in
|
|
self.equipment]
|
|
pyd_dict = self.to_dict()
|
|
pyd_dict['process'] = self.get_processes(proceduretype=proceduretype)
|
|
return PydEquipmentRole(equipment=equipment, **pyd_dict)
|
|
|
|
@classmethod
|
|
def query_or_create(cls, **kwargs) -> Tuple[EquipmentRole, bool]:
|
|
new = False
|
|
disallowed = ['expiry']
|
|
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
|
|
instance = cls.query(**sanitized_kwargs)
|
|
if not instance or isinstance(instance, list):
|
|
instance = cls()
|
|
new = True
|
|
for k, v in sanitized_kwargs.items():
|
|
setattr(instance, k, v)
|
|
# logger.info(f"Instance from query or create: {instance}")
|
|
return instance, new
|
|
|
|
@classmethod
|
|
@setup_lookup
|
|
def query(cls,
|
|
name: str | None = None,
|
|
id: int | None = None,
|
|
limit: int = 0,
|
|
**kwargs
|
|
) -> EquipmentRole | List[EquipmentRole]:
|
|
"""
|
|
Lookup Equipment roles.
|
|
|
|
Args:
|
|
name (str | None, optional): EquipmentRole name. Defaults to None.
|
|
id (int | None, optional): EquipmentRole id. Defaults to None.
|
|
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
|
|
|
|
Returns:
|
|
EquipmentRole|List[EquipmentRole]: List of EquipmentRoles matching criteria
|
|
"""
|
|
query = cls.__database_session__.query(cls)
|
|
match id:
|
|
case int():
|
|
query = query.filter(cls.id == id)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
match name:
|
|
case str():
|
|
query = query.filter(cls.name == name)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
return cls.execute_query(query=query, limit=limit)
|
|
|
|
def get_processes(self, proceduretype: str | ProcedureType | None) -> Generator[Process, None, None]:
|
|
"""
|
|
Get process used by this EquipmentRole
|
|
|
|
Args:
|
|
proceduretype (str | SubmissionType | None): SubmissionType of interest
|
|
kittype (str | KitType | None, optional): KitType of interest. Defaults to None.
|
|
|
|
Returns:
|
|
List[Process]: List of process
|
|
"""
|
|
if isinstance(proceduretype, str):
|
|
proceduretype = SubmissionType.query(name=proceduretype)
|
|
for process in self.process:
|
|
if proceduretype and proceduretype not in process.proceduretype:
|
|
continue
|
|
yield process.name
|
|
|
|
def to_omni(self, expand: bool = False) -> "OmniEquipmentRole":
|
|
from backend.validators.omni_gui_objects import OmniEquipmentRole
|
|
return OmniEquipmentRole(instance_object=self, name=self.name)
|
|
|
|
def details_dict(self, **kwargs):
|
|
if "proceduretype" in kwargs:
|
|
proceduretype = kwargs['proceduretype']
|
|
else:
|
|
proceduretype = None
|
|
match proceduretype:
|
|
case ProcedureType():
|
|
pass
|
|
case str():
|
|
proceduretype = ProcedureType.query(name=proceduretype, limit=1)
|
|
case _:
|
|
proceduretype = None
|
|
output = super().details_dict(**kwargs)
|
|
output['equipment'] = [item.details_dict()['equipment'] for item in self.equipmentroleequipmentassociation]
|
|
equip = []
|
|
for eq in output['equipment']:
|
|
dicto = dict(name=eq['name'], asset_number=eq['asset_number'])
|
|
dicto['process'] = [
|
|
{'name': process['name'], 'tips': process['tips']}
|
|
for process in eq['process']
|
|
]
|
|
for process in dicto['process']:
|
|
# try:
|
|
process['tips'] = [tr['name'] for tr in process['tips']]
|
|
# except KeyError:
|
|
# raise KeyError("Problem ")
|
|
equip.append(dicto)
|
|
output['equipment'] = equip
|
|
return output
|
|
|
|
|
|
class Equipment(BaseClass, LogMixin):
|
|
"""
|
|
A concrete instance of equipment
|
|
"""
|
|
|
|
id = Column(INTEGER, primary_key=True) #: id, primary key
|
|
name = Column(String(64)) #: equipment name
|
|
nickname = Column(String(64)) #: equipment nickname
|
|
asset_number = Column(String(16)) #: Given asset number (corpo nickname if you will)
|
|
|
|
equipmentprocedureassociation = relationship(
|
|
"ProcedureEquipmentAssociation",
|
|
back_populates="equipment",
|
|
cascade="all, delete-orphan",
|
|
) #: Association with BasicRun
|
|
|
|
procedure = association_proxy("equipmentprocedureassociation",
|
|
"procedure") #: proxy to equipmentprocedureassociation.procedure
|
|
|
|
equipmentequipmentroleassociation = relationship(
|
|
"EquipmentRoleEquipmentAssociation",
|
|
back_populates="equipment",
|
|
cascade="all, delete-orphan",
|
|
)
|
|
|
|
equipmentrole = association_proxy("equipmentequipmentroleassociation",
|
|
"equipmentrole", creator=lambda equipmentrole: EquipmentRoleEquipmentAssociation(
|
|
equipmentrole=equipmentrole)
|
|
)
|
|
|
|
def __init__(self, name: str, nickname: str | None = None, asset_number: str = ""):
|
|
self.name = name
|
|
if nickname:
|
|
self.nickname = nickname
|
|
else:
|
|
self.nickname = self.name
|
|
self.asset_number = asset_number
|
|
|
|
def to_dict(self, processes: bool = False) -> dict:
|
|
"""
|
|
This Equipment as a dictionary
|
|
|
|
Args:
|
|
processes (bool, optional): Whether to include process. Defaults to False.
|
|
|
|
Returns:
|
|
dict: Dictionary representation of this equipment
|
|
"""
|
|
if not processes:
|
|
return {k: v for k, v in self.__dict__.items() if k != 'process'}
|
|
else:
|
|
return {k: v for k, v in self.__dict__.items()}
|
|
|
|
@classmethod
|
|
@setup_lookup
|
|
def query(cls,
|
|
id: int | None = None,
|
|
name: str | None = None,
|
|
nickname: str | None = None,
|
|
asset_number: str | None = None,
|
|
limit: int = 0
|
|
) -> Equipment | List[Equipment]:
|
|
"""
|
|
Lookup a list of or single Equipment.
|
|
|
|
Args:
|
|
name (str | None, optional): Equipment name. Defaults to None.
|
|
nickname (str | None, optional): Equipment nickname. Defaults to None.
|
|
asset_number (str | None, optional): Equipment asset number. Defaults to None.
|
|
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
|
|
|
|
Returns:
|
|
Equipment|List[Equipment]: Equipment or list of equipment matching query parameters.
|
|
"""
|
|
query = cls.__database_session__.query(cls)
|
|
match id:
|
|
case int():
|
|
query = query.filter(cls.id == id)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
match name:
|
|
case str():
|
|
query = query.filter(cls.name == name)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
match nickname:
|
|
case str():
|
|
query = query.filter(cls.nickname == nickname)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
match asset_number:
|
|
case str():
|
|
query = query.filter(cls.asset_number == asset_number)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
return cls.execute_query(query=query, limit=limit)
|
|
|
|
def to_pydantic(self, equipmentrole: str = None) -> "PydEquipment":
|
|
"""
|
|
Creates PydEquipment of this Equipment
|
|
|
|
Args:
|
|
proceduretype (ProcedureType): Relevant SubmissionType
|
|
kittype (str | KitType | None, optional): Relevant KitType. Defaults to None.
|
|
|
|
Returns:
|
|
PydEquipment: pydantic equipment object
|
|
"""
|
|
from backend.validators.pydant import PydEquipment
|
|
creation_dict = self.details_dict()
|
|
processes = self.get_processes(equipmentrole=equipmentrole)
|
|
creation_dict['processes'] = processes
|
|
creation_dict['equipmentrole'] = equipmentrole or creation_dict['equipmentrole']
|
|
return PydEquipment(**creation_dict)
|
|
|
|
@classmethod
|
|
@declared_attr
|
|
def manufacturer_regex(cls) -> re.Pattern:
|
|
"""
|
|
Creates regex to determine tip manufacturer
|
|
|
|
Returns:
|
|
re.Pattern: regex
|
|
"""
|
|
return re.compile(r"""
|
|
(?P<PHAC>50\d{5}$)|
|
|
(?P<HC>HC-\d{6}$)|
|
|
(?P<Beckman>[^\d][A-Z0-9]{6}$)|
|
|
(?P<Axygen>[A-Z]{3}-\d{2}-[A-Z]-[A-Z]$)|
|
|
(?P<Labcon>\d{4}-\d{3}-\d{3}-\d$)""",
|
|
re.VERBOSE)
|
|
|
|
@classmethod
|
|
def assign_equipment(cls, equipmentrole: EquipmentRole | str) -> List[Equipment]:
|
|
"""
|
|
Creates a list of equipment from user input to be used in Submission Type creation
|
|
|
|
Args:
|
|
equipmentrole (EquipmentRole): Equipment reagentrole to be added to.
|
|
|
|
Returns:
|
|
List[Equipment]: User selected equipment.
|
|
"""
|
|
if isinstance(equipmentrole, str):
|
|
equipmentrole = EquipmentRole.query(name=equipmentrole)
|
|
equipment = cls.query()
|
|
options = "\n".join([f"{ii}. {item.name}" for ii, item in enumerate(equipment)])
|
|
choices = input(f"Enter equipment numbers to add to {equipmentrole.name} (space separated):\n{options}\n\n")
|
|
output = []
|
|
for choice in choices.split(" "):
|
|
try:
|
|
choice = int(choice)
|
|
except (AttributeError, ValueError):
|
|
continue
|
|
output.append(equipment[choice])
|
|
return output
|
|
|
|
def get_processes(self, equipmentrole: str):
|
|
output = []
|
|
for assoc in self.equipmentequipmentroleassociation:
|
|
if assoc.equipmentrole.name != equipmentrole:
|
|
continue
|
|
output.append(assoc.process.to_pydantic())
|
|
return output
|
|
|
|
|
|
class EquipmentRoleEquipmentAssociation(BaseClass):
    """
    table containing equipmentrole/equipment associations, each optionally tied to a process
    """

    equipmentrole_id = Column(INTEGER, ForeignKey("_equipmentrole.id"),
                              primary_key=True)  #: id of associated equipmentrole
    equipment_id = Column(INTEGER, ForeignKey("_equipment.id"), primary_key=True)  #: id of associated equipment
    process_id = Column(INTEGER, ForeignKey("_process.id"))  #: id of associated process

    equipmentrole = relationship("EquipmentRole",
                                 back_populates="equipmentroleequipmentassociation")  #: associated equipmentrole

    equipment = relationship("Equipment",
                             back_populates="equipmentequipmentroleassociation")  #: associated equipment

    process = relationship("Process",
                           back_populates="equipmentroleeequipmentassociation")  #: associated process

    def details_dict(self, **kwargs) -> dict:
        """
        Details of this association, with 'equipment' replaced by the equipment's details plus
        a 'process' list holding the details of every active version of the associated process.

        Args:
            **kwargs: passed through to the parent details_dict.

        Returns:
            dict: details of this association
        """
        output = super().details_dict(**kwargs)
        output['equipment'] = self.equipment.details_dict()
        # NOTE: process_id is not part of the primary key, so the process may be unset.
        process = self.process
        versions = process.processversion if process is not None else []
        output['equipment']['process'] = [item.details_dict() for item in versions if bool(item.active)]
        return output
|
|
|
|
|
|
class Process(BaseClass):
|
|
"""
|
|
A Process is a method used by a piece of equipment.
|
|
"""
|
|
|
|
id = Column(INTEGER, primary_key=True) #: Process id, primary key
|
|
name = Column(String(64), unique=True) #: Process name
|
|
tips = relationship("Tips", back_populates='process',
|
|
secondary=process_tips) #: relation to KitType
|
|
|
|
processversion = relationship("ProcessVersion", back_populates="process")
|
|
|
|
equipmentroleeequipmentassociation = relationship("EquipmentRoleEquipmentAssociation", back_populates="process")
|
|
|
|
def set_attribute(self, key, value):
|
|
match key:
|
|
case "name":
|
|
self.name = value
|
|
case _:
|
|
field = getattr(self, key)
|
|
if value not in field:
|
|
field.append(value)
|
|
|
|
@classmethod
|
|
@setup_lookup
|
|
def query(cls,
|
|
name: str | None = None,
|
|
id: int | None = None,
|
|
proceduretype: str | ProcedureType | None = None,
|
|
# kittype: str | KitType | None = None,
|
|
equipmentrole: str | EquipmentRole | None = None,
|
|
limit: int = 0,
|
|
**kwargs) -> Process | List[Process]:
|
|
"""
|
|
Lookup Processes
|
|
|
|
Args:
|
|
id (int | None, optional): Process id. Defaults to None.
|
|
name (str | None, optional): Process name. Defaults to None.
|
|
limit (int, optional): Maximum number of results to return (0=all). Defaults to 0.
|
|
|
|
Returns:
|
|
Process|List[Process]: Process(es) matching criteria
|
|
"""
|
|
query = cls.__database_session__.query(cls)
|
|
match proceduretype:
|
|
case str():
|
|
proceduretype = ProcedureType.query(name=proceduretype)
|
|
query = query.filter(cls.proceduretype.contains(proceduretype))
|
|
case ProcedureType():
|
|
query = query.filter(cls.proceduretype.contains(proceduretype))
|
|
case _:
|
|
pass
|
|
|
|
match equipmentrole:
|
|
case str():
|
|
equipmentrole = EquipmentRole.query(name=equipmentrole)
|
|
query = query.filter(cls.equipmentrole.contains(equipmentrole))
|
|
case EquipmentRole():
|
|
query = query.filter(cls.equipmentrole.contains(equipmentrole))
|
|
case _:
|
|
pass
|
|
match name:
|
|
case str():
|
|
query = query.filter(cls.name == name)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
match id:
|
|
case int():
|
|
query = query.filter(cls.id == id)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
return cls.execute_query(query=query, limit=limit)
|
|
|
|
@check_authorization
|
|
def save(self):
|
|
super().save()
|
|
|
|
def to_omni(self, expand: bool = False):
    """
    Builds the OmniProcess gui object for this process.

    Args:
        expand (bool, optional): If True related objects are converted with their own
            to_omni(); otherwise only their names are used. Defaults to False.

    Returns:
        OmniProcess: gui representation of this process
    """
    from backend.validators.omni_gui_objects import OmniProcess

    def represent(item):
        # NOTE: Full omni object when expanding, plain name otherwise.
        return item.to_omni() if expand else item.name

    proceduretype = [represent(item) for item in self.proceduretype]
    equipmentrole = [represent(item) for item in self.equipmentrole]
    tiprole = [represent(item) for item in self.tiprole]
    return OmniProcess(
        instance_object=self,
        name=self.name,
        proceduretype=proceduretype,
        equipmentrole=equipmentrole,
        tiprole=tiprole
    )
|
|
|
|
def details_dict(self, **kwargs):
    """
    Dictionary of this object's details, extended with its versions and tip lots.

    Args:
        **kwargs: Passed through to the parent class details_dict.

    Returns:
        dict: parent class details plus 'processversion' and 'tips' lists of detail dicts
    """
    output = super().details_dict(**kwargs)
    output['processversion'] = [version.details_dict() for version in self.processversion]
    # NOTE: Each tip type holds its own lots, so the per-type lists are flattened into one.
    lots = flatten_list([tip_type.tipslot for tip_type in self.tips])
    output['tips'] = [lot.details_dict() for lot in lots]
    return output
|
|
|
|
def to_pydantic(self):
    """
    Returns the pydantic model produced by the parent class, unchanged.
    """
    return super().to_pydantic()
|
|
|
|
|
|
class ProcessVersion(BaseClass):
|
|
|
|
pyd_model_name = "Process"
|
|
|
|
id = Column(INTEGER, primary_key=True) #: Process id, primary key
|
|
version = Column(FLOAT(2), default=1.00) #: Version number
|
|
date_verified = Column(TIMESTAMP) #: Date this version was deemed worthy
|
|
project = Column(String(128)) #: Name of the project this belonds to.
|
|
active = Column(INTEGER, default=1) #: Is this version in use?
|
|
process = relationship("Process", back_populates="processversion")
|
|
process_id = Column(INTEGER, ForeignKey("_process.id", ondelete="SET NULL",
|
|
name="fk_version_process_id"))
|
|
procedureequipmentassociation = relationship("ProcedureEquipmentAssociation",
|
|
back_populates='processversion') #: relation to RunEquipmentAssociation
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return f"{self.process.name}-v{str(self.version)}"
|
|
|
|
@validates('active')
|
|
def validate_active(self, key, value):
|
|
"""
|
|
Ensures only 1 & 0 used in 'active'
|
|
|
|
Args:
|
|
key (str): name of attribute
|
|
value (_type_): value of attribute
|
|
|
|
Raises:
|
|
ValueError: Raised if bad value given
|
|
|
|
Returns:
|
|
_type_: value
|
|
"""
|
|
if not 0 <= value < 2:
|
|
raise ValueError(f'Invalid required value {value}. Must be 0 or 1.')
|
|
return value
|
|
|
|
def details_dict(self, **kwargs):
|
|
output = super().details_dict(**kwargs)
|
|
output['name'] = self.name
|
|
if not output['project']:
|
|
output['project'] = ""
|
|
output['tips'] = flatten_list(
|
|
[[lot.details_dict() for lot in tips.tipslot if bool(lot.active)] for tips in self.process.tips])
|
|
return output
|
|
|
|
def set_attribute(self, key, value):
|
|
setattr(self, key, value)
|
|
|
|
@classmethod
|
|
def query(cls,
|
|
version: str | float | None = None,
|
|
name: str | None = None,
|
|
limit: int = 0,
|
|
**kwargs) -> ProcessVersion | List[ProcessVersion]:
|
|
query: Query = cls.__database_session__.query(cls)
|
|
match name:
|
|
case str():
|
|
query = query.join(Process).filter(Process.name == name)
|
|
case _:
|
|
pass
|
|
match version:
|
|
case str() | float():
|
|
query = query.filter(cls.version == float(version))
|
|
case _:
|
|
pass
|
|
return cls.execute_query(query=query, limit=limit)
|
|
|
|
# def to_pydantic(self, pyd_model_name: str | None = None, **kwargs):
|
|
# output = super().to_pydantic(pyd_model_name=pyd_model_name, **kwargs)
|
|
|
|
|
|
class Tips(BaseClass):
|
|
"""
|
|
An abstract reagentrole that a tip fills during a process
|
|
"""
|
|
id = Column(INTEGER, primary_key=True) #: primary key
|
|
tipslot = relationship("TipsLot", back_populates="tips") #: concrete instance of this tip type
|
|
manufacturer = Column(String(64)) #: Name of manufacturer
|
|
capacity = Column(INTEGER) #: How many uL the tip can hold.
|
|
ref = Column(String(64)) #: tip reference number
|
|
process = relationship("Process", back_populates="tips", secondary=process_tips) #: Associated process
|
|
|
|
@hybrid_property
|
|
def name(self):
|
|
return f"{self.manufacturer}-{self.ref}"
|
|
|
|
@classmethod
|
|
@setup_lookup
|
|
def query(cls,
|
|
name: str | None = None,
|
|
manufacturer: str | None = None,
|
|
capacity: str | None = None,
|
|
ref: str | None = None,
|
|
limit: int = 0,
|
|
**kwargs) -> Tips | List[Tips]:
|
|
query = cls.__database_session__.query(cls)
|
|
match name:
|
|
case str():
|
|
query = query.filter(cls.name == name)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
match manufacturer:
|
|
case str():
|
|
query = query.filter(cls.manufacturer == manufacturer)
|
|
case _:
|
|
pass
|
|
match capacity:
|
|
case int():
|
|
query = query.filter(cls.capacity == capacity)
|
|
case _:
|
|
pass
|
|
match ref:
|
|
case str():
|
|
query = query.filter(cls.ref == ref)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
return cls.execute_query(query=query, limit=limit)
|
|
|
|
@check_authorization
|
|
def save(self):
|
|
super().save()
|
|
|
|
def to_omni(self, expand: bool = False):
|
|
from backend.validators.omni_gui_objects import OmniTipRole
|
|
if expand:
|
|
tips = [item.to_omni() for item in self.tips]
|
|
else:
|
|
tips = [item.name for item in self.tips]
|
|
return OmniTipRole(
|
|
instance_object=self,
|
|
name=self.name,
|
|
tips=tips
|
|
)
|
|
|
|
|
|
class TipsLot(BaseClass, LogMixin):
|
|
"""
|
|
A concrete instance of tips.
|
|
"""
|
|
id = Column(INTEGER, primary_key=True) #: primary key
|
|
tips = relationship("Tips", back_populates="tipslot") #: joined parent tip type
|
|
tips_id = Column(INTEGER, ForeignKey("_tips.id", ondelete='SET NULL',
|
|
name="fk_tips_id")) #: id of parent tip type
|
|
lot = Column(String(64), unique=True) #: lot number
|
|
expiry = Column(TIMESTAMP) #: date of expiry
|
|
active = Column(INTEGER, default=1) #: whether or not these tips are currently in use.
|
|
|
|
@validates('active')
|
|
def validate_active(self, key, value):
|
|
"""
|
|
Ensures only 1 & 0 used in 'active'
|
|
|
|
Args:
|
|
key (str): name of attribute
|
|
value (_type_): value of attribute
|
|
|
|
Raises:
|
|
ValueError: Raised if bad value given
|
|
|
|
Returns:
|
|
_type_: value
|
|
"""
|
|
if not 0 <= value < 2:
|
|
raise ValueError(f'Invalid required value {value}. Must be 0 or 1.')
|
|
return value
|
|
|
|
@property
|
|
def size(self) -> str:
|
|
return f"{self.capacity}ul"
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return f"{self.tips.manufacturer}-{self.tips.ref}-{self.lot}"
|
|
|
|
@classmethod
|
|
def query(cls,
|
|
manufacturer: str | None = None,
|
|
ref: str | None = None,
|
|
lot: str | None = None,
|
|
limit: int = 0,
|
|
**kwargs) -> Tips | List[Tips]:
|
|
"""
|
|
Lookup tips
|
|
|
|
Args:
|
|
manufacturer (str | None, optional): Name of parent tip manufacturer. Defaults to None.
|
|
ref (str | None, optional): Name of parent tip reference number. Defaults to None.
|
|
lot (str | None, optional): Lot number. Defaults to None.
|
|
limit (int, optional): Maximum number of results to return (0=all). Defaults to 0.
|
|
|
|
Returns:
|
|
Tips | List[Tips]: Tips matching criteria
|
|
"""
|
|
query = cls.__database_session__.query(cls)
|
|
if manufacturer is not None and ref is not None:
|
|
manufacturer = None
|
|
match manufacturer:
|
|
case str():
|
|
logger.debug(f"Looking for {manufacturer}")
|
|
query = query.join(Tips).filter(Tips.manufacturer == manufacturer)
|
|
case _:
|
|
pass
|
|
match ref:
|
|
case str():
|
|
query = query.join(Tips).filter(Tips.ref == ref)
|
|
case _:
|
|
pass
|
|
match lot:
|
|
case str():
|
|
query = query.filter(cls.lot == lot)
|
|
limit = 1
|
|
case _:
|
|
pass
|
|
return cls.execute_query(query=query, limit=limit)
|
|
|
|
@check_authorization
|
|
def save(self):
|
|
super().save()
|
|
|
|
def to_omni(self, expand: bool = True):
|
|
from backend.validators.omni_gui_objects import OmniTips
|
|
return OmniTips(
|
|
instance_object=self,
|
|
name=self.name
|
|
)
|
|
|
|
def to_sub_dict(self, full_data: bool = False, **kwargs) -> dict:
|
|
"""
|
|
dictionary containing values necessary for gui
|
|
|
|
Args:
|
|
full_data (bool, optional): Whether to include procedure in data for details. Defaults to False.
|
|
|
|
Returns:
|
|
dict: representation of the equipment's attributes
|
|
"""
|
|
output = dict(
|
|
name=self.name,
|
|
lot=self.lot,
|
|
)
|
|
if full_data:
|
|
subs = [
|
|
dict(plate=item.procedure.procedure.rsl_plate_number, role=item.role_name,
|
|
sub_date=item.procedure.procedure.clientsubmission.submitted_date)
|
|
for item in self.tipsprocedureassociation]
|
|
output['procedure'] = sorted(subs, key=itemgetter("sub_date"), reverse=True)
|
|
output['excluded'] = ['missing', 'procedure', 'excluded', 'editable']
|
|
return output
|
|
|
|
def details_dict(self, **kwargs) -> dict:
|
|
output = super().details_dict()
|
|
output['name'] = self.name
|
|
return output
|
|
|
|
|
|
class ProcedureEquipmentAssociation(BaseClass):
|
|
"""
|
|
Abstract association between BasicRun and Equipment
|
|
"""
|
|
|
|
equipment_id = Column(INTEGER, ForeignKey("_equipment.id"), primary_key=True) #: id of associated equipment
|
|
procedure_id = Column(INTEGER, ForeignKey("_procedure.id"), primary_key=True) #: id of associated procedure
|
|
equipmentrole = Column(String(64), primary_key=True) #: name of the role the equipment fills
|
|
processversion_id = Column(INTEGER, ForeignKey("_processversion.id", ondelete="SET NULL",
|
|
name="SEA_Process_id")) #: Foreign key of process id
|
|
start_time = Column(TIMESTAMP) #: start time of equipment use
|
|
end_time = Column(TIMESTAMP) #: end time of equipment use
|
|
comments = Column(String(1024)) #: comments about equipment
|
|
|
|
procedure = relationship(Procedure,
|
|
back_populates="procedureequipmentassociation") #: associated procedure
|
|
|
|
equipment = relationship(Equipment, back_populates="equipmentprocedureassociation") #: associated equipment
|
|
|
|
processversion = relationship(ProcessVersion,
|
|
back_populates="procedureequipmentassociation") #: Associated process version
|
|
|
|
tipslot_id = Column(INTEGER, ForeignKey("_tipslot.id", ondelete="SET NULL",
|
|
name="SEA_Tipslot_id"))
|
|
|
|
tipslot = relationship(TipsLot)
|
|
|
|
def __repr__(self) -> str:
|
|
try:
|
|
return f"<ProcedureEquipmentAssociation({self.name})>"
|
|
except AttributeError:
|
|
return "<ProcedureEquipmentAssociation(Unknown)>"
|
|
|
|
def __init__(self, procedure=None, equipment=None, procedure_id: int | None = None, equipment_id: int | None = None,
|
|
equipmentrole: str = "None"):
|
|
if not procedure:
|
|
if procedure_id:
|
|
procedure = Procedure.query(id=procedure_id)
|
|
else:
|
|
logger.error("Creation error")
|
|
self.procedure = procedure
|
|
if not equipment:
|
|
if equipment_id:
|
|
equipment = Equipment.query(id=equipment_id)
|
|
else:
|
|
logger.error("Creation error")
|
|
self.equipment = equipment
|
|
if isinstance(equipmentrole, list):
|
|
equipmentrole = equipmentrole[0]
|
|
if isinstance(equipmentrole, EquipmentRole):
|
|
equipmentrole = equipmentrole.name
|
|
self.equipmentrole = equipmentrole
|
|
|
|
@property
|
|
def name(self):
|
|
return f"{self.procedure.name} & {self.equipment.name}"
|
|
|
|
@property
|
|
def process(self):
|
|
return ProcessVersion.query(id=self.processversion_id)
|
|
|
|
@property
|
|
def tips(self):
|
|
try:
|
|
return Tips.query(id=self.tips_id, limit=1)
|
|
except AttributeError:
|
|
return None
|
|
|
|
def to_sub_dict(self) -> dict:
|
|
"""
|
|
This RunEquipmentAssociation as a dictionary
|
|
|
|
Returns:
|
|
dict: This RunEquipmentAssociation as a dictionary
|
|
"""
|
|
try:
|
|
process = self.process.name
|
|
except AttributeError:
|
|
process = "No process found"
|
|
output = dict(name=self.equipment.name, asset_number=self.equipment.asset_number, comment=self.comments,
|
|
processes=[process], role=self.equipmentrole, nickname=self.equipment.nickname)
|
|
return output
|
|
|
|
def to_pydantic(self) -> "PydEquipment":
|
|
"""
|
|
Returns a pydantic model based on this object.
|
|
|
|
Returns:
|
|
PydEquipment: pydantic equipment model
|
|
"""
|
|
from backend.validators import PydEquipment
|
|
return PydEquipment(**self.details_dict())
|
|
|
|
@classmethod
|
|
@setup_lookup
|
|
def query(cls,
|
|
equipment: int | Equipment | None = None,
|
|
procedure: int | Procedure | None = None,
|
|
equipmentrole: str | None = None,
|
|
limit: int = 0, **kwargs) \
|
|
-> Any | List[Any]:
|
|
"""
|
|
|
|
Args:
|
|
equipment ( int | Equipment | None, optional): The associated equipment of interest. Defaults to None.
|
|
procedure ( int | Procedure | None, optional): The associated procedure of interest. Defaults to None.
|
|
equipmentrole ( str | None, optional): The associated equipmentrole. Defaults to None.
|
|
limit ( int ): Maximum number of results to return (0=all). Defaults to 0.
|
|
**kwargs ():
|
|
|
|
Returns:
|
|
Any | List[Any]
|
|
"""
|
|
query: Query = cls.__database_session__.query(cls)
|
|
match equipment:
|
|
case int():
|
|
query = query.filter(cls.equipment_id == equipment)
|
|
case Equipment():
|
|
query = query.filter(cls.equipment == equipment)
|
|
case _:
|
|
pass
|
|
match procedure:
|
|
case int():
|
|
query = query.filter(cls.procedure_id == procedure)
|
|
case Procedure():
|
|
query = query.filter(cls.procedure == procedure)
|
|
case _:
|
|
pass
|
|
if equipmentrole is not None:
|
|
query = query.filter(cls.equipmentrole == equipmentrole)
|
|
return cls.execute_query(query=query, limit=limit, **kwargs)
|
|
|
|
def details_dict(self, **kwargs):
|
|
output = super().details_dict()
|
|
# NOTE: Figure out how to merge the misc_info if doing .update instead.
|
|
relevant = {k: v for k, v in output.items() if k not in ['equipment']}
|
|
output = output['equipment'].details_dict()
|
|
misc = output['misc_info']
|
|
output.update(relevant)
|
|
output['misc_info'] = misc
|
|
output['equipment_role'] = self.equipmentrole
|
|
output['processes'] = [item for item in self.equipment.get_processes(equipmentrole=output['equipment_role'])]
|
|
try:
|
|
output['processversion'] = self.processversion.details_dict()
|
|
except AttributeError:
|
|
output['processversion'] = None
|
|
try:
|
|
output['tips'] = self.tipslot.details_dict()
|
|
except AttributeError:
|
|
output['tips'] = None
|
|
return output
|
|
|
|
|
|
class ProcedureTypeEquipmentRoleAssociation(BaseClass):
    """
    Association between a ProcedureType and an EquipmentRole
    """
    equipmentrole_id = Column(INTEGER, ForeignKey("_equipmentrole.id"),
                              primary_key=True)  #: id of associated equipment role
    proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"),
                              primary_key=True)  #: id of associated procedure type
    uses = Column(JSON)  #: locations of equipment on the procedure type excel sheet.
    static = Column(INTEGER, default=1)  #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list?
    proceduretype = relationship(ProcedureType, back_populates="proceduretypeequipmentroleassociation",
                                 foreign_keys=[proceduretype_id])  #: associated procedure type
    equipmentrole = relationship(EquipmentRole, back_populates="equipmentroleproceduretypeassociation",
                                 foreign_keys=[equipmentrole_id])  #: associated equipment role

    @validates('static')
    def validate_static(self, key, value):
        """
        Checks the value assigned to 'static' before it is stored.

        Args:
            key (str): name of attribute
            value (_type_): value of attribute

        Raises:
            ValueError: Raised if value falls outside 0 <= value < 2

        Returns:
            _type_: value
        """
        if 0 <= value < 2:
            return value
        raise ValueError(f'Invalid required value {value}. Must be 0 or 1.')

    @check_authorization
    def save(self):
        """
        Saves this object by delegating to the parent class save.
        """
        super().save()
|
|
|
|
|
|
class Results(BaseClass):
|
|
id = Column(INTEGER, primary_key=True) #: primary key
|
|
result_type = Column(String(32)) #: Name of the type of this result.
|
|
result = Column(JSON) #:
|
|
date_analyzed = Column(TIMESTAMP)
|
|
procedure_id = Column(INTEGER, ForeignKey("_procedure.id", ondelete='SET NULL',
|
|
name="fk_RES_procedure_id"))
|
|
procedure = relationship("Procedure", back_populates="results")
|
|
assoc_id = Column(INTEGER, ForeignKey("_proceduresampleassociation.id", ondelete='SET NULL',
|
|
name="fk_RES_ASSOC_id"))
|
|
sampleprocedureassociation = relationship("ProcedureSampleAssociation", back_populates="results")
|
|
_img = Column(String(128))
|
|
|
|
@property
def sample_id(self):
    """
    Sample id reached through the sample association of this result.

    Returns:
        The associated sample's sample_id, or None when assoc_id is not set.
    """
    if not self.assoc_id:
        return None
    return self.sampleprocedureassociation.sample.sample_id
|
|
|
|
@property
|
|
def image(self) -> bytes | None:
|
|
dir = self.__directory_path__.joinpath("submission_imgs.zip")
|
|
try:
|
|
assert dir.exists()
|
|
except AssertionError:
|
|
return None
|
|
with zipfile.ZipFile(dir) as zf:
|
|
with zf.open(self._img) as f:
|
|
return f.read()
|
|
|
|
@image.setter
|
|
def image(self, value):
|
|
self._img = value
|
|
|
|
def to_pydantic(self, pyd_model_name: str | None = None, **kwargs):
    """
    Builds the pydantic model via the parent class and attaches the sample id when there is one.

    Args:
        pyd_model_name (str | None, optional): Passed through to the parent class. Defaults to None.
        **kwargs: Passed through to the parent class.

    Returns:
        The parent class's pydantic model, with sample_id set if this result has one.
    """
    model = super().to_pydantic(pyd_model_name=pyd_model_name, **kwargs)
    linked_sample = self.sample_id
    if linked_sample:
        model.sample_id = linked_sample
    return model
|