Addition of procedure parser in import.

This commit is contained in:
lwark
2025-06-17 15:09:51 -05:00
parent 0233bc3ac2
commit d8c3f3bbb2
31 changed files with 688 additions and 304 deletions

View File

@@ -1,14 +1,15 @@
"""
"""
from __future__ import annotations
import logging, re
from pathlib import Path
from typing import Generator, Tuple
from openpyxl import load_workbook
from typing import Generator, Tuple, TYPE_CHECKING
from pandas import DataFrame
from backend.validators import pydant
from backend.db.models import Procedure
from dataclasses import dataclass
if TYPE_CHECKING:
from backend.db.models import ProcedureType
logger = logging.getLogger(f"submissions.{__name__}")
@@ -30,7 +31,7 @@ class DefaultParser(object):
return instance
def __init__(self, filepath: Path | str, procedure: Procedure|None=None, range_dict: dict | None = None, *args, **kwargs):
def __init__(self, filepath: Path | str, proceduretype: ProcedureType|None=None, range_dict: dict | None = None, *args, **kwargs):
"""
Args:
@@ -40,7 +41,7 @@ class DefaultParser(object):
*args ():
**kwargs ():
"""
self.procedure = procedure
self.proceduretype = proceduretype
try:
self._pyd_object = getattr(pydant, f"Pyd{self.__class__.__name__.replace('Parser', '')}")
except AttributeError:
@@ -58,6 +59,13 @@ class DefaultParser(object):
data['filepath'] = self.filepath
return self._pyd_object(**data)
@classmethod
def correct_procedure_type(cls, proceduretype: str | "ProcedureType"):
    """
    Resolve a procedure type given by name into its database object.

    Args:
        proceduretype (str | ProcedureType): Name to look up, or an object to pass through.

    Returns:
        The result of ProcedureType.query(name=...) when a string was given,
        otherwise the argument unchanged.
    """
    # NOTE: Imported at call time rather than module level (presumably to avoid a circular import - confirm).
    from backend.db.models import ProcedureType
    if not isinstance(proceduretype, str):
        return proceduretype
    return ProcedureType.query(name=proceduretype)
class DefaultKEYVALUEParser(DefaultParser):
@@ -90,7 +98,6 @@ class DefaultTABLEParser(DefaultParser):
default_range_dict = [dict(
header_row=20,
end_row=116,
sheet="Sample List"
)]
@@ -98,15 +105,25 @@ class DefaultTABLEParser(DefaultParser):
def parsed_info(self):
    """
    Yield one dict per data row for every table described in self.range_dict.

    For each range entry the worksheet named by 'sheet' is read, the row at
    'header_row' (1-indexed) becomes the column names, and the rows after it are
    yielded. If 'end_row' is present the slice stops at index end_row - 1
    (exclusive); otherwise it runs to the end of the sheet. Columns that are
    entirely empty are dropped.

    String headers are lower-cased, spaces become underscores and a trailing
    "_(...)" or "_#" suffix is removed. Non-string headers are kept as they are.

    Yields:
        dict: normalised header -> cell value for a single row.
    """
    for table in self.range_dict:
        list_worksheet = self.workbook[table['sheet']]
        # NOTE: Materialise the sheet once; the header row index is 1-based in the range dict.
        rows = list(list_worksheet.values)
        first = table['header_row'] - 1
        if "end_row" in table.keys():
            rows = rows[first:table['end_row'] - 1]
        else:
            rows = rows[first:]
        list_df = DataFrame(rows)
        # NOTE: Promote the first sliced row to column names, then discard it from the data.
        list_df.columns = list_df.iloc[0]
        list_df = list_df[1:]
        list_df = list_df.dropna(axis=1, how='all')
        for ii, row in enumerate(list_df.iterrows()):
            output = {}
            for key, value in row[1].to_dict().items():
                # NOTE: Only string headers can be normalised; others (e.g. None, numbers) pass through.
                if isinstance(key, str):
                    key = key.lower().replace(" ", "_")
                    key = re.sub(r"_(\(.*\)|#)", "", key)
                logger.debug(f"Row {ii} values: {key}: {value}")
                output[key] = value
            yield output
def to_pydantic(self, **kwargs):
    """
    Build one validator object per parsed row.

    Args:
        **kwargs (): Accepted but not used here.

    Returns:
        list: self._pyd_object(**row) for each row produced by self.parsed_info.
    """
    models = []
    for row in self.parsed_info:
        models.append(self._pyd_object(**row))
    return models
