Getting results referencing ProcedureSampleAssociation

This commit is contained in:
lwark
2025-06-09 11:05:10 -05:00
parent db0b65b07b
commit 10d4c9f155
17 changed files with 476 additions and 208 deletions

View File

@@ -1,26 +1,25 @@
"""
"""
import logging, re
from pathlib import Path
from typing import Generator, Tuple
from openpyxl import load_workbook
from pandas import DataFrame
from backend.validators import pydant
logger = logging.getLogger(f"submissions.{__name__}")
class DefaultParser(object):
default_range_dict = dict(
start_row=2,
end_row=18,
key_column=1,
value_column=2,
sheet="Sample List"
)
def __repr__(self):
return f"{self.__class__.__name__}<{self.filepath.stem}>"
def __init__(self, filepath: Path | str, range_dict: dict | None = None):
self._pyd_object = getattr(pydant, f"Pyd{self.__class__.__name__.replace('Parser', '')}")
def __init__(self, filepath: Path | str, range_dict: dict | None = None, *args, **kwargs):
try:
self._pyd_object = getattr(pydant, f"Pyd{self.__class__.__name__.replace('Parser', '')}")
except AttributeError:
self._pyd_object = pydant.PydResults
if isinstance(filepath, str):
self.filepath = Path(filepath)
else:
@@ -30,5 +29,63 @@ class DefaultParser(object):
self.range_dict = self.__class__.default_range_dict
else:
self.range_dict = range_dict
for item in self.range_dict:
item['worksheet'] = self.workbook[item['sheet']]
from .submission_parser import *
def to_pydantic(self):
data = {key: value for key, value in self.parsed_info}
data['filepath'] = self.filepath
return self._pyd_object(**data)
class DefaultKEYVALUEParser(DefaultParser):
default_range_dict = [dict(
start_row=2,
end_row=18,
key_column=1,
value_column=2,
sheet="Sample List"
)]
@property
def parsed_info(self) -> Generator[Tuple, None, None]:
for item in self.range_dict:
rows = range(item['start_row'], item['end_row'] + 1)
for row in rows:
key = item['worksheet'].cell(row, item['key_column']).value
if key:
# Note: Remove anything in brackets.
key = re.sub(r"\(.*\)", "", key)
key = key.lower().replace(":", "").strip().replace(" ", "_")
value = item['worksheet'].cell(row, item['value_column']).value
value = dict(value=value, missing=False if value else True)
yield key, value
class DefaultTABLEParser(DefaultParser):
default_range_dict = [dict(
header_row=20,
end_row=116,
sheet="Sample List"
)]
@property
def parsed_info(self):
for item in self.range_dict:
list_worksheet = self.workbook[item['sheet']]
list_df = DataFrame([item for item in list_worksheet.values][item['header_row'] - 1:])
list_df.columns = list_df.iloc[0]
list_df = list_df[1:]
list_df = list_df.dropna(axis=1, how='all')
for ii, row in enumerate(list_df.iterrows()):
output = {key.lower().replace(" ", "_"): value for key, value in row[1].to_dict().items()}
yield output
def to_pydantic(self, **kwargs):
return [self._pyd_object(**output) for output in self.parsed_info]
from .submission_parser import *

View File

