Split Concentration controls on the chart so they are individually selectable.

This commit is contained in:
lwark
2025-04-11 12:54:27 -05:00
parent 96f178c09f
commit ae6717bc77
19 changed files with 380 additions and 457 deletions

View File

@@ -2,8 +2,7 @@
All kit and reagent related models
"""
from __future__ import annotations
import json, zipfile, yaml, logging, re
import sys
import json, zipfile, yaml, logging, re, sys
from pprint import pformat
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB
from sqlalchemy.orm import relationship, validates, Query
@@ -11,7 +10,7 @@ from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.hybrid import hybrid_property
from datetime import date, datetime, timedelta
from tools import check_authorization, setup_lookup, Report, Result, check_regex_match, yaml_regex_creator, timezone
from typing import List, Literal, Generator, Any, Tuple, Dict, AnyStr
from typing import List, Literal, Generator, Any, Tuple
from pandas import ExcelFile
from pathlib import Path
from . import Base, BaseClass, Organization, LogMixin
@@ -136,18 +135,18 @@ class KitType(BaseClass):
return self.used_for
def get_reagents(self,
required: bool = False,
required_only: bool = False,
submission_type: str | SubmissionType | None = None
) -> Generator[ReagentRole, None, None]:
"""
Return ReagentTypes linked to kit through KitTypeReagentTypeAssociation.
Args:
required (bool, optional): If true only return required types. Defaults to False.
required_only (bool, optional): If true only return required types. Defaults to False.
submission_type (str | Submissiontype | None, optional): Submission type to narrow results. Defaults to None.
Returns:
Generator[ReagentRole, None, None]: List of reagents linked to this kit.
Generator[ReagentRole, None, None]: List of reagent roles linked to this kit.
"""
match submission_type:
case SubmissionType():
@@ -158,7 +157,7 @@ class KitType(BaseClass):
item.submission_type.name == submission_type]
case _:
relevant_associations = [item for item in self.kit_reagentrole_associations]
if required:
if required_only:
return (item.reagent_role for item in relevant_associations if item.required == 1)
else:
return (item.reagent_role for item in relevant_associations)
@@ -168,7 +167,6 @@ class KitType(BaseClass):
Creates map of locations in Excel workbook for a SubmissionType
Args:
new_kit ():
submission_type (str | SubmissionType): Submissiontype.name
Returns:
@@ -240,7 +238,7 @@ class KitType(BaseClass):
Args:
name (str, optional): Name of desired kit (returns single instance). Defaults to None.
used_for (str | Submissiontype | None, optional): Submission type the kit is used for. Defaults to None.
submissiontype (str | Submissiontype | None, optional): Submission type the kit is used for. Defaults to None.
id (int | None, optional): Kit id in the database. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
@@ -276,108 +274,108 @@ class KitType(BaseClass):
def save(self):
super().save()
def to_export_dict(self, submission_type: SubmissionType) -> dict:
"""
Creates dictionary for exporting to yml used in new SubmissionType Construction
# def to_export_dict(self, submission_type: SubmissionType) -> dict:
# """
# Creates dictionary for exporting to yml used in new SubmissionType Construction
#
# Args:
# submission_type (SubmissionType): SubmissionType of interest.
#
# Returns:
# dict: Dictionary containing relevant info for SubmissionType construction
# """
# base_dict = dict(name=self.name, reagent_roles=[], equipment_roles=[])
# for key, value in self.construct_xl_map_for_use(submission_type=submission_type):
# try:
# assoc = next(item for item in self.kit_reagentrole_associations if item.reagent_role.name == key)
# except StopIteration as e:
# continue
# for kk, vv in assoc.to_export_dict().items():
# value[kk] = vv
# base_dict['reagent_roles'].append(value)
# for key, value in submission_type.construct_field_map("equipment"):
# try:
# assoc = next(item for item in submission_type.submissiontype_equipmentrole_associations if
# item.equipment_role.name == key)
# except StopIteration:
# continue
# for kk, vv in assoc.to_export_dict(extraction_kit=self).items():
# value[kk] = vv
# base_dict['equipment_roles'].append(value)
# return base_dict
Args:
submission_type (SubmissionType): SubmissionType of interest.
Returns:
dict: Dictionary containing relevant info for SubmissionType construction
"""
base_dict = dict(name=self.name, reagent_roles=[], equipment_roles=[])
for key, value in self.construct_xl_map_for_use(submission_type=submission_type):
try:
assoc = next(item for item in self.kit_reagentrole_associations if item.reagent_role.name == key)
except StopIteration as e:
continue
for kk, vv in assoc.to_export_dict().items():
value[kk] = vv
base_dict['reagent_roles'].append(value)
for key, value in submission_type.construct_field_map("equipment"):
try:
assoc = next(item for item in submission_type.submissiontype_equipmentrole_associations if
item.equipment_role.name == key)
except StopIteration:
continue
for kk, vv in assoc.to_export_dict(extraction_kit=self).items():
value[kk] = vv
base_dict['equipment_roles'].append(value)
return base_dict
@classmethod
def import_from_yml(cls, submission_type: str | SubmissionType, filepath: Path | str | None = None,
import_dict: dict | None = None) -> KitType:
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
if filepath:
yaml.add_constructor("!regex", yaml_regex_creator)
if isinstance(filepath, str):
filepath = Path(filepath)
if not filepath.exists():
logging.critical(f"Given file could not be found.")
return None
with open(filepath, "r") as f:
if filepath.suffix == ".json":
import_dict = json.load(fp=f)
elif filepath.suffix == ".yml":
import_dict = yaml.load(stream=f, Loader=yaml.Loader)
else:
raise Exception(f"Filetype {filepath.suffix} not supported.")
new_kit = KitType.query(name=import_dict['kit_type']['name'])
if not new_kit:
new_kit = KitType(name=import_dict['kit_type']['name'])
for role in import_dict['kit_type']['reagent_roles']:
new_role = ReagentRole.query(name=role['role'])
if new_role:
check = input(f"Found existing role: {new_role.name}. Use this? [Y/n]: ")
if check.lower() == "n":
new_role = None
else:
pass
if not new_role:
eol = timedelta(role['extension_of_life'])
new_role = ReagentRole(name=role['role'], eol_ext=eol)
uses = dict(expiry=role['expiry'], lot=role['lot'], name=role['name'], sheet=role['sheet'])
ktrr_assoc = KitTypeReagentRoleAssociation(kit_type=new_kit, reagent_role=new_role, uses=uses)
ktrr_assoc.submission_type = submission_type
ktrr_assoc.required = role['required']
ktst_assoc = SubmissionTypeKitTypeAssociation(
kit_type=new_kit,
submission_type=submission_type,
mutable_cost_sample=import_dict['mutable_cost_sample'],
mutable_cost_column=import_dict['mutable_cost_column'],
constant_cost=import_dict['constant_cost']
)
for role in import_dict['kit_type']['equipment_roles']:
new_role = EquipmentRole.query(name=role['role'])
if new_role:
check = input(f"Found existing role: {new_role.name}. Use this? [Y/n]: ")
if check.lower() == "n":
new_role = None
else:
pass
if not new_role:
new_role = EquipmentRole(name=role['role'])
for equipment in Equipment.assign_equipment(equipment_role=new_role):
new_role.instances.append(equipment)
ster_assoc = SubmissionTypeEquipmentRoleAssociation(submission_type=submission_type,
equipment_role=new_role)
try:
uses = dict(name=role['name'], process=role['process'], sheet=role['sheet'],
static=role['static'])
except KeyError:
uses = None
ster_assoc.uses = uses
for process in role['processes']:
new_process = Process.query(name=process)
if not new_process:
new_process = Process(name=process)
new_process.submission_types.append(submission_type)
new_process.kit_types.append(new_kit)
new_process.equipment_roles.append(new_role)
return new_kit
# @classmethod
# def import_from_yml(cls, submission_type: str | SubmissionType, filepath: Path | str | None = None,
# import_dict: dict | None = None) -> KitType:
# if isinstance(submission_type, str):
# submission_type = SubmissionType.query(name=submission_type)
# if filepath:
# yaml.add_constructor("!regex", yaml_regex_creator)
# if isinstance(filepath, str):
# filepath = Path(filepath)
# if not filepath.exists():
# logging.critical(f"Given file could not be found.")
# return None
# with open(filepath, "r") as f:
# if filepath.suffix == ".json":
# import_dict = json.load(fp=f)
# elif filepath.suffix == ".yml":
# import_dict = yaml.load(stream=f, Loader=yaml.Loader)
# else:
# raise Exception(f"Filetype {filepath.suffix} not supported.")
# new_kit = KitType.query(name=import_dict['kit_type']['name'])
# if not new_kit:
# new_kit = KitType(name=import_dict['kit_type']['name'])
# for role in import_dict['kit_type']['reagent_roles']:
# new_role = ReagentRole.query(name=role['role'])
# if new_role:
# check = input(f"Found existing role: {new_role.name}. Use this? [Y/n]: ")
# if check.lower() == "n":
# new_role = None
# else:
# pass
# if not new_role:
# eol = timedelta(role['extension_of_life'])
# new_role = ReagentRole(name=role['role'], eol_ext=eol)
# uses = dict(expiry=role['expiry'], lot=role['lot'], name=role['name'], sheet=role['sheet'])
# ktrr_assoc = KitTypeReagentRoleAssociation(kit_type=new_kit, reagent_role=new_role, uses=uses)
# ktrr_assoc.submission_type = submission_type
# ktrr_assoc.required = role['required']
# ktst_assoc = SubmissionTypeKitTypeAssociation(
# kit_type=new_kit,
# submission_type=submission_type,
# mutable_cost_sample=import_dict['mutable_cost_sample'],
# mutable_cost_column=import_dict['mutable_cost_column'],
# constant_cost=import_dict['constant_cost']
# )
# for role in import_dict['kit_type']['equipment_roles']:
# new_role = EquipmentRole.query(name=role['role'])
# if new_role:
# check = input(f"Found existing role: {new_role.name}. Use this? [Y/n]: ")
# if check.lower() == "n":
# new_role = None
# else:
# pass
# if not new_role:
# new_role = EquipmentRole(name=role['role'])
# for equipment in Equipment.assign_equipment(equipment_role=new_role):
# new_role.instances.append(equipment)
# ster_assoc = SubmissionTypeEquipmentRoleAssociation(submission_type=submission_type,
# equipment_role=new_role)
# try:
# uses = dict(name=role['name'], process=role['process'], sheet=role['sheet'],
# static=role['static'])
# except KeyError:
# uses = None
# ster_assoc.uses = uses
# for process in role['processes']:
# new_process = Process.query(name=process)
# if not new_process:
# new_process = Process(name=process)
# new_process.submission_types.append(submission_type)
# new_process.kit_types.append(new_kit)
# new_process.equipment_roles.append(new_role)
# return new_kit
def to_omni(self, expand: bool = False) -> "OmniKitType":
from backend.validators.omni_gui_objects import OmniKitType
@@ -395,7 +393,7 @@ class KitType(BaseClass):
kit_reagentrole_associations=kit_reagentrole_associations,
kit_submissiontype_associations=kit_submissiontype_associations
)
logger.debug(f"Creating omni for {pformat(data)}")
# logger.debug(f"Creating omni for {pformat(data)}")
return OmniKitType(instance_object=self, **data)
@@ -405,7 +403,6 @@ class ReagentRole(BaseClass):
"""
skip_on_edit = False
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: name of role reagent plays
instances = relationship("Reagent", back_populates="role",
@@ -453,7 +450,7 @@ class ReagentRole(BaseClass):
Args:
id (id | None, optional): Id of the object. Defaults to None.
name (str | None, optional): Reagent type name. Defaults to None.
kit_type (KitType | str | None, optional): Kit the type of interest belongs to. Defaults to None.
kittype (KitType | str | None, optional): Kit the type of interest belongs to. Defaults to None.
reagent (Reagent | str | None, optional): Concrete instance of the type of interest. Defaults to None.
limit (int, optional): maxmimum number of results to return (0 = all). Defaults to 0.
@@ -507,14 +504,14 @@ class ReagentRole(BaseClass):
from backend.validators.pydant import PydReagent
return PydReagent(lot=None, role=self.name, name=self.name, expiry=date.today())
def to_export_dict(self) -> dict:
"""
Creates dictionary for exporting to yml used in new SubmissionType Construction
Returns:
dict: Dictionary containing relevant info for SubmissionType construction
"""
return dict(role=self.name, extension_of_life=self.eol_ext.days)
# def to_export_dict(self) -> dict:
# """
# Creates dictionary for exporting to yml used in new SubmissionType Construction
#
# Returns:
# dict: Dictionary containing relevant info for SubmissionType construction
# """
# return dict(role=self.name, extension_of_life=self.eol_ext.days)
@check_authorization
def save(self):
@@ -1278,20 +1275,20 @@ class SubmissionType(BaseClass):
pass
return cls.execute_query(query=query, limit=limit)
def to_export_dict(self):
"""
Creates dictionary for exporting to yml used in new SubmissionType Construction
Returns:
dict: Dictionary containing relevant info for SubmissionType construction
"""
base_dict = dict(name=self.name)
base_dict['info'] = self.construct_info_map(mode='export')
base_dict['defaults'] = self.defaults
# base_dict['samples'] = self.construct_sample_map()
base_dict['samples'] = self.sample_map
base_dict['kits'] = [item.to_export_dict() for item in self.submissiontype_kit_associations]
return base_dict
# def to_export_dict(self):
# """
# Creates dictionary for exporting to yml used in new SubmissionType Construction
#
# Returns:
# dict: Dictionary containing relevant info for SubmissionType construction
# """
# base_dict = dict(name=self.name)
# base_dict['info'] = self.construct_info_map(mode='export')
# base_dict['defaults'] = self.defaults
# # base_dict['samples'] = self.construct_sample_map()
# base_dict['samples'] = self.sample_map
# base_dict['kits'] = [item.to_export_dict() for item in self.submissiontype_kit_associations]
# return base_dict
@check_authorization
def save(self):
@@ -1499,17 +1496,17 @@ class SubmissionTypeKitTypeAssociation(BaseClass):
# limit = query.count()
return cls.execute_query(query=query, limit=limit)
def to_export_dict(self):
"""
Creates a dictionary of relevant values in this object.
Returns:
dict: dictionary of Association and related kittype
"""
exclude = ['_sa_instance_state', 'submission_types_id', 'kits_id', 'submission_type', 'kit_type']
base_dict = {k: v for k, v in self.__dict__.items() if k not in exclude}
base_dict['kit_type'] = self.kit_type.to_export_dict(submission_type=self.submission_type)
return base_dict
# def to_export_dict(self):
# """
# Creates a dictionary of relevant values in this object.
#
# Returns:
# dict: dictionary of Association and related kittype
# """
# exclude = ['_sa_instance_state', 'submission_types_id', 'kits_id', 'submission_type', 'kit_type']
# base_dict = {k: v for k, v in self.__dict__.items() if k not in exclude}
# base_dict['kit_type'] = self.kit_type.to_export_dict(submission_type=self.submission_type)
# return base_dict
def to_omni(self, expand: bool = False):
from backend.validators.omni_gui_objects import OmniSubmissionTypeKitTypeAssociation
@@ -1719,17 +1716,17 @@ class KitTypeReagentRoleAssociation(BaseClass):
limit = 1
return cls.execute_query(query=query, limit=limit)
def to_export_dict(self) -> dict:
"""
Creates a dictionary of relevant values in this object.
Returns:
dict: dictionary of Association and related reagent role
"""
base_dict = dict(required=self.required)
for k, v in self.reagent_role.to_export_dict().items():
base_dict[k] = v
return base_dict
# def to_export_dict(self) -> dict:
# """
# Creates a dictionary of relevant values in this object.
#
# Returns:
# dict: dictionary of Association and related reagent role
# """
# base_dict = dict(required=self.required)
# for k, v in self.reagent_role.to_export_dict().items():
# base_dict[k] = v
# return base_dict
def get_all_relevant_reagents(self) -> Generator[Reagent, None, None]:
"""
@@ -1915,13 +1912,6 @@ class Equipment(BaseClass, LogMixin):
submissions = association_proxy("equipment_submission_associations",
"submission") #: proxy to equipment_submission_associations.submission
# def __repr__(self) -> str:
# """
# Returns:
# str: representation of this Equipment
# """
# return f"<Equipment({self.name})>"
def to_dict(self, processes: bool = False) -> dict:
"""
This Equipment as a dictionary
@@ -2085,13 +2075,6 @@ class EquipmentRole(BaseClass):
submission_types = association_proxy("equipmentrole_submissiontype_associations",
"submission_type") #: proxy to equipmentrole_submissiontype_associations.submission_type
# def __repr__(self) -> str:
# """
# Returns:
# str: Representation of this EquipmentRole
# """
# return f"<EquipmentRole({self.name})>"
def to_dict(self) -> dict:
"""
This EquipmentRole as a dictionary
@@ -2192,16 +2175,6 @@ class EquipmentRole(BaseClass):
continue
yield process.name
def to_export_dict(self, submission_type: SubmissionType, kit_type: KitType):
"""
Creates a dictionary of relevant values in this object.
Returns:
dict: dictionary of Association and related reagent role
"""
processes = self.get_processes(submission_type=submission_type, extraction_kit=kit_type)
return dict(role=self.name, processes=[item for item in processes])
def to_omni(self, expand: bool = False) -> "OmniEquipmentRole":
from backend.validators.omni_gui_objects import OmniEquipmentRole
return OmniEquipmentRole(instance_object=self, name=self.name)
@@ -2320,23 +2293,6 @@ class SubmissionTypeEquipmentRoleAssociation(BaseClass):
def save(self):
super().save()
def to_export_dict(self, extraction_kit: KitType | str) -> dict:
"""
Creates dictionary for exporting to yml used in new SubmissionType Construction
Args:
extraction_kit (KitType | str): KitType of interest.
Returns:
dict: Dictionary containing relevant info for SubmissionType construction
"""
if isinstance(extraction_kit, str):
extraction_kit = KitType.query(name=extraction_kit)
base_dict = {k: v for k, v in self.equipment_role.to_export_dict(submission_type=self.submission_type,
kit_type=extraction_kit).items()}
base_dict['static'] = self.static
return base_dict
class Process(BaseClass):
"""
@@ -2360,14 +2316,6 @@ class Process(BaseClass):
tip_roles = relationship("TipRole", back_populates='processes',
secondary=process_tiprole) #: relation to KitType
# def __repr__(self) -> str:
# """
# Returns:
# str: Representation of this Process
# """
# return f"<Process({self.name})>"
def set_attribute(self, key, value):
match key:
case "name":
@@ -2496,9 +2444,6 @@ class TipRole(BaseClass):
def tips(self):
return self.instances
# def __repr__(self):
# return f"<TipRole({self.name})>"
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[TipRole, bool]:
new = False
@@ -2567,9 +2512,6 @@ class Tips(BaseClass, LogMixin):
def tiprole(self):
return self.role
# def __repr__(self):
# return f"<Tips({self.name})>"
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[Tips, bool]:
new = False