Creation of new scripts.

This commit is contained in:
lwark
2024-12-16 09:31:34 -06:00
parent 67520cb784
commit b1544da730
15 changed files with 214 additions and 148 deletions

View File

@@ -2,12 +2,12 @@
All kit and reagent related models
"""
from __future__ import annotations
import datetime, json, zipfile, yaml, logging, re
import json, zipfile, yaml, logging, re
from pprint import pformat
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB
from sqlalchemy.orm import relationship, validates, Query
from sqlalchemy.ext.associationproxy import association_proxy
from datetime import date, datetime
from datetime import date, datetime, timedelta
from tools import check_authorization, setup_lookup, Report, Result, check_regex_match, yaml_regex_creator, timezone
from typing import List, Literal, Generator, Any
from pandas import ExcelFile
@@ -259,6 +259,79 @@ class KitType(BaseClass):
base_dict['equipment_roles'].append(v)
return base_dict
@classmethod
def import_from_yml(cls, submission_type:str|SubmissionType, filepath: Path | str | None = None, import_dict: dict | None = None) -> KitType:
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
if filepath:
yaml.add_constructor("!regex", yaml_regex_creator)
if isinstance(filepath, str):
filepath = Path(filepath)
if not filepath.exists():
logging.critical(f"Given file could not be found.")
return None
with open(filepath, "r") as f:
if filepath.suffix == ".json":
import_dict = json.load(fp=f)
elif filepath.suffix == ".yml":
import_dict = yaml.load(stream=f, Loader=yaml.Loader)
else:
raise Exception(f"Filetype {filepath.suffix} not supported.")
new_kit = KitType.query(name=import_dict['kit_type']['name'])
if not new_kit:
new_kit = KitType(name=import_dict['kit_type']['name'])
for role in import_dict['kit_type']['reagent_roles']:
new_role = ReagentRole.query(name=role['role'])
if new_role:
check = input(f"Found existing role: {new_role.name}. Use this? [Y/n]: ")
if check.lower() == "n":
new_role = None
else:
pass
if not new_role:
eol = timedelta(role['extension_of_life'])
new_role = ReagentRole(name=role['role'], eol_ext=eol)
uses = dict(expiry=role['expiry'], lot=role['lot'], name=role['name'], sheet=role['sheet'])
ktrr_assoc = KitTypeReagentRoleAssociation(kit_type=new_kit, reagent_role=new_role, uses=uses)
ktrr_assoc.submission_type = submission_type
ktrr_assoc.required = role['required']
ktst_assoc = SubmissionTypeKitTypeAssociation(
kit_type=new_kit,
submission_type=submission_type,
mutable_cost_sample=import_dict['mutable_cost_sample'],
mutable_cost_column=import_dict['mutable_cost_column'],
constant_cost=import_dict['constant_cost']
)
for role in import_dict['kit_type']['equipment_roles']:
new_role = EquipmentRole.query(name=role['role'])
if new_role:
check = input(f"Found existing role: {new_role.name}. Use this? [Y/n]: ")
if check.lower() == "n":
new_role = None
else:
pass
if not new_role:
new_role = EquipmentRole(name=role['role'])
for equipment in Equipment.assign_equipment(equipment_role=new_role):
new_role.instances.append(equipment)
ster_assoc = SubmissionTypeEquipmentRoleAssociation(submission_type=submission_type,
equipment_role=new_role)
try:
uses = dict(name=role['name'], process=role['process'], sheet=role['sheet'],
static=role['static'])
except KeyError:
uses = None
ster_assoc.uses = uses
for process in role['processes']:
new_process = Process.query(name=process)
if not new_process:
new_process = Process(name=process)
new_process.submission_types.append(submission_type)
new_process.kit_types.append(new_kit)
new_process.equipment_roles.append(new_role)
return new_kit
class ReagentRole(BaseClass):
"""
@@ -903,58 +976,7 @@ class SubmissionType(BaseClass):
submission_type.sample_map = import_dict['samples']
submission_type.defaults = import_dict['defaults']
for kit in import_dict['kits']:
new_kit = KitType.query(name=kit['kit_type']['name'])
if not new_kit:
new_kit = KitType(name=kit['kit_type']['name'])
for role in kit['kit_type']['reagent roles']:
new_role = ReagentRole.query(name=role['role'])
if new_role:
check = input(f"Found existing role: {new_role.name}. Use this? [Y/n]: ")
if check.lower() == "n":
new_role = None
else:
pass
if not new_role:
eol = datetime.timedelta(role['extension_of_life'])
new_role = ReagentRole(name=role['role'], eol_ext=eol)
uses = dict(expiry=role['expiry'], lot=role['lot'], name=role['name'], sheet=role['sheet'])
ktrr_assoc = KitTypeReagentRoleAssociation(kit_type=new_kit, reagent_role=new_role, uses=uses)
ktrr_assoc.submission_type = submission_type
ktrr_assoc.required = role['required']
ktst_assoc = SubmissionTypeKitTypeAssociation(
kit_type=new_kit,
submission_type=submission_type,
mutable_cost_sample=kit['mutable_cost_sample'],
mutable_cost_column=kit['mutable_cost_column'],
constant_cost=kit['constant_cost']
)
for role in kit['kit_type']['equipment roles']:
new_role = EquipmentRole.query(name=role['role'])
if new_role:
check = input(f"Found existing role: {new_role.name}. Use this? [Y/n]: ")
if check.lower() == "n":
new_role = None
else:
pass
if not new_role:
new_role = EquipmentRole(name=role['role'])
for equipment in Equipment.assign_equipment(equipment_role=new_role):
new_role.instances.append(equipment)
ster_assoc = SubmissionTypeEquipmentRoleAssociation(submission_type=submission_type,
equipment_role=new_role)
try:
uses = dict(name=role['name'], process=role['process'], sheet=role['sheet'],
static=role['static'])
except KeyError:
uses = None
ster_assoc.uses = uses
for process in role['processes']:
new_process = Process.query(name=process)
if not new_process:
new_process = Process(name=process)
new_process.submission_types.append(submission_type)
new_process.kit_types.append(new_kit)
new_process.equipment_roles.append(new_role)
new_kit = KitType.import_from_yml(submission_type=submission_type, import_dict=kit)
if 'orgs' in import_dict.keys():
logger.info("Found Organizations to be imported.")
Organization.import_from_yml(filepath=filepath)
@@ -1321,7 +1343,8 @@ class Equipment(BaseClass, LogMixin):
else:
return {k: v for k, v in self.__dict__.items()}
def get_processes(self, submission_type: str | SubmissionType | None = None, extraction_kit: str | KitType | None = None) -> List[str]:
def get_processes(self, submission_type: str | SubmissionType | None = None,
extraction_kit: str | KitType | None = None) -> List[str]:
"""
Get all processes associated with this Equipment for a given SubmissionType
@@ -1399,7 +1422,7 @@ class Equipment(BaseClass, LogMixin):
from backend.validators.pydant import PydEquipment
processes = self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit)
return PydEquipment(processes=processes, role=role,
**self.to_dict(processes=False))
**self.to_dict(processes=False))
@classmethod
def get_regex(cls) -> re.Pattern:
@@ -1547,9 +1570,9 @@ class EquipmentRole(BaseClass):
extraction_kit = KitType.query(name=extraction_kit)
for process in self.processes:
if submission_type and submission_type not in process.submission_types:
continue
continue
if extraction_kit and extraction_kit not in process.kit_types:
continue
continue
yield process.name
def to_export_dict(self, submission_type: SubmissionType, kit_type: KitType):
@@ -1597,7 +1620,6 @@ class SubmissionEquipmentAssociation(BaseClass):
Returns:
dict: This SubmissionEquipmentAssociation as a dictionary
"""
# TODO: Currently this will only fetch a single process, even if multiple are selectable.
try:
process = self.process.name
except AttributeError:
@@ -1606,7 +1628,13 @@ class SubmissionEquipmentAssociation(BaseClass):
processes=[process], role=self.role, nickname=self.equipment.nickname)
return output
def to_pydantic(self):
def to_pydantic(self) -> "PydEquipment":
"""
Returns a pydantic model based on this object.
Returns:
PydEquipment: pydantic equipment model
"""
from backend.validators import PydEquipment
return PydEquipment(**self.to_sub_dict())