Pre-sample/control connect

This commit is contained in:
Landon Wark
2023-12-05 10:20:46 -06:00
parent 283e77fee5
commit cddb947ec8
29 changed files with 1357 additions and 1042 deletions

View File

@@ -1,8 +1,50 @@
'''
Contains all models for sqlalchemy
'''
import sys
from sqlalchemy.orm import DeclarativeMeta, declarative_base
from sqlalchemy.ext.declarative import declared_attr
if 'pytest' in sys.modules:
from pathlib import Path
sys.path.append(Path(__file__).parents[4].absolute().joinpath("tests").__str__())
Base: DeclarativeMeta = declarative_base()
class BaseClass(Base):
"""
Abstract class to pass ctx values to all SQLAlchemy objects.
Args:
Base (DeclarativeMeta): Declarative base for metadata.
"""
__abstract__ = True
__table_args__ = {'extend_existing': True}
@declared_attr
def __database_session__(cls):
if not 'pytest' in sys.modules:
from tools import ctx
else:
from test_settings import ctx
return ctx.database_session
@declared_attr
def __directory_path__(cls):
if not 'pytest' in sys.modules:
from tools import ctx
else:
from test_settings import ctx
return ctx.directory_path
@declared_attr
def __backup_path__(cls):
if not 'pytest' in sys.modules:
from tools import ctx
else:
from test_settings import ctx
return ctx.backup_path
from tools import Base
from .controls import *
# import order must go: orgs, kit, subs due to circular import issues
from .organizations import *

View File

