From 452d1b4a77050f41832d579541d717c0797116be Mon Sep 17 00:00:00 2001 From: lwark Date: Wed, 4 Sep 2024 09:45:17 -0500 Subject: [PATCH] Importing submission type from json. --- src/submissions/backend/db/models/kits.py | 87 ++++++++++++++++--- .../backend/db/models/submissions.py | 6 ++ 2 files changed, 80 insertions(+), 13 deletions(-) diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index c1fe6bc..b1d88c6 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -3,6 +3,8 @@ All kit and reagent related models """ from __future__ import annotations +import datetime +import json from pprint import pprint from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB @@ -255,9 +257,8 @@ class KitType(BaseClass): def save(self): super().save() - def to_export_dict(self, submission_type: SubmissionType): - base_dict = {} + base_dict = dict(name=self.name) base_dict['reagent roles'] = [] base_dict['equipment roles'] = [] for k, v in self.construct_xl_map_for_use(submission_type=submission_type): @@ -271,7 +272,8 @@ class KitType(BaseClass): base_dict['reagent roles'].append(v) for k, v in submission_type.construct_equipment_map(): try: - assoc = [item for item in submission_type.submissiontype_equipmentrole_associations if item.equipment_role.name == k][0] + assoc = [item for item in submission_type.submissiontype_equipmentrole_associations if + item.equipment_role.name == k][0] except IndexError: continue for kk, vv in assoc.to_export_dict(kit_type=self).items(): @@ -845,8 +847,9 @@ class SubmissionType(BaseClass): return cls.execute_query(query=query, limit=limit) def to_dict(self): - base_dict = {} + base_dict = dict(name=self.name) base_dict['info'] = self.construct_info_map(mode='export') + base_dict['defaults'] = self.defaults # base_dict['excel location maps']['kits'] = [{k: v for k, v in item.kit_type.construct_xl_map_for_use(submission_type=self)} for item in # self.submissiontype_kit_associations] base_dict['samples'] = self.construct_sample_map() @@ -854,7 +857,6 @@ class SubmissionType(BaseClass): base_dict['kits'] = [item.to_export_dict() for item in self.submissiontype_kit_associations] return base_dict - @check_authorization def save(self): """ @@ -862,6 +864,63 @@ class SubmissionType(BaseClass): """ super().save() + @classmethod + @check_authorization + def import_from_json(cls, filepath:Path|str): + if isinstance(filepath, str): + filepath = Path(filepath) + if not filepath.exists(): + logging.critical(f"Given file could not be found.") + return None + with open(filepath, "r") as f: + import_dict = json.load(fp=f) + submission_type = cls.query(name=import_dict['name']) + if submission_type: + return submission_type + submission_type = cls() + submission_type.name = import_dict['name'] + submission_type.info_map = import_dict['info'] + submission_type.sample_map = import_dict['samples'] + submission_type.defaults = import_dict['defaults'] + for kit in import_dict['kits']: + new_kit = KitType.query(name=kit['kit_type']['name']) + if not new_kit: + new_kit = KitType(name=kit['kit_type']['name']) + for role in kit['kit_type']['reagent roles']: + new_role = ReagentRole.query(name=role['role']) + eol = datetime.timedelta(role['extension_of_life']) + if not new_role: + new_role = ReagentRole(name=role['role'], eol_ext=eol) + uses = dict(expiry=role['expiry'], lot=role['lot'], name=role['name'], sheet=role['sheet']) + ktrr_assoc = KitTypeReagentRoleAssociation(kit_type=new_kit, reagent_role=new_role, uses=uses) + ktrr_assoc.submission_type = submission_type + ktrr_assoc.required = role['required'] + ktst_assoc = SubmissionTypeKitTypeAssociation( + kit_type=new_kit, + submission_type=submission_type, + mutable_cost_sample=kit['mutable_cost_sample'], + mutable_cost_column=kit['mutable_cost_column'], + constant_cost=kit['constant_cost'] + ) + for role in kit['kit_type']['equipment roles']: + new_role = EquipmentRole.query(name=role['role']) + if not new_role: + new_role = EquipmentRole(name=role['role']) + ster_assoc = SubmissionTypeEquipmentRoleAssociation(submission_type=submission_type, equipment_role=new_role) + try: + uses = dict(name=role['name'], process=role['process'], sheet=role['sheet'], static=role['static']) + except KeyError: + uses = None + ster_assoc.uses = uses + for process in role['processes']: + new_process = Process.query(name=process) + if not new_process: + new_process = Process(name=process) + new_process.submission_types.append(submission_type) + new_process.kit_types.append(new_kit) + new_process.equipment_roles.append(new_role) + return submission_type + class SubmissionTypeKitTypeAssociation(BaseClass): """ @@ -942,7 +1001,7 @@ class SubmissionTypeKitTypeAssociation(BaseClass): def to_export_dict(self): exclude = ['_sa_instance_state', 'submission_types_id', 'kits_id', 'submission_type', 'kit_type'] - base_dict = {k:v for k,v in self.__dict__.items() if k not in exclude} + base_dict = {k: v for k, v in self.__dict__.items() if k not in exclude} base_dict['kit_type'] = self.kit_type.to_export_dict(submission_type=self.submission_type) logger.debug(f"STKTA returning: {base_dict}") return base_dict @@ -1061,7 +1120,7 @@ class KitTypeReagentRoleAssociation(BaseClass): return cls.execute_query(query=query, limit=limit) def to_export_dict(self): - base_dict={} + base_dict = {} base_dict['required'] = self.required for k, v in self.reagent_role.to_export_dict().items(): base_dict[k] = v @@ -1494,16 +1553,17 @@ class SubmissionEquipmentAssociation(BaseClass): @classmethod @setup_lookup - def query(cls, equipment_id:int, submission_id:int, role:str|None=None, limit:int=0, **kwargs) -> Any | List[Any]: + def query(cls, equipment_id: int, submission_id: int, role: str | None = None, limit: int = 0, **kwargs) -> Any | \ + List[ + Any]: query: Query = cls.__database_session__.query(cls) - query = query.filter(cls.equipment_id==equipment_id) - query = query.filter(cls.submission_id==submission_id) + query = query.filter(cls.equipment_id == equipment_id) + query = query.filter(cls.submission_id == submission_id) if role is not None: - query = query.filter(cls.role==role) + query = query.filter(cls.role == role) return cls.execute_query(query=query, limit=limit, **kwargs) - class SubmissionTypeEquipmentRoleAssociation(BaseClass): """ Abstract association between SubmissionType and EquipmentRole @@ -1767,7 +1827,8 @@ class SubmissionTipsAssociation(BaseClass): @classmethod @setup_lookup - def query(cls, tip_id: int, role: str, submission_id: int|None=None, limit: int = 0, **kwargs) -> Any | List[Any]: + def query(cls, tip_id: int, role: str, submission_id: int | None = None, limit: int = 0, **kwargs) -> Any | List[ + Any]: query: Query = cls.__database_session__.query(cls) query = query.filter(cls.tip_id == tip_id) if submission_id is not None: diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index 2c8c22b..61ddd33 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -1322,6 +1322,12 @@ class BacterialCulture(BasicSubmission): row = idx.index.to_list()[0] return row + 1 + @classmethod + def custom_info_parser(cls, input_dict: dict, xl: Workbook | None = None, custom_fields: dict = {}) -> dict: + input_dict = super().custom_info_parser(input_dict=input_dict, xl=xl, custom_fields=custom_fields) + logger.debug(f"\n\nInfo dictionary:\n\n{pformat(input_dict)}\n\n") + return input_dict + class Wastewater(BasicSubmission): """