Sanity checking.

This commit is contained in:
lwark
2024-05-16 14:07:18 -05:00
parent 84fac23890
commit bbcbd35127
6 changed files with 466 additions and 365 deletions

View File

@@ -1,3 +1,9 @@
## 202405.03
- Settings can now pull values from the db.
- Improved generic and WW specific PCR parsers.
- Various bug fixes.
## 202405.01
- New Excel writers

View File

@@ -55,10 +55,10 @@ version_path_separator = os # Use os.pathsep. Default configuration used for ne
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = sqlite:///L:\Robotics Laboratory Support\Submissions\submissions.db
; sqlalchemy.url = sqlite:///L:\Robotics Laboratory Support\Submissions\submissions.db
; sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\Archives\Submissions_app_backups\DB_backups\submissions-demo.db
;sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\Archives\Submissions_app_backups\DB_backups\submissions-prototypes.db
;sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\python\submissions\mytests\test_assets\submissions-test.db
sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\python\submissions\mytests\test_assets\submissions-test.db
[post_write_hooks]

View File

@@ -3,6 +3,8 @@ Contains all models for sqlalchemy
'''
from __future__ import annotations
import sys, logging
from sqlalchemy import Column, INTEGER, String, JSON
from sqlalchemy.orm import DeclarativeMeta, declarative_base, Query, Session
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.exc import ArgumentError
@@ -11,8 +13,6 @@ from pathlib import Path
# Load testing environment
if 'pytest' in sys.modules:
from pathlib import Path
sys.path.append(Path(__file__).parents[4].absolute().joinpath("tests").__str__())
Base: DeclarativeMeta = declarative_base()
@@ -46,7 +46,7 @@ class BaseClass(Base):
Returns:
Session: DB session from ctx settings.
"""
if not 'pytest' in sys.modules:
if 'pytest' not in sys.modules:
from tools import ctx
else:
from test_settings import ctx
@@ -60,7 +60,7 @@ class BaseClass(Base):
Returns:
Path: Location of the Submissions directory in Settings object
"""
if not 'pytest' in sys.modules:
if 'pytest' not in sys.modules:
from tools import ctx
else:
from test_settings import ctx
@@ -74,7 +74,7 @@ class BaseClass(Base):
Returns:
Path: Location of the Submissions backup directory in Settings object
"""
if not 'pytest' in sys.modules:
if 'pytest' not in sys.modules:
from tools import ctx
else:
from test_settings import ctx
@@ -104,8 +104,9 @@ class BaseClass(Base):
Execute sqlalchemy query.
Args:
query (Query): input query object
limit (int): Maximum number of results. (0 = all)
model (Any, optional): model to be queried. Defaults to None
query (Query, optional): input query object. Defaults to None
limit (int): Maximum number of results. (0 = all). Defaults to 0
Returns:
Any | List[Any]: Single result if limit = 1 or List if other.
@@ -153,6 +154,19 @@ class BaseClass(Base):
self.__database_session__.rollback()
class ConfigItem(BaseClass):
id = Column(INTEGER, primary_key=True)
key = Column(String(32))
value = Column(JSON)
def __repr__(self):
return f"ConfigItem({self.key} : {self.value})"
@classmethod
def get_config_items(cls):
return cls.__database_session__.query(cls).all()
from .controls import *
# import order must go: orgs, kit, subs due to circular import issues
from .organizations import *

View File

@@ -175,19 +175,19 @@ class Control(BaseClass):
output = []
# logger.debug("load json string for mode (i.e. contains, matches, kraken2)")
try:
# data = json.loads(getattr(self, mode))
data = self.__getattribute__(mode)
except TypeError:
data = {}
logger.debug(f"Length of data: {len(data)}")
# logger.debug("dict keys are genera of bacteria, e.g. 'Streptococcus'")
for genus in data:
_dict = {}
_dict['name'] = self.name
_dict['submitted_date'] = self.submitted_date
_dict['genus'] = genus
_dict = dict(
name=self.name,
submitted_date=self.submitted_date,
genus=genus,
target='Target' if genus.strip("*") in self.controltype.targets else "Off-target"
)
# logger.debug("get Target or Off-target of genus")
_dict['target'] = 'Target' if genus.strip("*") in self.controltype.targets else "Off-target"
# logger.debug("set 'contains_hashes', etc for genus")
for key in data[genus]:
_dict[key] = data[genus][key]
@@ -247,13 +247,13 @@ class Control(BaseClass):
case _:
pass
# by date range
if start_date != None and end_date == None:
if start_date is not None and end_date is None:
logger.warning(f"Start date with no end date, using today.")
end_date = date.today()
if end_date != None and start_date == None:
if end_date is not None and start_date is None:
logger.warning(f"End date with no start date, using Jan 1, 2023")
start_date = date(2023, 1, 1)
if start_date != None:
if start_date is not None:
match start_date:
case date():
# logger.debug(f"Lookup control by start date({start_date})")

View File

@@ -1,10 +1,8 @@
'''
"""
All kit and reagent related models
'''
"""
from __future__ import annotations
from copy import copy
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB
from sqlalchemy.orm import relationship, validates, Query
from sqlalchemy.ext.associationproxy import association_proxy
@@ -72,6 +70,7 @@ kittypes_processes = Table(
extend_existing=True
)
class KitType(BaseClass):
"""
Base of kits used in submission processing
@@ -80,7 +79,8 @@ class KitType(BaseClass):
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64), unique=True) #: name of kit
submissions = relationship("BasicSubmission", back_populates="extraction_kit") #: submissions this kit was used for
processes = relationship("Process", back_populates="kit_types", secondary=kittypes_processes) #: equipment processes used by this kit
processes = relationship("Process", back_populates="kit_types",
secondary=kittypes_processes) #: equipment processes used by this kit
kit_reagenttype_associations = relationship(
"KitTypeReagentTypeAssociation",
@@ -89,7 +89,9 @@ class KitType(BaseClass):
)
# creator function: https://stackoverflow.com/questions/11091491/keyerror-when-adding-objects-to-sqlalchemy-association-object/11116291#11116291
reagent_types = association_proxy("kit_reagenttype_associations", "reagent_type", creator=lambda RT: KitTypeReagentTypeAssociation(reagent_type=RT)) #: Association proxy to KitTypeReagentTypeAssociation
reagent_types = association_proxy("kit_reagenttype_associations", "reagent_type",
creator=lambda RT: KitTypeReagentTypeAssociation(
reagent_type=RT)) #: Association proxy to KitTypeReagentTypeAssociation
kit_submissiontype_associations = relationship(
"SubmissionTypeKitTypeAssociation",
@@ -97,7 +99,8 @@ class KitType(BaseClass):
cascade="all, delete-orphan",
) #: Relation to SubmissionType
used_for = association_proxy("kit_submissiontype_associations", "submission_type") #: Association proxy to SubmissionTypeKitTypeAssociation
used_for = association_proxy("kit_submissiontype_associations",
"submission_type") #: Association proxy to SubmissionTypeKitTypeAssociation
def __repr__(self) -> str:
"""
@@ -120,10 +123,12 @@ class KitType(BaseClass):
match submission_type:
case SubmissionType():
# logger.debug(f"Getting reagents by SubmissionType {submission_type}")
relevant_associations = [item for item in self.kit_reagenttype_associations if item.submission_type==submission_type]
relevant_associations = [item for item in self.kit_reagenttype_associations if
item.submission_type == submission_type]
case str():
# logger.debug(f"Getting reagents by str {submission_type}")
relevant_associations = [item for item in self.kit_reagenttype_associations if item.submission_type.name==submission_type]
relevant_associations = [item for item in self.kit_reagenttype_associations if
item.submission_type.name == submission_type]
case _:
# logger.debug(f"Getting reagents")
relevant_associations = [item for item in self.kit_reagenttype_associations]
@@ -139,17 +144,18 @@ class KitType(BaseClass):
Creates map of locations in excel workbook for a SubmissionType
Args:
use (str | SubmissionType): Submissiontype.name
submission_type (str | SubmissionType): Submissiontype.name
Returns:
dict: Dictionary containing information locations.
"""
map = {}
info_map = {}
# Account for submission_type variable type.
match submission_type:
case str():
# logger.debug(f"Constructing xl map with str {submission_type}")
assocs = [item for item in self.kit_reagenttype_associations if item.submission_type.name==submission_type]
assocs = [item for item in self.kit_reagenttype_associations if
item.submission_type.name == submission_type]
st_assoc = [item for item in self.used_for if submission_type == item.name][0]
case SubmissionType():
# logger.debug(f"Constructing xl map with SubmissionType {submission_type}")
@@ -160,16 +166,10 @@ class KitType(BaseClass):
# logger.debug("Get all KitTypeReagentTypeAssociation for SubmissionType")
for assoc in assocs:
try:
map[assoc.reagent_type.name] = assoc.uses
info_map[assoc.reagent_type.name] = assoc.uses
except TypeError:
continue
# # Get SubmissionType info map
# try:
# # map['info'] = st_assoc.info_map
# map['info'] = st_assoc.construct_info_map(mode="write")
# except IndexError as e:
# map['info'] = {}
return map
return info_map
@classmethod
@setup_lookup
@@ -225,6 +225,7 @@ class KitType(BaseClass):
def save(self):
super().save()
class ReagentType(BaseClass):
"""
Base of reagent type abstract
@@ -232,7 +233,8 @@ class ReagentType(BaseClass):
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: name of reagent type
instances = relationship("Reagent", back_populates="type", secondary=reagenttypes_reagents) #: concrete instances of this reagent type
instances = relationship("Reagent", back_populates="type",
secondary=reagenttypes_reagents) #: concrete instances of this reagent type
eol_ext = Column(Interval()) #: extension of life interval
reagenttype_kit_associations = relationship(
@@ -242,7 +244,9 @@ class ReagentType(BaseClass):
) #: Relation to KitTypeReagentTypeAssociation
# creator function: https://stackoverflow.com/questions/11091491/keyerror-when-adding-objects-to-sqlalchemy-association-object/11116291#11116291
kit_types = association_proxy("reagenttype_kit_associations", "kit_type", creator=lambda kit: KitTypeReagentTypeAssociation(kit_type=kit)) #: Association proxy to KitTypeReagentTypeAssociation
kit_types = association_proxy("reagenttype_kit_associations", "kit_type",
creator=lambda kit: KitTypeReagentTypeAssociation(
kit_type=kit)) #: Association proxy to KitTypeReagentTypeAssociation
def __repr__(self) -> str:
"""
@@ -324,14 +328,17 @@ class ReagentType(BaseClass):
def save(self):
super().save()
class Reagent(BaseClass):
"""
Concrete reagent instance
"""
id = Column(INTEGER, primary_key=True) #: primary key
type = relationship("ReagentType", back_populates="instances", secondary=reagenttypes_reagents) #: joined parent reagent type
type_id = Column(INTEGER, ForeignKey("_reagenttype.id", ondelete='SET NULL', name="fk_reagent_type_id")) #: id of parent reagent type
type = relationship("ReagentType", back_populates="instances",
secondary=reagenttypes_reagents) #: joined parent reagent type
type_id = Column(INTEGER, ForeignKey("_reagenttype.id", ondelete='SET NULL',
name="fk_reagent_type_id")) #: id of parent reagent type
name = Column(String(64)) #: reagent name
lot = Column(String(64)) #: lot number of reagent
expiry = Column(TIMESTAMP) #: expiry date - extended by eol_ext of parent programmatically
@@ -342,7 +349,8 @@ class Reagent(BaseClass):
cascade="all, delete-orphan",
) #: Relation to SubmissionSampleAssociation
submissions = association_proxy("reagent_submission_associations", "submission") #: Association proxy to SubmissionSampleAssociation.samples
submissions = association_proxy("reagent_submission_associations",
"submission") #: Association proxy to SubmissionSampleAssociation.samples
def __repr__(self):
if self.name != None:
@@ -471,6 +479,7 @@ class Reagent(BaseClass):
pass
return cls.execute_query(query=query, limit=limit)
class Discount(BaseClass):
"""
Relationship table for client labs for certain kits.
@@ -480,7 +489,8 @@ class Discount(BaseClass):
kit = relationship("KitType") #: joined parent reagent type
kit_id = Column(INTEGER, ForeignKey("_kittype.id", ondelete='SET NULL', name="fk_kit_type_id")) #: id of joined kit
client = relationship("Organization") #: joined client lab
client_id = Column(INTEGER, ForeignKey("_organization.id", ondelete='SET NULL', name="fk_org_id")) #: id of joined client
client_id = Column(INTEGER,
ForeignKey("_organization.id", ondelete='SET NULL', name="fk_org_id")) #: id of joined client
name = Column(String(128)) #: Short description
amount = Column(FLOAT(2)) #: Dollar amount of discount
@@ -544,6 +554,7 @@ class Discount(BaseClass):
def save(self):
super().save()
class SubmissionType(BaseClass):
"""
Abstract of types of submissions.
@@ -555,7 +566,8 @@ class SubmissionType(BaseClass):
defaults = Column(JSON) #: Basic information about this submission type
instances = relationship("BasicSubmission", backref="submission_type") #: Concrete instances of this type.
template_file = Column(BLOB) #: Blank form for this type stored as binary.
processes = relationship("Process", back_populates="submission_types", secondary=submissiontypes_processes) #: Relation to equipment processes used for this type.
processes = relationship("Process", back_populates="submission_types",
secondary=submissiontypes_processes) #: Relation to equipment processes used for this type.
sample_map = Column(JSON) #: Where sample information is found in the excel sheet corresponding to this type.
submissiontype_kit_associations = relationship(
@@ -572,7 +584,8 @@ class SubmissionType(BaseClass):
cascade="all, delete-orphan"
) #: Association of equipmentroles
equipment = association_proxy("submissiontype_equipmentrole_associations", "equipment_role") #: Proxy of equipmentrole associations
equipment = association_proxy("submissiontype_equipmentrole_associations",
"equipment_role") #: Proxy of equipmentrole associations
submissiontype_kit_rt_associations = relationship(
"KitTypeReagentTypeAssociation",
@@ -681,10 +694,12 @@ class SubmissionType(BaseClass):
match equipment_role:
case str():
# logger.debug(f"Getting processes for equipmentrole str {equipment_role}")
relevant = [item.get_all_processes(kit) for item in self.submissiontype_equipmentrole_associations if item.equipment_role.name==equipment_role]
relevant = [item.get_all_processes(kit) for item in self.submissiontype_equipmentrole_associations if
item.equipment_role.name == equipment_role]
case EquipmentRole():
# logger.debug(f"Getting processes for equipmentrole EquipmentRole {equipment_role}")
relevant = [item.get_all_processes(kit) for item in self.submissiontype_equipmentrole_associations if item.equipment_role==equipment_role]
relevant = [item.get_all_processes(kit) for item in self.submissiontype_equipmentrole_associations if
item.equipment_role == equipment_role]
case _:
raise TypeError(f"Type {type(equipment_role)} is not allowed")
return list(set([item for items in relevant for item in items if item != None]))
@@ -734,21 +749,26 @@ class SubmissionType(BaseClass):
"""
super().save()
class SubmissionTypeKitTypeAssociation(BaseClass):
"""
Abstract of relationship between kits and their submission type.
"""
submission_types_id = Column(INTEGER, ForeignKey("_submissiontype.id"), primary_key=True) #: id of joined submission type
submission_types_id = Column(INTEGER, ForeignKey("_submissiontype.id"),
primary_key=True) #: id of joined submission type
kits_id = Column(INTEGER, ForeignKey("_kittype.id"), primary_key=True) #: id of joined kit
mutable_cost_column = Column(FLOAT(2)) #: dollar amount per 96 well plate that can change with number of columns (reagents, tips, etc)
mutable_cost_sample = Column(FLOAT(2)) #: dollar amount that can change with number of samples (reagents, tips, etc)
mutable_cost_column = Column(
FLOAT(2)) #: dollar amount per 96 well plate that can change with number of columns (reagents, tips, etc)
mutable_cost_sample = Column(
FLOAT(2)) #: dollar amount that can change with number of samples (reagents, tips, etc)
constant_cost = Column(FLOAT(2)) #: dollar amount per plate that will remain constant (plates, man hours, etc)
kit_type = relationship(KitType, back_populates="kit_submissiontype_associations") #: joined kittype
# reference to the "SubmissionType" object
submission_type = relationship(SubmissionType, back_populates="submissiontype_kit_associations") #: joined submission type
submission_type = relationship(SubmissionType,
back_populates="submissiontype_kit_associations") #: joined submission type
def __init__(self, kit_type=None, submission_type=None):
self.kit_type = kit_type
@@ -806,25 +826,30 @@ class SubmissionTypeKitTypeAssociation(BaseClass):
limit = query.count()
return cls.execute_query(query=query, limit=limit)
class KitTypeReagentTypeAssociation(BaseClass):
"""
table containing reagenttype/kittype associations
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
"""
reagent_types_id = Column(INTEGER, ForeignKey("_reagenttype.id"), primary_key=True) #: id of associated reagent type
reagent_types_id = Column(INTEGER, ForeignKey("_reagenttype.id"),
primary_key=True) #: id of associated reagent type
kits_id = Column(INTEGER, ForeignKey("_kittype.id"), primary_key=True) #: id of associated reagent type
submission_type_id = Column(INTEGER, ForeignKey("_submissiontype.id"), primary_key=True)
uses = Column(JSON) #: map to location on excel sheets of different submission types
required = Column(INTEGER) #: whether the reagent type is required for the kit (Boolean 1 or 0)
last_used = Column(String(32)) #: last used lot number of this type of reagent
kit_type = relationship(KitType, back_populates="kit_reagenttype_associations") #: relationship to associated KitType
kit_type = relationship(KitType,
back_populates="kit_reagenttype_associations") #: relationship to associated KitType
# reference to the "ReagentType" object
reagent_type = relationship(ReagentType, back_populates="reagenttype_kit_associations") #: relationship to associated ReagentType
reagent_type = relationship(ReagentType,
back_populates="reagenttype_kit_associations") #: relationship to associated ReagentType
submission_type = relationship(SubmissionType, back_populates="submissiontype_kit_rt_associations") #: relationship to associated SubmissionType
submission_type = relationship(SubmissionType,
back_populates="submissiontype_kit_rt_associations") #: relationship to associated SubmissionType
def __init__(self, kit_type=None, reagent_type=None, uses=None, required=1):
self.kit_type = kit_type
@@ -914,6 +939,7 @@ class KitTypeReagentTypeAssociation(BaseClass):
limit = 1
return cls.execute_query(query=query, limit=limit)
class SubmissionReagentAssociation(BaseClass):
"""
table containing submission/reagent associations
@@ -924,7 +950,8 @@ class SubmissionReagentAssociation(BaseClass):
submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id"), primary_key=True) #: id of associated submission
comments = Column(String(1024)) #: Comments about reagents
submission = relationship("BasicSubmission", back_populates="submission_reagent_associations") #: associated submission
submission = relationship("BasicSubmission",
back_populates="submission_reagent_associations") #: associated submission
reagent = relationship(Reagent, back_populates="reagent_submission_associations") #: associated reagent
@@ -999,6 +1026,7 @@ class SubmissionReagentAssociation(BaseClass):
output['comments'] = self.comments
return output
class Equipment(BaseClass):
"""
A concrete instance of equipment
@@ -1008,8 +1036,10 @@ class Equipment(BaseClass):
name = Column(String(64)) #: equipment name
nickname = Column(String(64)) #: equipment nickname
asset_number = Column(String(16)) #: Given asset number (corpo nickname if you will)
roles = relationship("EquipmentRole", back_populates="instances", secondary=equipmentroles_equipment) #: relation to EquipmentRoles
processes = relationship("Process", back_populates="equipment", secondary=equipment_processes) #: relation to Processes
roles = relationship("EquipmentRole", back_populates="instances",
secondary=equipmentroles_equipment) #: relation to EquipmentRoles
processes = relationship("Process", back_populates="equipment",
secondary=equipment_processes) #: relation to Processes
equipment_submission_associations = relationship(
"SubmissionEquipmentAssociation",
@@ -1017,7 +1047,8 @@ class Equipment(BaseClass):
cascade="all, delete-orphan",
) #: Association with BasicSubmission
submissions = association_proxy("equipment_submission_associations", "submission") #: proxy to equipment_submission_associations.submission
submissions = association_proxy("equipment_submission_associations",
"submission") #: proxy to equipment_submission_associations.submission
def __repr__(self) -> str:
"""
@@ -1056,7 +1087,8 @@ class Equipment(BaseClass):
match extraction_kit:
case str():
# logger.debug(f"Filtering processes by extraction_kit str {extraction_kit}")
processes = [process for process in processes if extraction_kit in [kit.name for kit in process.kit_types]]
processes = [process for process in processes if
extraction_kit in [kit.name for kit in process.kit_types]]
case KitType():
# logger.debug(f"Filtering processes by extraction_kit KitType {extraction_kit}")
processes = [process for process in processes if extraction_kit in process.kit_types]
@@ -1112,7 +1144,8 @@ class Equipment(BaseClass):
pass
return cls.execute_query(query=query, limit=limit)
def to_pydantic(self, submission_type:SubmissionType, extraction_kit:str|KitType|None=None) -> "PydEquipment":
def to_pydantic(self, submission_type: SubmissionType,
extraction_kit: str | KitType | None = None) -> "PydEquipment":
"""
Creates PydEquipment of this Equipment
@@ -1124,7 +1157,9 @@ class Equipment(BaseClass):
PydEquipment: _description_
"""
from backend.validators.pydant import PydEquipment
return PydEquipment(processes=self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit), role=None, **self.to_dict(processes=False))
return PydEquipment(
processes=self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit), role=None,
**self.to_dict(processes=False))
@classmethod
def get_regex(cls) -> re.Pattern:
@@ -1142,6 +1177,7 @@ class Equipment(BaseClass):
(?P<Labcon>\d{4}-\d{3}-\d{3}-\d$)""",
re.VERBOSE)
class EquipmentRole(BaseClass):
"""
Abstract roles for equipment
@@ -1150,8 +1186,10 @@ class EquipmentRole(BaseClass):
id = Column(INTEGER, primary_key=True) #: Role id, primary key
name = Column(String(32)) #: Common name
instances = relationship("Equipment", back_populates="roles", secondary=equipmentroles_equipment) #: Concrete instances (Equipment) of role
processes = relationship("Process", back_populates='equipment_roles', secondary=equipmentroles_processes) #: Associated Processes
instances = relationship("Equipment", back_populates="roles",
secondary=equipmentroles_equipment) #: Concrete instances (Equipment) of role
processes = relationship("Process", back_populates='equipment_roles',
secondary=equipmentroles_processes) #: Associated Processes
equipmentrole_submissiontype_associations = relationship(
"SubmissionTypeEquipmentRoleAssociation",
@@ -1159,7 +1197,8 @@ class EquipmentRole(BaseClass):
cascade="all, delete-orphan",
) #: relation to SubmissionTypes
submission_types = association_proxy("equipmentrole_submissiontype_associations", "submission_type") #: proxy to equipmentrole_submissiontype_associations.submission_type
submission_types = association_proxy("equipmentrole_submissiontype_associations",
"submission_type") #: proxy to equipmentrole_submissiontype_associations.submission_type
def __repr__(self) -> str:
"""
@@ -1185,7 +1224,8 @@ class EquipmentRole(BaseClass):
output[key] = value
return output
def to_pydantic(self, submission_type:SubmissionType, extraction_kit:str|KitType|None=None) -> "PydEquipmentRole":
def to_pydantic(self, submission_type: SubmissionType,
extraction_kit: str | KitType | None = None) -> "PydEquipmentRole":
"""
Creates a PydEquipmentRole of this EquipmentRole
@@ -1198,7 +1238,8 @@ class EquipmentRole(BaseClass):
"""
from backend.validators.pydant import PydEquipmentRole
# logger.debug("Creating list of PydEquipment in this role")
equipment = [item.to_pydantic(submission_type=submission_type, extraction_kit=extraction_kit) for item in self.instances]
equipment = [item.to_pydantic(submission_type=submission_type, extraction_kit=extraction_kit) for item in
self.instances]
pyd_dict = self.to_dict()
# logger.debug("Creating list of Processes in this role")
pyd_dict['processes'] = self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit)
@@ -1206,7 +1247,8 @@ class EquipmentRole(BaseClass):
@classmethod
@setup_lookup
def query(cls, name:str|None=None, id:int|None=None, limit:int=0) -> EquipmentRole|List[EquipmentRole]:
def query(cls, name: str | None = None, id: int | None = None, limit: int = 0) -> EquipmentRole | List[
EquipmentRole]:
"""
Lookup Equipment roles.
@@ -1235,7 +1277,8 @@ class EquipmentRole(BaseClass):
pass
return cls.execute_query(query=query, limit=limit)
def get_processes(self, submission_type:str|SubmissionType|None, extraction_kit:str|KitType|None=None) -> List[Process]:
def get_processes(self, submission_type: str | SubmissionType | None,
extraction_kit: str | KitType | None = None) -> List[Process]:
"""
Get processes used by this EquipmentRole
@@ -1269,6 +1312,7 @@ class EquipmentRole(BaseClass):
else:
return output
class SubmissionEquipmentAssociation(BaseClass):
"""
Abstract association between BasicSubmission and Equipment
@@ -1277,12 +1321,14 @@ class SubmissionEquipmentAssociation(BaseClass):
equipment_id = Column(INTEGER, ForeignKey("_equipment.id"), primary_key=True) #: id of associated equipment
submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id"), primary_key=True) #: id of associated submission
role = Column(String(64), primary_key=True) #: name of the role the equipment fills
process_id = Column(INTEGER, ForeignKey("_process.id",ondelete="SET NULL", name="SEA_Process_id")) #: Foreign key of process id
process_id = Column(INTEGER, ForeignKey("_process.id", ondelete="SET NULL",
name="SEA_Process_id")) #: Foreign key of process id
start_time = Column(TIMESTAMP) #: start time of equipment use
end_time = Column(TIMESTAMP) #: end time of equipment use
comments = Column(String(1024)) #: comments about equipment
submission = relationship("BasicSubmission", back_populates="submission_equipment_associations") #: associated submission
submission = relationship("BasicSubmission",
back_populates="submission_equipment_associations") #: associated submission
equipment = relationship(Equipment, back_populates="equipment_submission_associations") #: associated equipment
@@ -1305,21 +1351,27 @@ class SubmissionEquipmentAssociation(BaseClass):
process = self.process.name
except AttributeError:
process = "No process found"
output = dict(name=self.equipment.name, asset_number=self.equipment.asset_number, comment=self.comments, processes=[process], role=self.role, nickname=self.equipment.nickname)
output = dict(name=self.equipment.name, asset_number=self.equipment.asset_number, comment=self.comments,
processes=[process], role=self.role, nickname=self.equipment.nickname)
return output
class SubmissionTypeEquipmentRoleAssociation(BaseClass):
"""
Abstract association between SubmissionType and EquipmentRole
"""
equipmentrole_id = Column(INTEGER, ForeignKey("_equipmentrole.id"), primary_key=True) #: id of associated equipment
submissiontype_id = Column(INTEGER, ForeignKey("_submissiontype.id"), primary_key=True) #: id of associated submission
submissiontype_id = Column(INTEGER, ForeignKey("_submissiontype.id"),
primary_key=True) #: id of associated submission
uses = Column(JSON) #: locations of equipment on the submission type excel sheet.
static = Column(INTEGER, default=1) #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list?
static = Column(INTEGER,
default=1) #: if 1 this piece of equipment will always be used, otherwise it will need to be selected from list?
submission_type = relationship(SubmissionType, back_populates="submissiontype_equipmentrole_associations") #: associated submission
submission_type = relationship(SubmissionType,
back_populates="submissiontype_equipmentrole_associations") #: associated submission
equipment_role = relationship(EquipmentRole, back_populates="equipmentrole_submissiontype_associations") #: associated equipment
equipment_role = relationship(EquipmentRole,
back_populates="equipmentrole_submissiontype_associations") #: associated equipment
@validates('static')
def validate_age(self, key, value):
@@ -1368,6 +1420,7 @@ class SubmissionTypeEquipmentRoleAssociation(BaseClass):
def save(self):
super().save()
class Process(BaseClass):
"""
A Process is a method used by a piece of equipment.
@@ -1375,11 +1428,16 @@ class Process(BaseClass):
id = Column(INTEGER, primary_key=True) #: Process id, primary key
name = Column(String(64)) #: Process name
submission_types = relationship("SubmissionType", back_populates='processes', secondary=submissiontypes_processes) #: relation to SubmissionType
equipment = relationship("Equipment", back_populates='processes', secondary=equipment_processes) #: relation to Equipment
equipment_roles = relationship("EquipmentRole", back_populates='processes', secondary=equipmentroles_processes) #: relation to EquipmentRoles
submissions = relationship("SubmissionEquipmentAssociation", backref='process') #: relation to SubmissionEquipmentAssociation
kit_types = relationship("KitType", back_populates='processes', secondary=kittypes_processes) #: relation to KitType
submission_types = relationship("SubmissionType", back_populates='processes',
secondary=submissiontypes_processes) #: relation to SubmissionType
equipment = relationship("Equipment", back_populates='processes',
secondary=equipment_processes) #: relation to Equipment
equipment_roles = relationship("EquipmentRole", back_populates='processes',
secondary=equipmentroles_processes) #: relation to EquipmentRoles
submissions = relationship("SubmissionEquipmentAssociation",
backref='process') #: relation to SubmissionEquipmentAssociation
kit_types = relationship("KitType", back_populates='processes',
secondary=kittypes_processes) #: relation to KitType
def __repr__(self) -> str:
"""
@@ -1410,4 +1468,3 @@ class Process(BaseClass):
case _:
pass
return cls.execute_query(query=query, limit=limit)

View File

@@ -2,6 +2,8 @@
Contains miscellaenous functions used by both frontend and backend.
'''
from __future__ import annotations
import json
from pathlib import Path
import numpy as np
import logging, re, yaml, sys, os, stat, platform, getpass, inspect, csv
@@ -10,7 +12,7 @@ from jinja2 import Environment, FileSystemLoader
from logging import handlers
from pathlib import Path
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy import create_engine, text
from pydantic import field_validator, BaseModel, Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Any, Tuple, Literal, List
@@ -137,7 +139,7 @@ def get_first_blank_df_row(df:pd.DataFrame) -> int:
# Settings
class Settings(BaseSettings):
class Settings(BaseSettings, extra="allow"):
"""
Pydantic model to hold settings
@@ -147,21 +149,26 @@ class Settings(BaseSettings):
"""
directory_path: Path
database_path: Path|str|None = None
backup_path: Path
super_users: list|None = None
power_users: list|None = None
rerun_regex: str
backup_path: Path|str|None = None
# super_users: list|None = None
# power_users: list|None = None
# rerun_regex: str
submission_types: dict|None = None
database_session: Session|None = None
package: Any|None = None
model_config = SettingsConfigDict(env_file_encoding='utf-8')
@field_validator('backup_path')
@field_validator('backup_path', mode="before")
@classmethod
def set_backup_path(cls, value):
if isinstance(value, str):
def set_backup_path(cls, value, values):
match value:
case str():
value = Path(value)
case None:
value = values.data['directory_path'].joinpath("Database backups")
if not value.exists():
value.mkdir(parents=True)
# metadata.backup_path = value
return value
@@ -177,11 +184,14 @@ class Settings(BaseSettings):
@field_validator('database_path', mode="before")
@classmethod
def ensure_database_exists(cls, value):
def ensure_database_exists(cls, value, values):
if value == ":memory:":
return value
if isinstance(value, str):
match value:
case str():
value = Path(value)
case None:
value = values.data['directory_path'].joinpath("submissions.db")
if value.exists():
return value
else:
@@ -225,6 +235,20 @@ class Settings(BaseSettings):
if value == None:
return package
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# self.set_from_db(db_path=kwargs['database_path'])
def set_from_db(self, db_path:Path):
session = Session(create_engine(f"sqlite:///{db_path}"))
config_items = session.execute(text("SELECT * FROM _configitem")).all()
session.close()
config_items = {item[1]:json.loads(item[2]) for item in config_items}
for k, v in config_items.items():
if not hasattr(self, k):
self.__setattr__(k, v)
def get_config(settings_path: Path|str|None=None) -> Settings:
"""
Get configuration settings from path or default if blank.