from .submission_parser import *
from .clientsubmission_parser import *
from backend.excel.parsers.results_parsers.pcr_results_parser import *

View File

@@ -1,6 +1,7 @@
"""
"""
from __future__ import annotations
import logging
from pathlib import Path
from string import ascii_lowercase
@@ -9,8 +10,9 @@ from typing import Generator
from openpyxl.reader.excel import load_workbook
from tools import row_keys
from backend.db.models import SubmissionType
# from backend.db.models import SubmissionType
from . import DefaultKEYVALUEParser, DefaultTABLEParser
from backend.managers import procedures as procedure_managers
logger = logging.getLogger(f"submissions.{__name__}")
@@ -34,6 +36,7 @@ class SubmissionTyperMixin(object):
@classmethod
def get_subtype_from_regex(cls, filepath: Path):
from backend.db.models import SubmissionType
regex = SubmissionType.regex
m = regex.search(filepath.__str__())
try:
@@ -45,7 +48,8 @@ class SubmissionTyperMixin(object):
@classmethod
def get_subtype_from_preparse(cls, filepath: Path):
parser = ClientSubmissionParser(filepath)
from backend.db.models import SubmissionType
parser = ClientSubmissionInfoParser(filepath)
sub_type = next((value for k, value in parser.parsed_info if k == "submissiontype"), None)
sub_type = SubmissionType.query(name=sub_type)
if isinstance(sub_type, list):
@@ -54,6 +58,7 @@ class SubmissionTyperMixin(object):
@classmethod
def get_subtype_from_properties(cls, filepath: Path):
from backend.db.models import SubmissionType
wb = load_workbook(filepath)
# NOTE: Gets first category in the metadata.
categories = wb.properties.category.split(";")
@@ -64,7 +69,7 @@ class SubmissionTyperMixin(object):
return sub_type
class ClientSubmissionParser(DefaultKEYVALUEParser, SubmissionTyperMixin):
class ClientSubmissionInfoParser(DefaultKEYVALUEParser, SubmissionTyperMixin):
"""
Object for retrieving submitter info from "sample list" sheet
"""
@@ -78,13 +83,29 @@ class ClientSubmissionParser(DefaultKEYVALUEParser, SubmissionTyperMixin):
)]
def __init__(self, filepath: Path | str, *args, **kwargs):
    """
    Determine the submission type of the file, parse it with that type's info map,
    and offer to add a run for each procedure-type sheet found in the workbook.

    Args:
        filepath (Path | str): Workbook to parse.
        *args (): Accepted but not forwarded to the parent constructor.
        **kwargs (): Forwarded to the parent; 'range_dict' defaults to the
            submission type's info_map when not supplied.
    """
    from frontend.widgets.pop_ups import QuestionAsker
    self.submissiontype = self.retrieve_submissiontype(filepath=filepath)
    if "range_dict" not in kwargs:
        kwargs['range_dict'] = self.submissiontype.info_map
    super().__init__(filepath=filepath, **kwargs)
    allowed_procedure_types = [item.name for item in self.submissiontype.proceduretype]
    for name in allowed_procedure_types:
        if name not in self.workbook.sheetnames:
            continue
        # TODO: check if run with name already exists
        add_run = QuestionAsker(title="Add Run?", message="We've detected a sheet corresponding to an associated procedure type.\nWould you like to add a new run?")
        # NOTE(review): confirm `accepted` is the user's yes/no answer and not an always-truthy signal object.
        if not add_run.accepted:
            continue
        # NOTE: recruit parser.
        try:
            manager = getattr(procedure_managers, name)
        except AttributeError:
            # NOTE: No manager named after this procedure type; use the generic one.
            manager = procedure_managers.DefaultManager
        self.manager = manager(proceduretype=name)
class ClientSubmissionSampleParser(DefaultTABLEParser, SubmissionTyperMixin):
"""
Object for retrieving submitter samples from "sample list" sheet
"""

View File