@@ -7,7 +7,7 @@ from sqlalchemy.orm import relationship, Query
import logging
from operator import itemgetter
import json
from . import Base
from . import BaseClass
from tools import setup_lookup, query_return
from datetime import date, datetime
from typing import List
@@ -15,12 +15,11 @@ from dateutil.parser import parse
logger = logging.getLogger(f"submissions.{__name__}")
class ControlType(Base):
class ControlType(BaseClass):
"""
Base class of a control archetype.
"""
__tablename__ = '_control_types'
__table_args__ = {'extend_existing': True}
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(255), unique=True) #: controltype name (e.g. MCS)
@@ -29,7 +28,7 @@ class ControlType(Base):
@classmethod
@setup_lookup
def query(cls,
def query(cls,
name:str=None,
limit:int=0
) -> ControlType|List[ControlType]:
@@ -37,14 +36,13 @@ class ControlType(Base):
Lookup control archetypes in the database
Args:
ctx (Settings): Settings object passed down from gui.
name (str, optional): Control type name (limits results to 1). Defaults to None.
limit (int, optional): Maximum number of results to return. Defaults to 0.
Returns:
models.ControlType|List[models.ControlType]: ControlType(s) of interest.
"""
query = cls.metadata.session.query(cls)
query = cls.__database_session__.query(cls)
match name:
case str():
query = query.filter(cls.name==name)
@@ -52,14 +50,13 @@ class ControlType(Base):
case _:
pass
return query_return(query=query, limit=limit)
class Control(Base):
class Control(BaseClass):
"""
Base class of a control sample.
"""
__tablename__ = '_control_samples'
__table_args__ = {'extend_existing': True}
id = Column(INTEGER, primary_key=True) #: primary key
parent_id = Column(String, ForeignKey("_control_types.id", name="fk_control_parent_id")) #: primary key of control type
@@ -114,10 +111,9 @@ class Control(Base):
def convert_by_mode(self, mode:str) -> list[dict]:
"""
split control object into analysis types for controls graphs
split this instance into analysis types for controls graphs
Args:
control (models.Control): control to be parsed into list
mode (str): analysis type, 'contains', etc
Returns:
@@ -168,6 +164,21 @@ class Control(Base):
data = {}
return data
@classmethod
def get_modes(cls) -> List[str]:
"""
Get all control modes from database
Returns:
List[str]: List of control mode names.
"""
try:
cols = [item.name for item in list(cls.__table__.columns) if isinstance(item.type, JSON)]
except AttributeError as e:
logger.error(f"Failed to get available modes from db: {e}")
cols = []
return cols
@classmethod
@setup_lookup
def query(cls,
@@ -190,15 +201,14 @@ class Control(Base):
Returns:
models.Control|List[models.Control]: Control object of interest.
"""
query: Query = cls.metadata.session.query(cls)
query: Query = cls.__database_session__.query(cls)
# by control type
match control_type:
case ControlType():
logger.debug(f"Looking up control by control type: {control_type}")
# query = query.join(models.ControlType).filter(models.ControlType==control_type)
# logger.debug(f"Looking up control by control type: {control_type}")
query = query.filter(cls.controltype==control_type)
case str():
logger.debug(f"Looking up control by control type: {control_type}")
# logger.debug(f"Looking up control by control type: {control_type}")
query = query.join(ControlType).filter(ControlType.name==control_type)
case _:
pass
@@ -224,7 +234,7 @@ class Control(Base):
end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date().strftime("%Y-%m-%d")
case _:
end_date = parse(end_date).strftime("%Y-%m-%d")
logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}")
# logger.debug(f"Looking up BasicSubmissions from start date: {start_date} and end date: {end_date}")
query = query.filter(cls.submitted_date.between(start_date, end_date))
match control_name:
case str():
@@ -233,23 +243,3 @@ class Control(Base):
case _:
pass
return query_return(query=query, limit=limit)
@classmethod
def get_modes(cls):
"""
Get all control modes from database
Args:
ctx (Settings): Settings object passed down from gui.
Returns:
List[str]: List of control mode names.
"""
rel = cls.metadata.session.query(cls).first()
try:
cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)]
except AttributeError as e:
logger.debug(f"Failed to get available modes from db: {e}")
cols = []
return cols

View File

@@ -2,14 +2,15 @@
All kit and reagent related models
'''
from __future__ import annotations
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, func, BLOB
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB
from sqlalchemy.orm import relationship, validates, Query
from sqlalchemy.ext.associationproxy import association_proxy
from datetime import date
import logging
from tools import check_authorization, setup_lookup, query_return, Report, Result
from tools import check_authorization, setup_lookup, query_return, Report, Result, Settings
from typing import List
from . import Base, Organization
from pandas import ExcelFile
from . import Base, BaseClass, Organization
logger = logging.getLogger(f'submissions.{__name__}')
@@ -21,12 +22,12 @@ reagenttypes_reagents = Table(
extend_existing = True
)
class KitType(Base):
class KitType(BaseClass):
"""
Base of kits used in submission processing
"""
__tablename__ = "_kits"
__table_args__ = {'extend_existing': True}
# __table_args__ = {'extend_existing': True}
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64), unique=True) #: name of kit
@@ -54,16 +55,7 @@ class KitType(Base):
def __repr__(self) -> str:
return f"<KitType({self.name})>"
def __str__(self) -> str:
"""
a string representing this object
Returns:
str: a string representing this object's name
"""
return self.name
def get_reagents(self, required:bool=False, submission_type:str|None=None) -> list:
def get_reagents(self, required:bool=False, submission_type:str|SubmissionType|None=None) -> list:
"""
Return ReagentTypes linked to kit through KitTypeReagentTypeAssociation.
@@ -74,10 +66,13 @@ class KitType(Base):
Returns:
list: List of reagent types
"""
if submission_type != None:
relevant_associations = [item for item in self.kit_reagenttype_associations if submission_type in item.uses.keys()]
else:
relevant_associations = [item for item in self.kit_reagenttype_associations]
match submission_type:
case SubmissionType():
relevant_associations = [item for item in self.kit_reagenttype_associations if submission_type.name in item.uses.keys()]
case str():
relevant_associations = [item for item in self.kit_reagenttype_associations if submission_type in item.uses.keys()]
case _:
relevant_associations = [item for item in self.kit_reagenttype_associations]
if required:
return [item.reagent_type for item in relevant_associations if item.required == 1]
else:
@@ -109,14 +104,9 @@ class KitType(Base):
map['info'] = {}
return map
@check_authorization
def save(self):
self.metadata.session.add(self)
self.metadata.session.commit()
@classmethod
@setup_lookup
def query(cls,
def query(cls,
name:str=None,
used_for:str|SubmissionType|None=None,
id:int|None=None,
@@ -126,7 +116,6 @@ class KitType(Base):
Lookup a list of or single KitType.
Args:
ctx (Settings): Settings object passed down from gui
name (str, optional): Name of desired kit (returns single instance). Defaults to None.
used_for (str | models.Submissiontype | None, optional): Submission type the kit is used for. Defaults to None.
id (int | None, optional): Kit id in the database. Defaults to None.
@@ -135,10 +124,10 @@ class KitType(Base):
Returns:
models.KitType|List[models.KitType]: KitType(s) of interest.
"""
query: Query = cls.metadata.session.query(cls)
query: Query = cls.__database_session__.query(cls)
match used_for:
case str():
logger.debug(f"Looking up kit type by use: {used_for}")
# logger.debug(f"Looking up kit type by use: {used_for}")
query = query.filter(cls.used_for.any(name=used_for))
case SubmissionType():
query = query.filter(cls.used_for.contains(used_for))
@@ -146,30 +135,37 @@ class KitType(Base):
pass
match name:
case str():
logger.debug(f"Looking up kit type by name: {name}")
# logger.debug(f"Looking up kit type by name: {name}")
query = query.filter(cls.name==name)
limit = 1
case _:
pass
match id:
case int():
logger.debug(f"Looking up kit type by id: {id}")
# logger.debug(f"Looking up kit type by id: {id}")
query = query.filter(cls.id==id)
limit = 1
case str():
logger.debug(f"Looking up kit type by id: {id}")
# logger.debug(f"Looking up kit type by id: {id}")
query = query.filter(cls.id==int(id))
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
@check_authorization
def save(self, ctx:Settings):
"""
Add this instance to database and commit
"""
self.__database_session__.add(self)
self.__database_session__.commit()
class ReagentType(Base):
class ReagentType(BaseClass):
"""
Base of reagent type abstract
"""
__tablename__ = "_reagent_types"
__table_args__ = {'extend_existing': True}
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: name of reagent type
@@ -187,21 +183,21 @@ class ReagentType(Base):
# creator function: https://stackoverflow.com/questions/11091491/keyerror-when-adding-objects-to-sqlalchemy-association-object/11116291#11116291
kit_types = association_proxy("reagenttype_kit_associations", "kit_type", creator=lambda kit: KitTypeReagentTypeAssociation(kit_type=kit))
def __str__(self) -> str:
"""
string representing this object
# def __str__(self) -> str:
# """
# string representing this object
Returns:
str: string representing this object's name
"""
return self.name
# Returns:
# str: string representing this object's name
# """
# return self.name
def __repr__(self):
return f"ReagentType({self.name})"
return f"<ReagentType({self.name})>"
@classmethod
@setup_lookup
def query(cls,
def query(cls,
name: str|None=None,
kit_type: KitType|str|None=None,
reagent: Reagent|str|None=None,
@@ -211,14 +207,18 @@ class ReagentType(Base):
Lookup reagent types in the database.
Args:
ctx (Settings): Settings object passed down from gui.
name (str | None, optional): Reagent type name. Defaults to None.
kit_type (KitType | str | None, optional): Kit the type of interest belongs to. Defaults to None.
reagent (Reagent | str | None, optional): Concrete instance of the type of interest. Defaults to None.
limit (int, optional): maxmimum number of results to return (0 = all). Defaults to 0.
Raises:
ValueError: Raised if only kit_type or reagent, not both, given.
Returns:
models.ReagentType|List[models.ReagentType]: ReagentType or list of ReagentTypes matching filter.
"""
query: Query = cls.metadata.session.query(cls)
ReagentType|List[ReagentType]: ReagentType or list of ReagentTypes matching filter.
"""
query: Query = cls.__database_session__.query(cls)
if (kit_type != None and reagent == None) or (reagent != None and kit_type == None):
raise ValueError("Cannot filter without both reagent and kit type.")
elif kit_type == None and reagent == None:
@@ -235,9 +235,8 @@ class ReagentType(Base):
case _:
pass
assert reagent.type != []
logger.debug(f"Looking up reagent type for {type(kit_type)} {kit_type} and {type(reagent)} {reagent}")
logger.debug(f"Kit reagent types: {kit_type.reagent_types}")
# logger.debug(f"Reagent reagent types: {reagent._sa_instance_state}")
# logger.debug(f"Looking up reagent type for {type(kit_type)} {kit_type} and {type(reagent)} {reagent}")
# logger.debug(f"Kit reagent types: {kit_type.reagent_types}")
result = list(set(kit_type.reagent_types).intersection(reagent.type))
logger.debug(f"Result: {result}")
try:
@@ -246,34 +245,33 @@ class ReagentType(Base):
return None
match name:
case str():
logger.debug(f"Looking up reagent type by name: {name}")
# logger.debug(f"Looking up reagent type by name: {name}")
query = query.filter(cls.name==name)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
class KitTypeReagentTypeAssociation(Base):
class KitTypeReagentTypeAssociation(BaseClass):
"""
table containing reagenttype/kittype associations
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
"""
__tablename__ = "_reagenttypes_kittypes"
__table_args__ = {'extend_existing': True}
reagent_types_id = Column(INTEGER, ForeignKey("_reagent_types.id"), primary_key=True)
kits_id = Column(INTEGER, ForeignKey("_kits.id"), primary_key=True)
uses = Column(JSON)
required = Column(INTEGER)
reagent_types_id = Column(INTEGER, ForeignKey("_reagent_types.id"), primary_key=True) #: id of associated reagent type
kits_id = Column(INTEGER, ForeignKey("_kits.id"), primary_key=True) #: id of associated reagent type
uses = Column(JSON) #: map to location on excel sheets of different submission types
required = Column(INTEGER) #: whether the reagent type is required for the kit (Boolean 1 or 0)
last_used = Column(String(32)) #: last used lot number of this type of reagent
kit_type = relationship(KitType, back_populates="kit_reagenttype_associations")
kit_type = relationship(KitType, back_populates="kit_reagenttype_associations") #: relationship to associated kit
# reference to the "ReagentType" object
reagent_type = relationship(ReagentType, back_populates="reagenttype_kit_associations")
reagent_type = relationship(ReagentType, back_populates="reagenttype_kit_associations") #: relationship to associated reagent type
def __init__(self, kit_type=None, reagent_type=None, uses=None, required=1):
logger.debug(f"Parameters: Kit={kit_type}, RT={reagent_type}, Uses={uses}, Required={required}")
# logger.debug(f"Parameters: Kit={kit_type}, RT={reagent_type}, Uses={uses}, Required={required}")
self.kit_type = kit_type
self.reagent_type = reagent_type
self.uses = uses
@@ -284,12 +282,38 @@ class KitTypeReagentTypeAssociation(Base):
@validates('required')
def validate_age(self, key, value):
"""
Ensures only 1 & 0 used in 'required'
Args:
key (str): name of attribute
value (_type_): value of attribute
Raises:
ValueError: Raised if bad value given
Returns:
_type_: value
"""
if not 0 <= value < 2:
raise ValueError(f'Invalid required value {value}. Must be 0 or 1.')
return value
@validates('reagenttype')
def validate_reagenttype(self, key, value):
"""
Ensures reagenttype is an actual ReagentType
Args:
key (str)): name of attribute
value (_type_): value of attribute
Raises:
ValueError: raised if reagenttype is not a ReagentType
Returns:
_type_: ReagentType
"""
if not isinstance(value, ReagentType):
raise ValueError(f'{value} is not a reagenttype')
return value
@@ -297,15 +321,14 @@ class KitTypeReagentTypeAssociation(Base):
@classmethod
@setup_lookup
def query(cls,
kit_type:KitType|str|None,
reagent_type:ReagentType|str|None,
kit_type:KitType|str|None=None,
reagent_type:ReagentType|str|None=None,
limit:int=0
) -> KitTypeReagentTypeAssociation|List[KitTypeReagentTypeAssociation]:
"""
Lookup junction of ReagentType and KitType
Args:
ctx (Settings): Settings object passed down from gui.
kit_type (models.KitType | str | None): KitType of interest.
reagent_type (models.ReagentType | str | None): ReagentType of interest.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
@@ -313,7 +336,7 @@ class KitTypeReagentTypeAssociation(Base):
Returns:
models.KitTypeReagentTypeAssociation|List[models.KitTypeReagentTypeAssociation]: Junction of interest.
"""
query: Query = cls.metadata.session.query(cls)
query: Query = cls.__database_session__.query(cls)
match kit_type:
case KitType():
query = query.filter(cls.kit_type==kit_type)
@@ -333,17 +356,22 @@ class KitTypeReagentTypeAssociation(Base):
return query_return(query=query, limit=limit)
def save(self) -> Report:
"""
Adds this instance to the database and commits.
Returns:
Report: Result of save action
"""
report = Report()
self.metadata.session.add(self)
self.metadata.session.commit()
self.__database_session__.add(self)
self.__database_session__.commit()
return report
class Reagent(Base):
class Reagent(BaseClass):
"""
Concrete reagent instance
"""
__tablename__ = "_reagents"
__table_args__ = {'extend_existing': True}
id = Column(INTEGER, primary_key=True) #: primary key
type = relationship("ReagentType", back_populates="instances", secondary=reagenttypes_reagents) #: joined parent reagent type
@@ -358,16 +386,7 @@ class Reagent(Base):
return f"<Reagent({self.name}-{self.lot})>"
else:
return f"<Reagent({self.type.name}-{self.lot})>"
def __str__(self) -> str:
"""
string representing this object
Returns:
str: string representing this object's type and lot number
"""
return str(self.lot)
def to_sub_dict(self, extraction_kit:KitType=None) -> dict:
"""
dictionary containing values necessary for gui
@@ -376,7 +395,7 @@ class Reagent(Base):
extraction_kit (KitType, optional): KitType to use to get reagent type. Defaults to None.
Returns:
dict: _description_
dict: representation of the reagent's attributes
"""
if extraction_kit != None:
# Get the intersection of this reagent's ReagentType and all ReagentTypes in KitType
@@ -388,73 +407,59 @@ class Reagent(Base):
else:
reagent_role = self.type[0]
try:
rtype = reagent_role.name.replace("_", " ").title()
rtype = reagent_role.name.replace("_", " ")
except AttributeError:
rtype = "Unknown"
# Calculate expiry with EOL from ReagentType
try:
place_holder = self.expiry + reagent_role.eol_ext
except TypeError as e:
except (TypeError, AttributeError) as e:
place_holder = date.today()
logger.debug(f"We got a type error setting {self.lot} expiry: {e}. setting to today for testing")
except AttributeError as e:
place_holder = date.today()
logger.debug(f"We got an attribute error setting {self.lot} expiry: {e}. Setting to today for testing")
return {
"type": rtype,
"lot": self.lot,
"expiry": place_holder.strftime("%Y-%m-%d")
}
return dict(
name=self.name,
type=rtype,
lot=self.lot,
expiry=place_holder.strftime("%Y-%m-%d")
)
def to_reagent_dict(self, extraction_kit:KitType|str=None) -> dict:
def update_last_used(self, kit:KitType) -> Report:
"""
Returns basic reagent dictionary.
Updates last used reagent lot for ReagentType/KitType
Args:
extraction_kit (KitType, optional): KitType to use to get reagent type. Defaults to None.
kit (KitType): Kit this instance is used in.
Returns:
dict: Basic reagent dictionary of 'type', 'lot', 'expiry'
Report: Result of operation
"""
if extraction_kit != None:
# Get the intersection of this reagent's ReagentType and all ReagentTypes in KitType
try:
reagent_role = list(set(self.type).intersection(extraction_kit.reagent_types))[0]
# Most will be able to fall back to first ReagentType in itself because most will only have 1.
except:
reagent_role = self.type[0]
else:
reagent_role = self.type[0]
try:
rtype = reagent_role.name
except AttributeError:
rtype = "Unknown"
try:
expiry = self.expiry.strftime("%Y-%m-%d")
except:
expiry = date.today()
return {
"name":self.name,
"type": rtype,
"lot": self.lot,
"expiry": self.expiry.strftime("%Y-%m-%d")
}
def save(self):
self.metadata.session.add(self)
self.metadata.session.commit()
report = Report()
logger.debug(f"Attempting update of reagent type at intersection of ({self}), ({kit})")
rt = ReagentType.query(kit_type=kit, reagent=self, limit=1)
if rt != None:
logger.debug(f"got reagenttype {rt}")
assoc = KitTypeReagentTypeAssociation.query(kit_type=kit, reagent_type=rt)
if assoc != None:
if assoc.last_used != self.lot:
logger.debug(f"Updating {assoc} last used to {self.lot}")
assoc.last_used = self.lot
result = assoc.save()
report.add_result(result)
return report
report.add_result(Result(msg=f"Updating last used {rt} was not performed.", status="Information"))
return report
@classmethod
@setup_lookup
def query(cls, reagent_type:str|ReagentType|None=None,
lot_number:str|None=None,
limit:int=0
) -> Reagent|List[Reagent]:
def query(cls,
reagent_type:str|ReagentType|None=None,
lot_number:str|None=None,
limit:int=0
) -> Reagent|List[Reagent]:
"""
Lookup a list of reagents from the database.
Args:
ctx (Settings): Settings object passed down from gui
reagent_type (str | models.ReagentType | None, optional): Reagent type. Defaults to None.
lot_number (str | None, optional): Reagent lot number. Defaults to None.
limit (int, optional): limit of results returned. Defaults to 0.
@@ -462,13 +467,14 @@ class Reagent(Base):
Returns:
models.Reagent | List[models.Reagent]: reagent or list of reagents matching filter.
"""
query: Query = cls.metadata.session.query(cls)
# super().query(session)
query: Query = cls.__database_session__.query(cls)
match reagent_type:
case str():
logger.debug(f"Looking up reagents by reagent type: {reagent_type}")
query = query.join(cls.type, aliased=True).filter(ReagentType.name==reagent_type)
# logger.debug(f"Looking up reagents by reagent type: {reagent_type}")
query = query.join(cls.type).filter(ReagentType.name==reagent_type)
case ReagentType():
logger.debug(f"Looking up reagents by reagent type: {reagent_type}")
# logger.debug(f"Looking up reagents by reagent type: {reagent_type}")
query = query.filter(cls.type.contains(reagent_type))
case _:
pass
@@ -482,42 +488,33 @@ class Reagent(Base):
pass
return query_return(query=query, limit=limit)
def update_last_used(self, kit:KitType):
report = Report()
logger.debug(f"Attempting update of reagent type at intersection of ({self}), ({kit})")
rt = ReagentType.query(kit_type=kit, reagent=self, limit=1)
if rt != None:
logger.debug(f"got reagenttype {rt}")
assoc = KitTypeReagentTypeAssociation.query(kit_type=kit, reagent_type=rt)
if assoc != None:
if assoc.last_used != self.lot:
logger.debug(f"Updating {assoc} last used to {self.lot}")
assoc.last_used = self.lot
result = assoc.save()
return(report.add_result(result))
return report.add_result(Result(msg=f"Updating last used {rt} was not performed.", status="Information"))
class Discount(Base):
def save(self):
"""
Add this instance to the database and commit
"""
self.__database_session__.add(self)
self.__database_session__.commit()
class Discount(BaseClass):
"""
Relationship table for client labs for certain kits.
"""
__tablename__ = "_discounts"
__table_args__ = {'extend_existing': True}
id = Column(INTEGER, primary_key=True) #: primary key
kit = relationship("KitType") #: joined parent reagent type
kit_id = Column(INTEGER, ForeignKey("_kits.id", ondelete='SET NULL', name="fk_kit_type_id"))
kit_id = Column(INTEGER, ForeignKey("_kits.id", ondelete='SET NULL', name="fk_kit_type_id")) #: id of joined kit
client = relationship("Organization") #: joined client lab
client_id = Column(INTEGER, ForeignKey("_organizations.id", ondelete='SET NULL', name="fk_org_id"))
name = Column(String(128))
amount = Column(FLOAT(2))
client_id = Column(INTEGER, ForeignKey("_organizations.id", ondelete='SET NULL', name="fk_org_id")) #: id of joined client
name = Column(String(128)) #: Short description
amount = Column(FLOAT(2)) #: Dollar amount of discount
def __repr__(self) -> str:
return f"<Discount({self.name})>"
@classmethod
@setup_lookup
def query(cls,
def query(cls,
organization:Organization|str|int|None=None,
kit_type:KitType|str|int|None=None,
) -> Discount|List[Discount]:
@@ -525,7 +522,6 @@ class Discount(Base):
Lookup discount objects (union of kit and organization)
Args:
ctx (Settings): Settings object passed down from the gui.
organization (models.Organization | str | int): Organization receiving discount.
kit_type (models.KitType | str | int): Kit discount received on.
@@ -536,60 +532,68 @@ class Discount(Base):
Returns:
models.Discount|List[models.Discount]: Discount(s) of interest.
"""
query: Query = cls.metadata.session.query(cls)
query: Query = cls.__database_session__.query(cls)
match organization:
case Organization():
logger.debug(f"Looking up discount with organization: {organization}")
# logger.debug(f"Looking up discount with organization: {organization}")
query = query.filter(cls.client==Organization)
case str():
logger.debug(f"Looking up discount with organization: {organization}")
# logger.debug(f"Looking up discount with organization: {organization}")
query = query.join(Organization).filter(Organization.name==organization)
case int():
logger.debug(f"Looking up discount with organization id: {organization}")
# logger.debug(f"Looking up discount with organization id: {organization}")
query = query.join(Organization).filter(Organization.id==organization)
case _:
# raise ValueError(f"Invalid value for organization: {organization}")
pass
match kit_type:
case KitType():
logger.debug(f"Looking up discount with kit type: {kit_type}")
# logger.debug(f"Looking up discount with kit type: {kit_type}")
query = query.filter(cls.kit==kit_type)
case str():
logger.debug(f"Looking up discount with kit type: {kit_type}")
# logger.debug(f"Looking up discount with kit type: {kit_type}")
query = query.join(KitType).filter(KitType.name==kit_type)
case int():
logger.debug(f"Looking up discount with kit type id: {organization}")
# logger.debug(f"Looking up discount with kit type id: {organization}")
query = query.join(KitType).filter(KitType.id==kit_type)
case _:
# raise ValueError(f"Invalid value for kit type: {kit_type}")
pass
return query.all()
class SubmissionType(Base):
class SubmissionType(BaseClass):
"""
Abstract of types of submissions.
"""
__tablename__ = "_submission_types"
__table_args__ = {'extend_existing': True}
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(128), unique=True) #: name of submission type
info_map = Column(JSON) #: Where basic information is found in the excel workbook corresponding to this type.
instances = relationship("BasicSubmission", backref="submission_type")
instances = relationship("BasicSubmission", backref="submission_type") #: Concrete instances of this type.
# regex = Column(String(512))
# template_file = Column(BLOB)
template_file = Column(BLOB) #: Blank form for this type stored as binary.
submissiontype_kit_associations = relationship(
"SubmissionTypeKitTypeAssociation",
back_populates="submission_type",
cascade="all, delete-orphan",
)
) #: Association of kittypes
kit_types = association_proxy("submissiontype_kit_associations", "kit_type")
kit_types = association_proxy("submissiontype_kit_associations", "kit_type") #: Proxy of kittype association
def __repr__(self) -> str:
return f"<SubmissionType({self.name})>"
def get_template_file_sheets(self) -> List[str]:
"""
Gets names of sheet in the stored blank form.
Returns:
List[str]: List of sheet names
"""
return ExcelFile(self.template_file).sheet_names
@classmethod
@setup_lookup
def query(cls,
@@ -603,15 +607,16 @@ class SubmissionType(Base):
Args:
ctx (Settings): Settings object passed down from gui
name (str | None, optional): Name of submission type. Defaults to None.
key (str | None, optional): A key present in the info-map to lookup. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
models.SubmissionType|List[models.SubmissionType]: SubmissionType(s) of interest.
"""
query: Query = cls.metadata.session.query(cls)
query: Query = cls.__database_session__.query(cls)
match name:
case str():
logger.debug(f"Looking up submission type by name: {name}")
# logger.debug(f"Looking up submission type by name: {name}")
query = query.filter(cls.name==name)
limit = 1
case _:
@@ -624,27 +629,28 @@ class SubmissionType(Base):
return query_return(query=query, limit=limit)
def save(self):
self.metadata.session.add(self)
self.metadata.session.commit()
return None
"""
Adds this instances to the database and commits.
"""
self.__database_session__.add(self)
self.__database_session__.commit()
class SubmissionTypeKitTypeAssociation(Base):
class SubmissionTypeKitTypeAssociation(BaseClass):
"""
Abstract of relationship between kits and their submission type.
"""
__tablename__ = "_submissiontypes_kittypes"
__table_args__ = {'extend_existing': True}
submission_types_id = Column(INTEGER, ForeignKey("_submission_types.id"), primary_key=True)
kits_id = Column(INTEGER, ForeignKey("_kits.id"), primary_key=True)
submission_types_id = Column(INTEGER, ForeignKey("_submission_types.id"), primary_key=True) #: id of joined submission type
kits_id = Column(INTEGER, ForeignKey("_kits.id"), primary_key=True) #: id of joined kit
mutable_cost_column = Column(FLOAT(2)) #: dollar amount per 96 well plate that can change with number of columns (reagents, tips, etc)
mutable_cost_sample = Column(FLOAT(2)) #: dollar amount that can change with number of samples (reagents, tips, etc)
constant_cost = Column(FLOAT(2)) #: dollar amount per plate that will remain constant (plates, man hours, etc)
kit_type = relationship(KitType, back_populates="kit_submissiontype_associations")
kit_type = relationship(KitType, back_populates="kit_submissiontype_associations") #: joined kittype
# reference to the "SubmissionType" object
submission_type = relationship(SubmissionType, back_populates="submissiontype_kit_associations")
submission_type = relationship(SubmissionType, back_populates="submissiontype_kit_associations") #: joined submission type
def __init__(self, kit_type=None, submission_type=None):
self.kit_type = kit_type
@@ -661,32 +667,42 @@ class SubmissionTypeKitTypeAssociation(Base):
@classmethod
@setup_lookup
def query(cls,
submission_type:SubmissionType|str|int|None=None,
def query(cls,
submission_type:SubmissionType|str|int|None=None,
kit_type:KitType|str|int|None=None,
limit:int=0
):
query: Query = cls.metadata.session.query(cls)
) -> SubmissionTypeKitTypeAssociation|List[SubmissionTypeKitTypeAssociation]:
"""
Lookup SubmissionTypeKitTypeAssociations of interest.
Args:
submission_type (SubmissionType | str | int | None, optional): Identifier of submission type. Defaults to None.
kit_type (KitType | str | int | None, optional): Identifier of kit type. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
SubmissionTypeKitTypeAssociation|List[SubmissionTypeKitTypeAssociation]: SubmissionTypeKitTypeAssociation(s) of interest
"""
query: Query = cls.__database_session__.query(cls)
match submission_type:
case SubmissionType():
logger.debug(f"Looking up {cls.__name__} by SubmissionType {submission_type}")
# logger.debug(f"Looking up {cls.__name__} by SubmissionType {submission_type}")
query = query.filter(cls.submission_type==submission_type)
case str():
logger.debug(f"Looking up {cls.__name__} by name {submission_type}")
# logger.debug(f"Looking up {cls.__name__} by name {submission_type}")
query = query.join(SubmissionType).filter(SubmissionType.name==submission_type)
case int():
logger.debug(f"Looking up {cls.__name__} by id {submission_type}")
# logger.debug(f"Looking up {cls.__name__} by id {submission_type}")
query = query.join(SubmissionType).filter(SubmissionType.id==submission_type)
match kit_type:
case KitType():
logger.debug(f"Looking up {cls.__name__} by KitType {kit_type}")
# logger.debug(f"Looking up {cls.__name__} by KitType {kit_type}")
query = query.filter(cls.kit_type==kit_type)
case str():
logger.debug(f"Looking up {cls.__name__} by name {kit_type}")
# logger.debug(f"Looking up {cls.__name__} by name {kit_type}")
query = query.join(KitType).filter(KitType.name==kit_type)
case int():
logger.debug(f"Looking up {cls.__name__} by id {kit_type}")
# logger.debug(f"Looking up {cls.__name__} by id {kit_type}")
query = query.join(KitType).filter(KitType.id==kit_type)
limit = query.count()
return query_return(query=query, limit=limit)

View File

@@ -4,8 +4,8 @@ All client organization related models.
from __future__ import annotations
from sqlalchemy import Column, String, INTEGER, ForeignKey, Table
from sqlalchemy.orm import relationship, Query
from . import Base
from tools import check_authorization, setup_lookup, query_return
from . import Base, BaseClass
from tools import check_authorization, setup_lookup, query_return, Settings
from typing import List
import logging
@@ -21,12 +21,11 @@ orgs_contacts = Table(
extend_existing = True
)
class Organization(Base):
class Organization(BaseClass):
"""
Base of organization
"""
__tablename__ = "_organizations"
__table_args__ = {'extend_existing': True}
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: organization name
@@ -34,29 +33,15 @@ class Organization(Base):
cost_centre = Column(String()) #: cost centre used by org for payment
contacts = relationship("Contact", back_populates="organization", secondary=orgs_contacts) #: contacts involved with this org
def __str__(self) -> str:
"""
String representing organization
Returns:
str: string representing organization name
"""
return self.name.replace("_", " ").title()
def __repr__(self) -> str:
return f"<Organization({self.name})>"
@check_authorization
def save(self, ctx):
ctx.database_session.add(self)
ctx.database_session.commit()
def set_attribute(self, name:str, value):
setattr(self, name, value)
@classmethod
@setup_lookup
def query(cls,
def query(cls,
name:str|None=None,
limit:int=0,
) -> Organization|List[Organization]:
@@ -68,24 +53,34 @@ class Organization(Base):
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
Organization|List[Organization]: _description_
Organization|List[Organization]:
"""
query: Query = cls.metadata.session.query(cls)
query: Query = cls.__database_session__.query(cls)
match name:
case str():
logger.debug(f"Looking up organization with name: {name}")
# logger.debug(f"Looking up organization with name: {name}")
query = query.filter(cls.name==name)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)
@check_authorization
def save(self, ctx:Settings):
"""
Adds this instance to the database and commits
class Contact(Base):
Args:
ctx (Settings): Settings object passed down from GUI. Necessary to check authorization
"""
ctx.database_session.add(self)
ctx.database_session.commit()
class Contact(BaseClass):
"""
Base of Contact
"""
__tablename__ = "_contacts"
__table_args__ = {'extend_existing': True}
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: contact name
@@ -98,7 +93,7 @@ class Contact(Base):
@classmethod
@setup_lookup
def query(cls,
def query(cls,
name:str|None=None,
email:str|None=None,
phone:str|None=None,
@@ -109,32 +104,35 @@ class Contact(Base):
Args:
name (str | None, optional): Name of the contact. Defaults to None.
email (str | None, optional): Email of the contact. Defaults to None.
phone (str | None, optional): Phone number of the contact. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
Contact|List[Contact]: _description_
"""
query: Query = cls.metadata.session.query(cls)
Contact|List[Contact]: Contact(s) of interest.
"""
# super().query(session)
query: Query = cls.__database_session__.query(cls)
match name:
case str():
logger.debug(f"Looking up contact with name: {name}")
# logger.debug(f"Looking up contact with name: {name}")
query = query.filter(cls.name==name)
limit = 1
case _:
pass
match email:
case str():
logger.debug(f"Looking up contact with email: {name}")
# logger.debug(f"Looking up contact with email: {name}")
query = query.filter(cls.email==email)
limit = 1
case _:
pass
match phone:
case str():
logger.debug(f"Looking up contact with phone: {name}")
# logger.debug(f"Looking up contact with phone: {name}")
query = query.filter(cls.phone==phone)
limit = 1
case _:
pass
return query_return(query=query, limit=limit)

File diff suppressed because it is too large Load Diff