@@ -0,0 +1,99 @@
"""
"""
import logging, re, sys
from pprint import pformat
from pathlib import Path
from typing import Generator, Tuple
from openpyxl import load_workbook
from backend.db.models import Run, Sample
from . import DefaultKEYVALUEParser, DefaultTABLEParser
logger = logging.getLogger(f"submissions.{__name__}")
class PCRSampleParser(DefaultTABLEParser):
"""Object to pull data from Design and Analysis PCR export file."""
default_range_dict = [dict(
header_row=25,
sheet="Results"
)]
@property
def parsed_info(self):
output = [item for item in super().parsed_info]
merge_column = "sample"
sample_names = list(set([item['sample'] for item in output]))
for sample in sample_names:
multi = dict()
sois = (item for item in output if item['sample']==sample)
for soi in sois:
multi[soi['target']] = {k:v for k, v in soi.items() if k != "target"}
yield (sample, multi)
def to_pydantic(self):
for key, sample_info in self.parsed_info:
sample_obj = Sample.query(sample_id=key)
if sample_obj and not isinstance(sample_obj, list):
yield self._pyd_object(results=sample_info, parent=sample_obj)
else:
continue
class PCRInfoParser(DefaultKEYVALUEParser):
default_range_dict = [dict(
start_row=1,
end_row=24,
key_column=1,
value_column=2,
sheet="Results"
)]
# def __init__(self, filepath: Path | str, range_dict: dict | None = None):
# super().__init__(filepath=filepath, range_dict=range_dict)
# self.worksheet = self.workbook[self.range_dict['sheet']]
# self.rows = range(self.range_dict['start_row'], self.range_dict['end_row'] + 1)
#
# @property
# def parsed_info(self) -> Generator[Tuple, None, None]:
# for row in self.rows:
# key = self.worksheet.cell(row, self.range_dict['key_column']).value
# if key:
# key = re.sub(r"\(.*\)", "", key)
# key = key.lower().replace(":", "").strip().replace(" ", "_")
# value = self.worksheet.cell(row, self.range_dict['value_column']).value
# value = dict(value=value, missing=False if value else True)
# yield key, value
#
def to_pydantic(self):
from backend.db.models import Procedure
data = {key: value for key, value in self.parsed_info}
data['filepath'] = self.filepath
return self._pyd_object(**data, parent=Procedure)
# @property
# def pcr_info(self) -> dict:
# """
# Parse general info rows for all types of PCR results
# """
# info_map = self.submission_obj.get_submission_type().sample_map['pcr_general_info']
# sheet = self.xl[info_map['sheet']]
# iter_rows = sheet.iter_rows(min_row=info_map['start_row'], max_row=info_map['end_row'])
# pcr = {}
# for row in iter_rows:
# try:
# key = row[0].value.lower().replace(' ', '_')
# except AttributeError as e:
# logger.error(f"No key: {row[0].value} due to {e}")
# continue
# value = row[1].value or ""
# pcr[key] = value
# pcr['imported_by'] = getuser()
# return pcr

View File

@@ -1,66 +1,50 @@
"""
"""
import logging, re
from pathlib import Path
from typing import Generator, Tuple
from pandas import DataFrame
from . import DefaultParser
import logging
from string import ascii_lowercase
from typing import Generator
from tools import row_keys
from . import DefaultKEYVALUEParser, DefaultTABLEParser
logger = logging.getLogger(f"submissions.{__name__}")
class ClientSubmissionParser(DefaultParser):
class ClientSubmissionParser(DefaultKEYVALUEParser):
"""
Object for retrieving submitter info from "sample list" sheet
"""
def __init__(self, filepath: Path | str, range_dict: dict | None = None):
super().__init__(filepath=filepath, range_dict=range_dict)
self.worksheet = self.workbook[self.range_dict['sheet']]
self.rows = range(self.range_dict['start_row'], self.range_dict['end_row'] + 1)
@property
def parsed_info(self) -> Generator[Tuple, None, None]:
for row in self.rows:
key = self.worksheet.cell(row, self.range_dict['key_column']).value
if key:
key = re.sub(r"\(.*\)", "", key)
key = key.lower().replace(":", "").strip().replace(" ", "_")
value = self.worksheet.cell(row, self.range_dict['value_column']).value
value = dict(value=value, missing=False if value else True)
yield key, value
def to_pydantic(self):
data = {key: value for key, value in self.parsed_info}
data['filepath'] = self.filepath
return self._pyd_object(**data)
default_range_dict = [dict(
start_row=2,
end_row=18,
key_column=1,
value_column=2,
sheet="Sample List"
)]
class SampleParser(DefaultParser):
class SampleParser(DefaultTABLEParser):
"""
Object for retrieving submitter info from "sample list" sheet
"""
default_range_dict = dict(
default_range_dict = [dict(
header_row=20,
end_row=116,
list_sheet="Sample List"
)
def __init__(self, filepath: Path | str, range_dict: dict | None = None):
super().__init__(filepath=filepath, range_dict=range_dict)
self.list_worksheet = self.workbook[self.range_dict['list_sheet']]
self.list_df = DataFrame([item for item in self.list_worksheet.values][self.range_dict['header_row'] - 1:])
self.list_df.columns = self.list_df.iloc[0]
self.list_df = self.list_df[1:]
self.list_df = self.list_df.dropna(axis=1, how='all')
sheet="Sample List"
)]
@property
def parsed_info(self) -> Generator[dict, None, None]:
for ii, row in enumerate(self.list_df.iterrows()):
sample = {key.lower().replace(" ", "_"): value for key, value in row[1].to_dict().items()}
output = super().parsed_info
for ii, sample in enumerate(output):
if isinstance(sample["row"], str) and sample["row"].lower() in ascii_lowercase[0:8]:
try:
sample["row"] = row_keys[sample["row"]]
except KeyError:
pass
sample['submission_rank'] = ii + 1
yield sample