@@ -0,0 +1,119 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from backend.excel.parsers import DefaultTABLEParser, DefaultKEYVALUEParser
if TYPE_CHECKING:
from backend.db.models import ProcedureType
class DefaultInfoParser(DefaultKEYVALUEParser):
    """
    Key/value parser for a procedure's worksheet; produces PydProcedure objects.
    """

    # NOTE: Fallback layout used when neither the caller nor the procedure type supplies one.
    #       'sheet' is filled in per instance with the procedure type's name.
    default_range_dict = [dict(
        start_row=1,
        end_row=14,
        key_column=1,
        value_column=2,
        sheet=""
    )]

    def __init__(self, filepath: Path | str, proceduretype: ProcedureType | None = None, range_dict: dict | None = None, *args, **kwargs):
        """
        Args:
            filepath (Path | str): Workbook to parse.
            proceduretype (ProcedureType | str | None): Procedure type or its name; resolved with
                correct_procedure_type. Its name is used as the worksheet name.
            range_dict (dict | None): Layout entries (iterated as a sequence of dicts). Falls back to
                proceduretype.info_map, then to default_range_dict.
            *args ():
            **kwargs ():
        """
        from backend.validators.pydant import PydProcedure
        proceduretype = self.correct_procedure_type(proceduretype)
        if not range_dict:
            range_dict = proceduretype.info_map
        if not range_dict:
            range_dict = self.__class__.default_range_dict
        # NOTE: Copy each entry instead of writing 'sheet' into it, so the class-level default
        #       (and the procedure type's own map) is not changed for other parser instances.
        range_dict = [dict(item, sheet=proceduretype.name) for item in range_dict]
        super().__init__(*args, filepath=filepath, proceduretype=proceduretype, range_dict=range_dict, **kwargs)
        self._pyd_object = PydProcedure
class DefaultSampleParser(DefaultTABLEParser):
    """
    Table parser for the sample rows of a procedure's worksheet; produces PydSample objects.
    """

    # NOTE: Fallback layout; no 'end_row', so the table runs to the end of the sheet.
    #       'sheet' is filled in per instance with the procedure type's name.
    default_range_dict = [dict(
        header_row=41,
        sheet=""
    )]

    def __init__(self, filepath: Path | str, proceduretype: ProcedureType | None = None, range_dict: dict | None = None, *args, **kwargs):
        """
        Args:
            filepath (Path | str): Workbook to parse.
            proceduretype (ProcedureType | str | None): Procedure type or its name; resolved with
                correct_procedure_type. Its name is used as the worksheet name.
            range_dict (dict | None): Layout entries (iterated as a sequence of dicts). Falls back to
                proceduretype.sample_map, then to default_range_dict.
            *args ():
            **kwargs ():
        """
        from backend.validators.pydant import PydSample
        proceduretype = self.correct_procedure_type(proceduretype)
        if not range_dict:
            range_dict = proceduretype.sample_map
        if not range_dict:
            range_dict = self.__class__.default_range_dict
        # NOTE: Copy each entry instead of writing 'sheet' into it, so the class-level default
        #       (and the procedure type's own map) is not changed for other parser instances.
        range_dict = [dict(item, sheet=proceduretype.name) for item in range_dict]
        # NOTE: The parent parameter is 'proceduretype'; passing it as 'procedure' would only land in **kwargs.
        super().__init__(*args, filepath=filepath, proceduretype=proceduretype, range_dict=range_dict, **kwargs)
        self._pyd_object = PydSample
class DefaultReagentParser(DefaultTABLEParser):
    """
    Table parser for the reagent rows of a procedure's worksheet; produces PydReagent objects.
    """

    # NOTE: Fallback layout; 'sheet' is filled in per instance with the procedure type's name.
    default_range_dict = [dict(
        header_row=17,
        end_row=29,
        sheet=""
    )]

    def __init__(self, filepath: Path | str, proceduretype: ProcedureType | None = None, range_dict: dict | None = None, *args, **kwargs):
        """
        Args:
            filepath (Path | str): Workbook to parse.
            proceduretype (ProcedureType | str | None): Procedure type or its name; resolved with
                correct_procedure_type. Its name is used as the worksheet name.
            range_dict (dict | None): Layout entries (iterated as a sequence of dicts). Falls back to
                proceduretype.sample_map, then to default_range_dict.
            *args ():
            **kwargs ():
        """
        from backend.validators.pydant import PydReagent
        proceduretype = self.correct_procedure_type(proceduretype)
        if not range_dict:
            # NOTE(review): this reads the *sample* map for a reagent table - confirm whether a
            #               reagent-specific map was intended.
            range_dict = proceduretype.sample_map
        if not range_dict:
            range_dict = self.__class__.default_range_dict
        # NOTE: Copy each entry instead of writing 'sheet' into it, so the class-level default
        #       (and the procedure type's own map) is not changed for other parser instances.
        range_dict = [dict(item, sheet=proceduretype.name) for item in range_dict]
        super().__init__(*args, filepath=filepath, proceduretype=proceduretype, range_dict=range_dict, **kwargs)
        self._pyd_object = PydReagent

    @property
    def parsed_info(self):
        """
        Yield parent rows that have a truthy 'lot', adding a 'reagentrole' key copied from 'reagent_role'.
        """
        for item in super().parsed_info:
            # NOTE: Rows without a lot are treated as unfilled and skipped.
            if not item['lot']:
                continue
            item['reagentrole'] = item['reagent_role']
            yield item
