Moments before disaster.

This commit is contained in:
lwark
2025-01-16 08:36:15 -06:00
parent 5cded949ed
commit bf711369c6
21 changed files with 541 additions and 368 deletions

View File

@@ -9,7 +9,7 @@ from sqlalchemy.orm import relationship, validates, Query
from sqlalchemy.ext.associationproxy import association_proxy
from datetime import date, datetime, timedelta
from tools import check_authorization, setup_lookup, Report, Result, check_regex_match, yaml_regex_creator, timezone
from typing import List, Literal, Generator, Any
from typing import List, Literal, Generator, Any, Tuple
from pandas import ExcelFile
from pathlib import Path
from . import Base, BaseClass, Organization, LogMixin
@@ -157,30 +157,62 @@ class KitType(BaseClass):
else:
return (item.reagent_role for item in relevant_associations)
def construct_xl_map_for_use(self, submission_type: str | SubmissionType) -> Generator[(str, str), None, None]:
def construct_xl_map_for_use(self, submission_type: str | SubmissionType) -> Tuple[dict|None, KitType]:
"""
Creates map of locations in Excel workbook for a SubmissionType
Args:
new_kit ():
submission_type (str | SubmissionType): Submissiontype.name
Returns:
Generator[(str, str), None, None]: Tuple containing information locations.
"""
new_kit = self
# NOTE: Account for submission_type variable type.
match submission_type:
case str():
assocs = [item for item in self.kit_reagentrole_associations if
item.submission_type.name == submission_type]
# assocs = [item for item in self.kit_reagentrole_associations if
# item.submission_type.name == submission_type]
logger.debug(f"Query for {submission_type}")
submission_type = SubmissionType.query(name=submission_type)
case SubmissionType():
assocs = [item for item in self.kit_reagentrole_associations if item.submission_type == submission_type]
pass
case _:
raise ValueError(f"Wrong variable type: {type(submission_type)} used!")
for assoc in assocs:
try:
yield assoc.reagent_role.name, assoc.uses
except TypeError:
continue
logger.debug(f"Submission type: {submission_type}, Kit: {self}")
assocs = [item for item in self.kit_reagentrole_associations if item.submission_type == submission_type]
logger.debug(f"Associations: {assocs}")
# NOTE: rescue with submission type's default kit.
if not assocs:
logger.error(
f"No associations found with {self}. Attempting rescue with default kit: {submission_type.default_kit}")
new_kit = submission_type.default_kit
if not new_kit:
from frontend.widgets.pop_ups import ObjectSelector
dlg = ObjectSelector(
title="Select Kit",
message="Could not find reagents for this submission type/kit type combo.\nSelect new kit.",
obj_type=self.__class__,
values=[kit.name for kit in submission_type.kit_types]
)
if dlg.exec():
dlg_result = dlg.parse_form()
logger.debug(f"Dialog result: {dlg_result}")
new_kit = self.__class__.query(name=dlg_result)
logger.debug(f"Query result: {new_kit}")
# return new_kit.construct_xl_map_for_use(submission_type=submission_type)
else:
return None, new_kit
assocs = [item for item in new_kit.kit_reagentrole_associations if item.submission_type == submission_type]
# for assoc in assocs:
# try:
# yield assoc.reagent_role.name, assoc.uses
# except TypeError:
# continue
output = {assoc.reagent_role.name: assoc.uses for assoc in assocs}
logger.debug(f"Output: {output}")
return output, new_kit
@classmethod
@setup_lookup
@@ -444,7 +476,7 @@ class Reagent(BaseClass, LogMixin):
Concrete reagent instance
"""
searchables = ["lot"]
searchables = [dict(label="Lot", field="lot")]
id = Column(INTEGER, primary_key=True) #: primary key
role = relationship("ReagentRole", back_populates="instances",
@@ -548,7 +580,9 @@ class Reagent(BaseClass, LogMixin):
def query_or_create(cls, **kwargs) -> Reagent:
from backend.validators.pydant import PydReagent
new = False
instance = cls.query(**kwargs)
disallowed = ['expiry']
sanitized_kwargs = {k:v for k,v in kwargs.items() if k not in disallowed}
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
if "role" not in kwargs:
try:
@@ -557,7 +591,7 @@ class Reagent(BaseClass, LogMixin):
pass
instance = PydReagent(**kwargs)
new = True
instance, _ = instance.toSQL()
instance = instance.to_sql()
logger.info(f"Instance from query or create: {instance}")
return instance, new
@@ -644,38 +678,15 @@ class Reagent(BaseClass, LogMixin):
except AttributeError as e:
logger.error(f"Could not set {key} due to {e}")
@check_authorization
def edit_from_search(self, obj, **kwargs):
from frontend.widgets.omni_add_edit import AddEdit
role = ReagentRole.query(kwargs['role'])
if role:
role_name = role.name
else:
role_name = None
# dlg = AddReagentForm(reagent_lot=self.lot, reagent_role=role_name, expiry=self.expiry, reagent_name=self.name)
dlg = AddEdit(parent=None, instance=self)
if dlg.exec():
pyd = dlg.parse_form()
for field in pyd.model_fields:
self.set_attribute(field, pyd.__getattribute__(field))
# for key, value in vars.items():
# match key:
# case "expiry":
# if isinstance(value, str):
# field_value = datetime.strptime(value, "%Y-%m-%d")
# elif isinstance(value, date):
# field_value = datetime.combine(value, datetime.max.time())
# else:
# field_value = value
# field_value.replace(tzinfo=timezone)
# case "role":
# continue
# case _:
# field_value = value
# self.__setattr__(key, field_value)
self.save()
# print(self.__dict__)
@classproperty
def add_edit_tooltips(self):
@@ -801,8 +812,8 @@ class SubmissionType(BaseClass):
"""
return f"<SubmissionType({self.name})>"
@classmethod
def retrieve_template_file(cls) -> bytes:
@classproperty
def basic_template(cls) -> bytes:
"""
Grabs the default excel template file.
@@ -812,7 +823,8 @@ class SubmissionType(BaseClass):
submission_type = cls.query(name="Bacterial Culture")
return submission_type.template_file
def get_template_file_sheets(self) -> List[str]:
@property
def template_file_sheets(self) -> List[str]:
"""
Gets names of sheet in the stored blank form.
@@ -870,15 +882,6 @@ class SubmissionType(BaseClass):
output['custom'] = self.info_map['custom']
return output
def construct_sample_map(self) -> dict:
"""
Returns sample map
Returns:
dict: sample location map
"""
return self.sample_map
def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]:
"""
Make a map of all locations for tips or equipment.
@@ -895,7 +898,8 @@ class SubmissionType(BaseClass):
fmap = {}
yield getattr(item, f"{field}_role").name, fmap
def get_default_kit(self) -> KitType | None:
@property
def default_kit(self) -> KitType | None:
"""
If only one kits exists for this Submission Type, return it.
@@ -941,7 +945,8 @@ class SubmissionType(BaseClass):
raise TypeError(f"Type {type(equipment_role)} is not allowed")
return list(set([item for items in relevant for item in items if item is not None]))
def get_submission_class(self) -> "BasicSubmission":
@property
def submission_class(self) -> "BasicSubmission":
"""
Gets submission class associated with this submission type.
@@ -993,7 +998,8 @@ class SubmissionType(BaseClass):
base_dict = dict(name=self.name)
base_dict['info'] = self.construct_info_map(mode='export')
base_dict['defaults'] = self.defaults
base_dict['samples'] = self.construct_sample_map()
# base_dict['samples'] = self.construct_sample_map()
base_dict['samples'] = self.sample_map
base_dict['kits'] = [item.to_export_dict() for item in self.submissiontype_kit_associations]
return base_dict
@@ -1413,7 +1419,8 @@ class Equipment(BaseClass, LogMixin):
return {k: v for k, v in self.__dict__.items()}
def get_processes(self, submission_type: str | SubmissionType | None = None,
extraction_kit: str | KitType | None = None) -> List[str]:
extraction_kit: str | KitType | None = None,
equipment_role: str | EquipmentRole | None=None) -> List[str]:
"""
Get all processes associated with this Equipment for a given SubmissionType
@@ -1433,6 +1440,8 @@ class Equipment(BaseClass, LogMixin):
continue
if extraction_kit and extraction_kit not in process.kit_types:
continue
if equipment_role and equipment_role not in process.equipment_roles:
continue
yield process
@classmethod
@@ -1489,12 +1498,12 @@ class Equipment(BaseClass, LogMixin):
PydEquipment: pydantic equipment object
"""
from backend.validators.pydant import PydEquipment
processes = self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit)
processes = self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit, equipment_role=role)
return PydEquipment(processes=processes, role=role,
**self.to_dict(processes=False))
@classmethod
def get_regex(cls) -> re.Pattern:
@classproperty
def manufacturer_regex(cls) -> re.Pattern:
"""
Creates regex to determine tip manufacturer
@@ -1809,6 +1818,9 @@ class Process(BaseClass):
def query(cls,
name: str | None = None,
id: int | None = None,
submission_type: str | SubmissionType | None = None,
extraction_kit : str | KitType | None = None,
equipment_role: str | KitType | None = None,
limit: int = 0) -> Process | List[Process]:
"""
Lookup Processes
@@ -1822,6 +1834,30 @@ class Process(BaseClass):
Process|List[Process]: Process(es) matching criteria
"""
query = cls.__database_session__.query(cls)
match submission_type:
case str():
submission_type = SubmissionType.query(name=submission_type)
query = query.filter(cls.submission_types.contains(submission_type))
case SubmissionType():
query = query.filter(cls.submission_types.contains(submission_type))
case _:
pass
match extraction_kit:
case str():
extraction_kit = KitType.query(name=extraction_kit)
query = query.filter(cls.kit_types.contains(extraction_kit))
case KitType():
query = query.filter(cls.kit_types.contains(extraction_kit))
case _:
pass
match equipment_role:
case str():
equipment_role = EquipmentRole.query(name=equipment_role)
query = query.filter(cls.equipment_roles.contains(equipment_role))
case EquipmentRole():
query = query.filter(cls.equipment_roles.contains(equipment_role))
case _:
pass
match name:
case str():
query = query.filter(cls.name == name)
@@ -1975,6 +2011,14 @@ class SubmissionTipsAssociation(BaseClass):
query = query.filter(cls.role_name == role)
return cls.execute_query(query=query, limit=limit, **kwargs)
@classmethod
def query_or_create(cls, tips, submission, role: str, **kwargs):
instance = cls.query(tip_id=tips.id, role=role, submission_id=submission.id, limit=1, **kwargs)
if instance is None:
instance = SubmissionTipsAssociation(submission=submission, tips=tips, role_name=role)
return instance
def to_pydantic(self):
from backend.validators import PydTips
return PydTips(name=self.tips.name, lot=self.tips.lot, role=self.role_name)