class DefaultEquipmentParser(DefaultTABLEParser):
    """
    Table parser for the equipment rows of a procedure's worksheet; produces PydEquipment objects.
    """

    # NOTE: Fallback layout; 'sheet' is filled in per instance with the procedure type's name.
    default_range_dict = [dict(
        header_row=32,
        end_row=39,
        sheet=""
    )]

    def __init__(self, filepath: Path | str, proceduretype: ProcedureType | None = None, range_dict: dict | None = None, *args, **kwargs):
        """
        Args:
            filepath (Path | str): Workbook to parse.
            proceduretype (ProcedureType | str | None): Procedure type or its name; resolved with
                correct_procedure_type. Its name is used as the worksheet name.
            range_dict (dict | None): Layout entries (iterated as a sequence of dicts). Falls back to
                proceduretype.sample_map, then to default_range_dict.
            *args ():
            **kwargs ():
        """
        from backend.validators.pydant import PydEquipment
        proceduretype = self.correct_procedure_type(proceduretype)
        if not range_dict:
            # NOTE(review): this reads the *sample* map for an equipment table - confirm whether an
            #               equipment-specific map was intended.
            range_dict = proceduretype.sample_map
        if not range_dict:
            range_dict = self.__class__.default_range_dict
        # NOTE: Copy each entry instead of writing 'sheet' into it, so the class-level default
        #       (and the procedure type's own map) is not changed for other parser instances.
        range_dict = [dict(item, sheet=proceduretype.name) for item in range_dict]
        super().__init__(*args, filepath=filepath, proceduretype=proceduretype, range_dict=range_dict, **kwargs)
        self._pyd_object = PydEquipment

    @property
    def parsed_info(self):
        """
        Yield parent rows that have a truthy 'name', enriched from the database.

        Each yielded row gains 'asset_number' and 'nickname' from the Equipment found by name,
        has 'tips' replaced by a one-element list of PydTips when a tips value is present, and
        gets 'equipmentrole' copied from 'equipment_role'.
        """
        # NOTE: Imported once per call rather than once per row.
        from backend.db.models import Equipment, Process
        from backend.validators.pydant import PydTips
        for item in super().parsed_info:
            if not item['name']:
                continue
            # NOTE(review): assumes the query returns a single matching Equipment - confirm behaviour when none is found.
            eq = Equipment.query(name=item['name'])
            item['asset_number'] = eq.asset_number
            item['nickname'] = eq.nickname
            process = Process.query(name=item['process'])
            if item['tips']:
                # NOTE: The tip role is taken from the first tiprole of the row's process.
                item['tips'] = [PydTips(name=item['tips'], tiprole=process.tiprole[0].name)]
            item['equipmentrole'] = item['equipment_role']
            yield item

View File

@@ -1,19 +1,14 @@
"""
"""
import logging, re, sys
from pprint import pformat
from pathlib import Path
from typing import Generator, Tuple
from openpyxl import load_workbook
import logging
from backend.db.models import Run, Sample, Procedure, ProcedureSampleAssociation
from . import DefaultKEYVALUEParser, DefaultTABLEParser
from backend.excel.parsers import DefaultKEYVALUEParser, DefaultTABLEParser
logger = logging.getLogger(f"submissions.{__name__}")
# class PCRResultsParser(DefaultParser):
# pass
class PCRInfoParser(DefaultKEYVALUEParser):
default_range_dict = [dict(