Preparing for production testing using Bacterial Culture

This commit is contained in:
lwark
2025-08-06 13:26:37 -05:00
parent 6f134d3870
commit 087bf9bcb7
24 changed files with 757 additions and 650 deletions

View File

@@ -4,6 +4,7 @@ Contains all models for sqlalchemy
from __future__ import annotations
import sys, logging, json
from collections import OrderedDict
import sqlalchemy.exc
from dateutil.parser import parse
@@ -295,8 +296,8 @@ class BaseClass(Base):
# logger.debug(f"Model: {model}")
if query is None:
query: Query = cls.__database_session__.query(cls)
else:
logger.debug(f"Incoming query: {query}")
# else:
# logger.debug(f"Incoming query: {query}")
singles = cls.get_default_info('singles')
for k, v in kwargs.items():
logger.info(f"Using key: {k} with value: {v} against {cls}")
@@ -611,7 +612,7 @@ class BaseClass(Base):
relevant = {k: v for k, v in self.__class__.__dict__.items() if
isinstance(v, InstrumentedAttribute) or isinstance(v, AssociationProxy)}
output = {}
output = OrderedDict()
output['excluded'] = ["excluded", "misc_info", "_misc_info", "id"]
for k, v in relevant.items():
try:
@@ -624,23 +625,11 @@ class BaseClass(Base):
value = getattr(self, k)
except AttributeError:
continue
# match value:
# case datetime():
# value = value.strftime("%Y-%m-%d %H:%M:%S")
# case bytes():
# continue
# case dict():
# try:
# value = value['name']
# except KeyError:
# if k == "_misc_info":
# value = value
# else:
# continue
# case _:
# pass
output[k.strip("_")] = value
# output = self.clean_details_dict(output)
if self._misc_info:
for key, value in self._misc_info.items():
output[key] = value
return output
@classmethod
@@ -670,8 +659,6 @@ class BaseClass(Base):
output[k] = value
return output
def to_pydantic(self, pyd_model_name:str|None=None, **kwargs):
from backend.validators import pydant
if not pyd_model_name:

View File

@@ -154,8 +154,8 @@ class KitType(BaseClass):
) #: Relation to SubmissionType
proceduretype = association_proxy("kittypeproceduretypeassociation", "proceduretype",
creator=lambda ST: ProcedureTypeKitTypeAssociation(
submissiontype=ST)) #: Association proxy to SubmissionTypeKitTypeAssociation
creator=lambda PT: ProcedureTypeKitTypeAssociation(
proceduretype=PT)) #: Association proxy to SubmissionTypeKitTypeAssociation
@@ -571,6 +571,8 @@ class ReagentRole(BaseClass):
reagents = [reagent for reagent in self.reagent]
if assoc:
last_used = Reagent.query(name=assoc.last_used)
if isinstance(last_used, list):
last_used = None
if last_used:
reagents.insert(0, reagents.pop(reagents.index(last_used)))
# return [f"{reagent.name} - {reagent.lot} - {reagent.expiry}" for reagent in reagents]

View File

@@ -60,6 +60,7 @@ class ClientSubmission(BaseClass, LogMixin):
name="fk_BS_sublab_id")) #: client lab id from _organizations
submission_category = Column(String(64))
sample_count = Column(INTEGER) #: Number of sample in the procedure
full_batch_size = Column(INTEGER) #: Number of wells in provided plate. 0 if no plate.
comment = Column(JSON)
run = relationship("Run", back_populates="clientsubmission") #: many-to-one relationship
contact = relationship("Contact", back_populates="clientsubmission") #: client org
@@ -86,6 +87,10 @@ class ClientSubmission(BaseClass, LogMixin):
def name(self):
return self.submitter_plate_id
@property
def max_sample_rank(self) -> int:
return max([item.submission_rank for item in self.clientsubmissionsampleassociation])
@classmethod
@setup_lookup
def query(cls,
@@ -308,6 +313,7 @@ class ClientSubmission(BaseClass, LogMixin):
row=row,
column=column
)
# assoc.save()
return assoc
@property
@@ -357,7 +363,9 @@ class ClientSubmission(BaseClass, LogMixin):
def details_dict(self, **kwargs):
output = super().details_dict(**kwargs)
output['clientlab'] = output['clientlab'].details_dict()
output['contact'] = output['contact'].details_dict()
if "contact" in output and issubclass(output['contact'].__class__, BaseClass):
output['contact'] = output['contact'].details_dict()
output['contact_email'] = output['contact']['email']
output['submissiontype'] = output['submissiontype'].details_dict()
output['run'] = [run.details_dict() for run in output['run']]
output['sample'] = [sample.details_dict() for sample in output['clientsubmissionsampleassociation']]
@@ -365,15 +373,13 @@ class ClientSubmission(BaseClass, LogMixin):
output['client_lab'] = output['clientlab']
output['submission_type'] = output['submissiontype']
output['excluded'] += ['run', "sample", "clientsubmissionsampleassociation", "excluded",
"expanded", 'clientlab', 'submissiontype', 'id']
"expanded", 'clientlab', 'submissiontype', 'id', 'info_placement', 'filepath', "name"]
output['expanded'] = ["clientlab", "contact", "submissiontype"]
# output = self.clean_details_dict(output)
logger.debug(f"{self.__class__.__name__}\n\n{pformat(output)}")
return output
def to_pydantic(self, filepath: Path | str | None = None, **kwargs):
output = super().to_pydantic(filepath=filepath, **kwargs)
output.template_file = self.template_file
# output.template_file = self.template_file
return output
@@ -690,7 +696,7 @@ class Run(BaseClass, LogMixin):
output['procedure'] = [procedure.details_dict() for procedure in output['procedure']]
output['permission'] = is_power_user()
output['excluded'] += ['procedure', "runsampleassociation", 'excluded', 'expanded', 'sample', 'id', 'custom',
'permission', "clientsubmission"]
'permission', "clientsubmission"]
output['sample_count'] = self.sample_count
output['client_submission'] = self.clientsubmission.name
output['started_date'] = self.started_date
@@ -1337,6 +1343,10 @@ class Run(BaseClass, LogMixin):
Manager = getattr(managers, f"Default{self.__class__.__name__}Manager")
manager = Manager(parent=obj, input_object=self.to_pydantic())
workbook = manager.write()
try:
workbook.remove_sheet("Sheet")
except ValueError:
pass
workbook.save(filename=output_filepath)
def construct_filename(self):
@@ -1695,8 +1705,8 @@ class ClientSubmissionSampleAssociation(BaseClass):
sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated sample
clientsubmission_id = Column(INTEGER, ForeignKey("_clientsubmission.id"),
primary_key=True) #: id of associated procedure
row = Column(INTEGER)
column = Column(INTEGER)
# row = Column(INTEGER)
# column = Column(INTEGER)
submission_rank = Column(INTEGER, primary_key=True, default=0) #: Location in sample list
# NOTE: reference to the Submission object
clientsubmission = relationship("ClientSubmission",
@@ -1740,13 +1750,13 @@ class ClientSubmissionSampleAssociation(BaseClass):
# NOTE: Get associated sample info
sample = self.sample.to_sub_dict()
sample['sample_id'] = self.sample.sample_id
sample['row'] = self.row
sample['column'] = self.column
try:
sample['well'] = f"{row_map[self.row]}{self.column}"
except KeyError as e:
logger.error(f"Unable to find row {self.row} in row_map.")
sample['Well'] = None
# sample['row'] = self.row
# sample['column'] = self.column
# try:
# sample['well'] = f"{row_map[self.row]}{self.column}"
# except (KeyError, AttributeError) as e:
# logger.error(f"Unable to find row {self.row} in row_map.")
# sample['Well'] = None
sample['plate_name'] = self.clientsubmission.submitter_plate_id
sample['positive'] = False
sample['submitted_date'] = self.clientsubmission.submitted_date
@@ -1760,13 +1770,9 @@ class ClientSubmissionSampleAssociation(BaseClass):
# logger.debug(f"Relevant info from assoc output: {pformat(relevant)}")
output = output['sample'].details_dict()
misc = output['misc_info']
# logger.debug(f"Output from sample: {pformat(output)}")
# # logger.debug(f"Output from sample: {pformat(output)}")
output.update(relevant)
output['misc_info'] = misc
# output['sample'] = temp
# output.update(output['sample'].details_dict())
# sys.exit()
return output
def to_pydantic(self) -> "PydSample":
@@ -1777,7 +1783,7 @@ class ClientSubmissionSampleAssociation(BaseClass):
PydSample: Pydantic Model
"""
from backend.validators import PydSample
return PydSample(**self.to_sub_dict())
return PydSample(**self.details_dict())
@property
def hitpicked(self) -> dict | None:
@@ -2194,7 +2200,7 @@ class ProcedureSampleAssociation(BaseClass):
sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated equipment
row = Column(INTEGER)
column = Column(INTEGER)
plate_rank = Column(INTEGER)
procedure_rank = Column(INTEGER)
procedure = relationship(Procedure,
back_populates="proceduresampleassociation") #: associated procedure
@@ -2267,4 +2273,3 @@ class ProcedureSampleAssociation(BaseClass):
except KeyError:
logger.error(output)
return output

View File

@@ -4,13 +4,11 @@
from __future__ import annotations
import logging, re
from pathlib import Path
from typing import Generator, Tuple, TYPE_CHECKING
from typing import Generator, TYPE_CHECKING
from openpyxl.cell import MergedCell
from openpyxl.reader.excel import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from pandas import DataFrame
from backend.validators import pydant
if TYPE_CHECKING:
from backend.db.models import ProcedureType
@@ -34,8 +32,8 @@ class DefaultParser(object):
instance.filepath = filepath
return instance
def __init__(self, filepath: Path | str, proceduretype: ProcedureType | None = None, range_dict: dict | None = None,
*args, **kwargs):
def __init__(self, filepath: Path | str, proceduretype: ProcedureType | None = None, sheet: str | None = None,
start_row: int = 1, *args, **kwargs):
"""
Args:
@@ -45,7 +43,6 @@ class DefaultParser(object):
*args ():
**kwargs ():
"""
logger.debug(f"\n\nHello from {self.__class__.__name__}\n\n")
self.proceduretype = proceduretype
try:
@@ -55,20 +52,21 @@ class DefaultParser(object):
logger.error(
f"Couldn't get pyd object: Pyd{self.__class__.__name__.replace('Parser', '').replace('Info', '')}, using {self.__class__.pyd_name}")
self._pyd_object = getattr(pydant, self.__class__.pyd_name)
if not sheet:
sheet = self.__class__.sheet
self.sheet = sheet
if not start_row:
start_row = self.__class__.start_row
self.workbook = load_workbook(self.filepath, data_only=True)
if not range_dict:
self.range_dict = self.__class__.default_range_dict
else:
self.range_dict = range_dict
logger.debug(f"Default parser range dict: {self.range_dict}")
for item in self.range_dict:
item['worksheet'] = self.workbook[item['sheet']]
self.worksheet = self.workbook[self.sheet]
self.start_row = self.delineate_start_row(start_row=start_row)
self.end_row = self.delineate_end_row(start_row=self.start_row)
logger.debug(f"Start row: {self.start_row}, End row: {self.end_row}")
def to_pydantic(self):
# data = {key: value['value'] for key, value in self.parsed_info.items()}
data = self.parsed_info
data['filepath'] = self.filepath
return self._pyd_object(**data)
@classmethod
@@ -78,75 +76,68 @@ class DefaultParser(object):
proceduretype = ProcedureType.query(name=proceduretype)
return proceduretype
@classmethod
def delineate_end_row(cls, worksheet: Worksheet, start_row: int = 1):
for iii, row in enumerate(worksheet.iter_rows(min_row=start_row), start=1):
def delineate_start_row(self, start_row: int = 1):
for iii, row in enumerate(self.worksheet.iter_rows(min_row=start_row), start=start_row):
if not all([item.value is None for item in row]):
return iii
return self.worksheet.min
def delineate_end_row(self, start_row: int = 1):
for iii, row in enumerate(self.worksheet.iter_rows(min_row=start_row), start=start_row):
if all([item.value is None for item in row]):
return iii
return self.worksheet.max_row
class DefaultKEYVALUEParser(DefaultParser):
# default_range_dict = [dict(
# start_row=2,
# end_row=18,
# key_column=1,
# value_column=2,
# sheet="Sample List"
# )]
# default_range_dict = [dict(sheet="Sample List", start_row=2)]
sheet = "Client Info"
start_row = 1
@property
def parsed_info(self):
for item in self.range_dict:
item['end_row'] = self.delineate_end_row(item['worksheet'], start_row=item['start_row'])
rows = range(item['start_row'], item['end_row'])
# item['start_row'] = item['end_row']
# del item['end_row']
for row in rows:
key = item['worksheet'].cell(row, 1).value
if key:
# Note: Remove anything in brackets.
key = re.sub(r"\(.*\)", "", key)
key = key.lower().replace(":", "").strip().replace(" ", "_")
value = item['worksheet'].cell(row, 2).value
missing = False if value else True
location_map = dict(row=row, key_column=1, value_column=2,
sheet=item['sheet'])
value = dict(value=value, location=location_map, missing=missing)
logger.debug(f"Yielding {value} for {key}")
yield key, value
rows = range(self.start_row, self.end_row)
for row in rows:
check_row = [item for item in self.worksheet.rows][row-1]
logger.debug(f"Checking row {row-1}, {check_row} for merged cells.")
if any([isinstance(cell, MergedCell) for cell in check_row]):
continue
key = self.worksheet.cell(row, 1).value
if key:
# Note: Remove anything in brackets.
key = re.sub(r"\(.*\)", "", key)
key = key.lower().replace(":", "").strip().replace(" ", "_")
value = self.worksheet.cell(row, 2).value
missing = False if value else True
# location_map = dict(row=row, key_column=1, value_column=2, sheet=self.worksheet.title)
value = dict(value=value, missing=missing)#, location=location_map)
logger.debug(f"Yielding {value} for {key}")
yield key, value
class DefaultTABLEParser(DefaultParser):
default_range_dict = [dict(
header_row=18,
sheet="Sample List"
)]
sheet = "Client Info"
start_row = 18
@property
def parsed_info(self) -> Generator[dict, None, None]:
for item in self.range_dict:
# list_worksheet = self.workbook[item['sheet']]
list_worksheet = item['worksheet']
if "end_row" in item.keys():
list_df = DataFrame(
[item for item in list_worksheet.values][item['header_row'] - 1:item['end_row'] - 1])
else:
list_df = DataFrame([item for item in list_worksheet.values][item['header_row'] - 1:])
list_df.columns = list_df.iloc[0]
list_df = list_df[1:]
list_df = list_df.dropna(axis=1, how='all')
for ii, row in enumerate(list_df.iterrows()):
output = {}
for key, value in row[1].to_dict().items():
if isinstance(key, str):
key = key.lower().replace(" ", "_")
key = re.sub(r"_(\(.*\)|#)", "", key)
# logger.debug(f"Row {ii} values: {key}: {value}")
output[key] = value
yield output
logger.debug(f"creating dataframe from {self.start_row} to {self.end_row}")
df = DataFrame(
[item for item in self.worksheet.values][self.start_row - 1:self.end_row - 1])
df.columns = df.iloc[0]
df = df[1:]
df = df.dropna(axis=1, how='all')
for ii, row in enumerate(df.iterrows()):
output = {}
for key, value in row[1].to_dict().items():
if isinstance(key, str):
key = key.lower().replace(" ", "_")
key = re.sub(r"_(\(.*\)|#)", "", key)
# logger.debug(f"Row {ii} values: {key}: {value}")
output[key] = value
yield output
def to_pydantic(self, **kwargs):
return [self._pyd_object(**output) for output in self.parsed_info]

View File

@@ -5,14 +5,12 @@ from __future__ import annotations
import logging
from pathlib import Path
from string import ascii_lowercase
from typing import Generator
from typing import Generator, TYPE_CHECKING, Literal
from openpyxl.reader.excel import load_workbook
from tools import row_keys
# from backend.db.models import SubmissionType
from . import DefaultKEYVALUEParser, DefaultTABLEParser
if TYPE_CHECKING:
from backend.db.models import SubmissionType
logger = logging.getLogger(f"submissions.{__name__}")
@@ -20,7 +18,16 @@ logger = logging.getLogger(f"submissions.{__name__}")
class SubmissionTyperMixin(object):
@classmethod
def retrieve_submissiontype(cls, filepath: Path):
def retrieve_submissiontype(cls, filepath: Path) -> "SubmissionType":
"""
Gets the submission type from a file.
Args:
filepath (Path): The import file
Returns:
SubmissionType: The determined submissiontype
"""
# NOTE: Attempt 1, get from form properties:
sub_type = cls.get_subtype_from_properties(filepath=filepath)
if not sub_type:
@@ -35,7 +42,16 @@ class SubmissionTyperMixin(object):
return sub_type
@classmethod
def get_subtype_from_regex(cls, filepath: Path):
def get_subtype_from_regex(cls, filepath: Path) -> "SubmissionType":
"""
Uses regex of the file name to determine submissiontype
Args:
filepath (Path): The import file
Returns:
SubmissionType: The determined submissiontype
"""
from backend.db.models import SubmissionType
regex = SubmissionType.regex
m = regex.search(filepath.__str__())
@@ -43,21 +59,42 @@ class SubmissionTyperMixin(object):
sub_type = m.lastgroup
except AttributeError as e:
sub_type = None
logger.critical(f"No procedure type found or procedure type found!: {e}")
logger.critical(f"No submission type or procedure type found!: {e}")
sub_type = SubmissionType.query(name=sub_type, limit=1)
if not sub_type:
return
return sub_type
@classmethod
def get_subtype_from_preparse(cls, filepath: Path):
def get_subtype_from_preparse(cls, filepath: Path) -> "SubmissionType":
"""
Performs a default parse of the file in an attempt to find the submission type.
Args:
filepath (Path): The import file
Returns:
SubmissionType: The determined submissiontype
"""
from backend.db.models import SubmissionType
parser = ClientSubmissionInfoParser(filepath)
sub_type = next((value for k, value in parser.parsed_info.items() if k == "submissiontype"), None)
sub_type = SubmissionType.query(name=sub_type)
parser = ClientSubmissionInfoParser(filepath=filepath, submissiontype=SubmissionType.query(name="Test"))
sub_type = next((value for k, value in parser.parsed_info.items() if k == "submissiontype" or k == "submission_type"), None)
sub_type = SubmissionType.query(name=sub_type.title())
if isinstance(sub_type, list):
sub_type = None
return
return sub_type
@classmethod
def get_subtype_from_properties(cls, filepath: Path):
def get_subtype_from_properties(cls, filepath: Path) -> "SubmissionType":
"""
Attempts to get submission type from the xl metadata.
Args:
filepath (Path): The import file
Returns:
SubmissionType: The determined submissiontype
"""
from backend.db.models import SubmissionType
wb = load_workbook(filepath)
# NOTE: Gets first category in the metadata.
@@ -65,62 +102,56 @@ class SubmissionTyperMixin(object):
sub_type = next((item.strip().title() for item in categories), None)
sub_type = SubmissionType.query(name=sub_type)
if isinstance(sub_type, list):
sub_type = None
return
return sub_type
class ClientSubmissionInfoParser(DefaultKEYVALUEParser, SubmissionTyperMixin):
"""
Object for retrieving submitter info from "sample list" sheet
Object for retrieving submitter info from "Client Info" sheet
"""
pyd_name = "PydClientSubmission"
default_range_dict = [dict(
start_row=2,
end_row=16,
key_column=1,
value_column=2,
sheet="Sample List"
)]
def __init__(self, filepath: Path | str, submissiontype:"SubmissionType"|None=None, *args, **kwargs):
from frontend.widgets.pop_ups import QuestionAsker
from backend.managers import procedures as procedure_managers
def __init__(self, filepath: Path | str, submissiontype: "SubmissionType" | None = None, *args, **kwargs):
logger.debug(f"Set submission type: {submissiontype}")
if not submissiontype:
self.submissiontype = self.retrieve_submissiontype(filepath=filepath)
else:
self.submissiontype = submissiontype
# if "range_dict" not in kwargs:
# kwargs['range_dict'] = self.submissiontype.info_map
super().__init__(filepath=filepath, range_dict=[dict(sheet="Client Info")], **kwargs)
allowed_procedure_types = [item.name for item in self.submissiontype.proceduretype]
for name in allowed_procedure_types:
if name in self.workbook.sheetnames:
# TODO: check if run with name already exists
add_run = QuestionAsker(title="Add Run?", message="We've detected a sheet corresponding to an associated procedure type.\nWould you like to add a new run?")
if add_run.accepted:
# NOTE: recruit parser.
try:
manager = getattr(procedure_managers, name)
except AttributeError:
manager = procedure_managers.DefaultManager
self.manager = manager(proceduretype=name)
pass
super().__init__(filepath=filepath, sheet="Client Info", start_row=1, **kwargs)
# NOTE: move to the manager class.
# allowed_procedure_types = [item.name for item in self.submissiontype.proceduretype]
# for name in allowed_procedure_types:
# if name in self.workbook.sheetnames:
# # TODO: check if run with name already exists
# add_run = QuestionAsker(title="Add Run?", message="We've detected a sheet corresponding to an associated procedure type.\nWould you like to add a new run?")
# if add_run.accepted:
# # NOTE: recruit parser.
# try:
# manager = getattr(procedure_managers, name)
# except AttributeError:
# manager = procedure_managers.DefaultManager
# self.manager = manager(proceduretype=name)
# pass
@property
def parsed_info(self):
output = {k:v for k, v in super().parsed_info}
output = {k: v for k, v in super().parsed_info}
try:
output['clientlab'] = output['client_lab']
except KeyError:
pass
# output['submissiontype'] = dict(value=self.submissiontype.name.title())
try:
output['submissiontype'] = output['submission_type']
output['submissiontype']['value'] = self.submissiontype.name.title()
except KeyError:
pass
logger.debug(f"Data: {output}")
output['submissiontype'] = self.submissiontype.name
return output
class ClientSubmissionSampleParser(DefaultTABLEParser, SubmissionTyperMixin):
"""
Object for retrieving submitter samples from "sample list" sheet
@@ -128,32 +159,26 @@ class ClientSubmissionSampleParser(DefaultTABLEParser, SubmissionTyperMixin):
pyd_name = "PydSample"
default_range_dict = [dict(
header_row=18,
end_row=114,
sheet="Sample List"
)]
def __init__(self, filepath: Path | str, submissiontype: "SubmissionType"|None=None, *args, **kwargs):
def __init__(self, filepath: Path | str, submissiontype: "SubmissionType" | None = None, start_row: int = 1, *args,
**kwargs):
if not submissiontype:
self.submissiontype = self.retrieve_submissiontype(filepath=filepath)
else:
self.submissiontype = submissiontype
if "range_dict" not in kwargs:
kwargs['range_dict'] = self.submissiontype.sample_map
super().__init__(filepath=filepath, **kwargs)
super().__init__(filepath=filepath, sheet="Client Info", start_row=start_row, **kwargs)
@property
def parsed_info(self) -> Generator[dict, None, None]:
output = super().parsed_info
for ii, sample in enumerate(output):
logger.debug(f"Parsed info sample: {sample}")
for ii, sample in enumerate(output, start=1):
# logger.debug(f"Parsed info sample: {sample}")
if isinstance(sample["row"], str) and sample["row"].lower() in ascii_lowercase[0:8]:
try:
sample["row"] = row_keys[sample["row"]]
except KeyError:
pass
sample['submission_rank'] = ii + 1
sample['submission_rank'] = ii
yield sample
def to_pydantic(self):

View File

@@ -1,12 +1,12 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from backend.excel.parsers import DefaultTABLEParser, DefaultKEYVALUEParser
import logging
if TYPE_CHECKING:
from backend.db.models import ProcedureType
logger = logging.getLogger(f"submissions.{__name__}")
class ProcedureInfoParser(DefaultKEYVALUEParser):

View File

@@ -0,0 +1,38 @@
from __future__ import annotations
from pathlib import Path
from backend.excel.parsers import DefaultKEYVALUEParser, DefaultTABLEParser
from typing import TYPE_CHECKING
import logging
if TYPE_CHECKING:
from backend.db.models import ProcedureType
logger = logging.getLogger(f"submissions.{__name__}")
class DefaultResultsInfoParser(DefaultKEYVALUEParser):
pyd_name = "PydResults"
def __init__(self, filepath: Path | str, proceduretype: "ProcedureType" | None = None,
results_type: str | None = "PCR", *args, **kwargs):
if results_type:
self.results_type = results_type
sheet = proceduretype.allowed_result_methods[results_type]['info']['sheet']
start_row = proceduretype.allowed_result_methods[results_type]['info']['start_row']
super().__init__(filepath=filepath, proceduretype=proceduretype, sheet=sheet, start_row=start_row, *args,
**kwargs)
class DefaultResultsSampleParser(DefaultTABLEParser):
pyd_name = "PydResults"
def __init__(self, filepath: Path | str, proceduretype: "ProcedureType" | None = None,
results_type: str | None = "PCR", *args, **kwargs):
if results_type:
self.results_type = results_type
sheet = proceduretype.allowed_result_methods[results_type]['sample']['sheet']
start_row = proceduretype.allowed_result_methods[results_type]['sample']['start_row']
super().__init__(filepath=filepath, proceduretype=proceduretype, sheet=sheet, start_row=start_row, *args,
**kwargs)
from .pcr_results_parser import PCRInfoParser, PCRSampleParser

View File

@@ -1,57 +1,42 @@
"""
"""
from __future__ import annotations
import logging
from backend.db.models import Run, Sample, Procedure, ProcedureSampleAssociation
from backend.excel.parsers import DefaultKEYVALUEParser, DefaultTABLEParser
from typing import Generator, TYPE_CHECKING
from backend.db.models import ProcedureSampleAssociation
from backend.excel.parsers.results_parsers import DefaultResultsInfoParser, DefaultResultsSampleParser
from pathlib import Path
if TYPE_CHECKING:
from backend.validators.pydant import PydSample
logger = logging.getLogger(f"submissions.{__name__}")
# class PCRResultsParser(DefaultParser):
# pass
class PCRInfoParser(DefaultResultsInfoParser):
class PCRInfoParser(DefaultKEYVALUEParser):
pyd_name = "PydResults"
default_range_dict = [dict(
start_row=1,
end_row=24,
key_column=1,
value_column=2,
sheet="Results"
)]
def __init__(self, filepath: Path | str, range_dict: dict | None = None, procedure=None):
super().__init__(filepath=filepath, range_dict=range_dict)
def __init__(self, filepath: Path | str, sheet: str | None = None, start_row: int = 1, procedure=None, **kwargs):
self.results_type = "PCR"
self.procedure = procedure
super().__init__(filepath=filepath, proceduretype=self.procedure.proceduretype)
def to_pydantic(self):
# from backend.db.models import Procedure
data = dict(results={k:v for k, v in self.parsed_info}, filepath=self.filepath,
result_type="PCR")
data = dict(results={k: v for k, v in self.parsed_info}, filepath=self.filepath,
result_type=self.results_type)
return self._pyd_object(**data, parent=self.procedure)
class PCRSampleParser(DefaultTABLEParser):
class PCRSampleParser(DefaultResultsSampleParser):
"""Object to pull data from Design and Analysis PCR export file."""
pyd_name = "PydResults"
default_range_dict = [dict(
header_row=25,
sheet="Results"
)]
def __init__(self, filepath: Path | str, range_dict: dict | None = None, procedure=None):
super().__init__(filepath=filepath, range_dict=range_dict)
def __init__(self, filepath: Path | str, sheet: str | None = None, start_row: int = 1, procedure=None, **kwargs):
self.results_type = "PCR"
self.procedure = procedure
super().__init__(filepath=filepath, proceduretype=self.procedure.proceduretype)
@property
def parsed_info(self):
def parsed_info(self) -> Generator[dict, None, None]:
output = [item for item in super().parsed_info]
merge_column = "sample"
sample_names = list(set([item['sample'] for item in output]))
for sample in sample_names:
multi = dict(result_type="PCR")
@@ -60,17 +45,14 @@ class PCRSampleParser(DefaultTABLEParser):
multi[soi['target']] = {k: v for k, v in soi.items() if k != "target" and k != "sample"}
yield {sample: multi}
def to_pydantic(self):
logger.debug("running to pydantic")
def to_pydantic(self) -> Generator["PydSample", None, None]:
for item in self.parsed_info:
# sample_obj = Sample.query(sample_id=list(item.keys())[0])
# NOTE: Ensure that only samples associated with the procedure are used.
try:
sample_obj = next(
(sample for sample in self.procedure.sample if sample.sample_id == list(item.keys())[0]))
except StopIteration:
continue
# logger.debug(f"Sample object {sample_obj}")
assoc = ProcedureSampleAssociation.query(sample=sample_obj, procedure=self.procedure)
if assoc and not isinstance(assoc, list):
output = self._pyd_object(results=list(item.values())[0], parent=assoc)

View File

@@ -206,7 +206,10 @@ class ConcentrationMaker(ReportArchetype):
self.subs = Run.query(start_date=start_date, end_date=end_date,
submissiontype_name=submission_type, page_size=0)
# self.sample = flatten_list([sub.get_provisional_controls(controls_only=controls_only) for sub in self.run])
self.samples = flatten_list([sub.get_provisional_controls(include=include) for sub in self.subs])
try:
self.samples = flatten_list([sub.get_provisional_controls(include=include) for sub in self.subs])
except AttributeError:
self.samples = []
self.records = [self.build_record(sample) for sample in self.samples]
self.df = DataFrame.from_records(self.records)
self.sheet_name = "Concentration"

View File

@@ -1,17 +1,17 @@
import logging
import re
from io import BytesIO
from pathlib import Path
import logging, sys
from datetime import datetime, date
from pprint import pformat
from typing import Any
from types import NoneType
from typing import Any, Literal
from openpyxl.reader.excel import load_workbook
from openpyxl.styles import Alignment, PatternFill
from openpyxl.utils import get_column_letter
from openpyxl.workbook.workbook import Workbook
from openpyxl.worksheet.worksheet import Worksheet
from pandas import DataFrame
from backend.db.models import BaseClass, ProcedureType
from backend.validators.pydant import PydBaseClass
from tools import flatten_list, create_plate_grid, sort_dict_by_list
logger = logging.getLogger(f"submissions.{__name__}")
@@ -21,155 +21,220 @@ class DefaultWriter(object):
def __repr__(self):
return f"{self.__class__.__name__}<{self.filepath.stem}>"
def __init__(self, pydant_obj, proceduretype: ProcedureType|None=None, range_dict: dict | None = None, *args, **kwargs):
# self.filepath = output_filepath
def __init__(self, pydant_obj, proceduretype: ProcedureType | None = None, *args, **kwargs):
self.pydant_obj = pydant_obj
self.proceduretype = proceduretype
if range_dict:
self.range_dict = range_dict
else:
self.range_dict = self.__class__.default_range_dict
@classmethod
def stringify_value(cls, value:Any) -> str:
def stringify_value(cls, value: Any) -> str:
if isinstance(value, dict):
try:
value = value['value']
except (KeyError, ValueError):
try:
value = value['name']
except (KeyError, ValueError):
return
match value:
case x if issubclass(value.__class__, BaseClass):
value = value.name
case x if issubclass(value.__class__, PydBaseClass):
value = value.name
case dict():
try:
value = value['value']
except ValueError:
try:
value = value['name']
except ValueError:
value = value.__str__()
case bytes() | list():
value = None
case datetime() | date():
value = value.strftime("%Y-%m-%d")
case _:
value = str(value)
# logger.debug(f"Returning value: {value}")
return value
@classmethod
def prettify_key(cls, value:str) -> str:
value = value.replace("type", " type").strip()
value = value.title()
return value
def prettify_key(cls, key: str) -> str:
key = key.replace("type", " type").strip()
key = key.replace("_", " ")
key = key.title()
key = key.replace("Id", "ID")
return key
def write_to_workbook(self, workbook: Workbook):
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int | None = None, *args, **kwargs):
logger.debug(f"Writing to workbook with {self.__class__.__name__}")
if not start_row:
try:
start_row = self.__class__.start_row
except AttributeError as e:
logger.error(f"Couldn't get start row due to {e}")
start_row = 1
if not sheet:
sheet = self.__class__.sheet
self.sheet = sheet
if self.sheet not in workbook.sheetnames:
try:
self.worksheet = workbook["Sheet"]
self.worksheet.title = self.sheet
except KeyError:
self.worksheet = workbook.create_sheet(self.sheet)
else:
self.worksheet = workbook[self.sheet]
self.worksheet = self.prewrite(self.worksheet, start_row=start_row)
self.start_row = self.delineate_start_row(start_row=start_row)
self.end_row = self.delineate_end_row(start_row=start_row)
logger.debug(f"{self.__class__.__name__} Start row: {self.start_row}, end row: {self.end_row}")
return workbook
def delineate_start_row(self, start_row: int = 1):
logger.debug(f"Attempting to find start row from {start_row}")
for iii, row in enumerate(self.worksheet.iter_rows(min_row=start_row), start=start_row):
if all([item.value is None for item in row]):
logger.debug(f"Returning {iii} for start row.")
return iii
if self.worksheet.max_row == 1:
return self.worksheet.max_row + 1
else:
return self.worksheet.max_row + 2
def prewrite(self, worksheet: Worksheet, start_row: int) -> Worksheet:
return worksheet
def columns_best_fit(self, worksheet: Worksheet) -> None:
"""
Make all columns best fit
"""
for col in worksheet.columns:
setlen = 0
column = col[0].column_letter # Get the column name
for cell in col:
if len(str(cell.value)) > setlen:
setlen = len(str(cell.value))
set_col_width = setlen + 5
# Setting the column width
worksheet.column_dimensions[column].width = set_col_width
return worksheet
class DefaultKEYVALUEWriter(DefaultWriter):
sheet = "Client Info"
start_row = 2
exclude = []
key_order = []
default_range_dict = [dict(
start_row=2,
end_row=18,
key_column=1,
value_column=2,
sheet="Sample List"
)]
def __init__(self, pydant_obj, proceduretype: ProcedureType|None=None, range_dict: dict | None = None, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, proceduretype=proceduretype, range_dict=range_dict, *args, **kwargs)
def __init__(self, pydant_obj, proceduretype: ProcedureType | None = None, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, proceduretype=proceduretype, *args, **kwargs)
self.fill_dictionary = self.pydant_obj.improved_dict()
def delineate_end_row(self, start_row: int = 1):
data_length = len(self.fill_dictionary)
return data_length + start_row
@classmethod
def check_location(cls, locations: list, sheet: str):
logger.debug(f"Checking for location against {sheet}")
return any([item['sheet'] == sheet for item in locations])
def write_to_workbook(self, workbook: Workbook) -> Workbook:
workbook = super().write_to_workbook(workbook=workbook)
for rng in self.range_dict:
rows = range(rng['start_row'], rng['end_row'] + 1)
worksheet = workbook[rng['sheet']]
try:
for ii, (k, v) in enumerate(self.fill_dictionary.items(), start=rng['start_row']):
try:
worksheet.cell(column=rng['key_column'], row=rows[ii], value=self.prettify_key(k))
worksheet.cell(column=rng['value_column'], row=rows[ii], value=self.stringify_value(v))
except IndexError:
logger.error(f"Not enough rows: {len(rows)} for index {ii}")
except ValueError as e:
logger.error(self.fill_dictionary)
raise e
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int = 1, *args, **kwargs) -> Workbook:
workbook = super().write_to_workbook(workbook=workbook, sheet=sheet, start_row=start_row)
dictionary = {k: v for k, v in self.fill_dictionary.items() if k not in self.__class__.exclude}
dictionary = sort_dict_by_list(dictionary=dictionary, order_list=self.key_order)
for ii, (k, v) in enumerate(dictionary.items(), start=self.start_row):
value = self.stringify_value(value=v)
if value is None:
continue
self.worksheet.cell(column=1, row=ii, value=self.prettify_key(k))
self.worksheet.cell(column=2, row=ii, value=value)
self.worksheet = self.postwrite(self.worksheet)
return workbook
def postwrite(self, worksheet: Worksheet) -> Worksheet:
worksheet = self.columns_best_fit(worksheet=worksheet)
return worksheet
class DefaultTABLEWriter(DefaultWriter):
sheet = "Client Info"
start_row = 19
header_order = []
exclude = []
default_range_dict = [dict(
header_row=19,
sheet="Sample List"
)]
@classmethod
def get_row_count(cls, worksheet: Worksheet, range_dict:dict):
if "end_row" in range_dict.keys():
list_df = DataFrame([item for item in worksheet.values][range_dict['header_row'] - 1:range_dict['end_row'] - 1])
else:
list_df = DataFrame([item for item in worksheet.values][range_dict['header_row'] - 1:])
def get_row_count(self, start_row: int = 1):
list_df = DataFrame([item for item in self.worksheet.values][start_row - 1:])
row_count = list_df.shape[0]
return row_count
def pad_samples_to_length(self, row_count, column_names):
def delineate_end_row(self, start_row: int = 1) -> int:
return start_row + len(self.pydant_obj) + 1
def pad_samples_to_length(self, row_count,
mode: Literal["submission", "procedure"] = "submission"): #, column_names):
from backend import PydSample
output_samples = []
for iii in range(1, row_count + 1):
# logger.debug(f"Submission rank: {iii}")
if isinstance(self.pydant_obj, list):
iterator = self.pydant_obj
else:
iterator = self.pydant_obj.sample
try:
sample = next((item for item in iterator if item.submission_rank == iii))
sample = next((item for item in iterator if getattr(item, f"{mode}_rank") == iii))
except StopIteration:
sample = PydSample(sample_id="")
for column in column_names:
setattr(sample, column[0], "")
sample.submission_rank = iii
sample.plate_rank = iii
# logger.debug(f"Appending {sample.sample_id}")
# logger.debug(f"Iterator now: {[item.submission_rank for item in iterator]}")
sample.procedure_rank = iii
if mode == "procedure":
if all([item.row for item in self.pydant_obj.sample]):
rows, columns = self.pydant_obj.rows_columns_count
grid = create_plate_grid(rows=rows, columns=columns)
sample.row, sample.column = grid[sample.procedure_rank]
output_samples.append(sample)
return sorted(output_samples, key=lambda x: x.submission_rank)
return sorted(output_samples, key=lambda x: getattr(x, f"{mode}_rank"))
def write_to_workbook(self, workbook: Workbook) -> Workbook:
workbook = super().write_to_workbook(workbook=workbook)
for rng in self.range_dict:
list_worksheet = workbook[rng['sheet']]
column_names = [(str(item.value).lower().replace(" ", "_"), item.column) for item in list_worksheet[rng['header_row']] if item.value]
for iii, object in enumerate(self.pydant_obj, start=1):
# logger.debug(f"Writing object: {object}")
write_row = rng['header_row'] + iii
for column in column_names:
if column[0].lower() in ["well", "row", "column"]:
continue
write_column = column[1]
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int | None = None, *args, **kwargs) -> Workbook:
workbook = super().write_to_workbook(workbook=workbook, sheet=sheet, start_row=start_row, *args, **kwargs)
self.header_list = self.sort_header_row(list(set(flatten_list([item.fields for item in self.pydant_obj]))))
logger.debug(f"Header row: {self.header_list}")
self.worksheet = self.write_header_row(worksheet=self.worksheet)
for iii, object in enumerate(self.pydant_obj, start=1):
write_row = self.start_row + iii
for header in self.header_list:
try:
column = next((cell for cell in self.worksheet[self.start_row] if
cell.value == header.replace("_", " ").title()))
except StopIteration:
logger.warning(f'Could not find column for {header.replace("_", " ").title()}')
continue
column = column.column
try:
value = object.improved_dict()[header.lower().replace(" ", "")]
except (AttributeError, KeyError) as e:
try:
value = getattr(object, column[0].lower().replace(" ", ""))
except AttributeError:
try:
value = getattr(object, column[0].lower().replace("_", ""))
except AttributeError:
value = ""
# logger.debug(f"{column} Writing {value} to row {write_row}, column {write_column}")
list_worksheet.cell(row=write_row, column=write_column, value=self.stringify_value(value))
value = object.improved_dict()[header.lower().replace(" ", "_")]
except (AttributeError, KeyError):
value = ""
self.worksheet.cell(row=write_row, column=column, value=self.stringify_value(value))
self.worksheet = self.postwrite(self.worksheet)
return workbook
@classmethod
def construct_column_names(cls, column_item):
column = column_item.column
match column_item.value:
case str():
value = column_item.value.lower().replace(" ", "_")
case _:
value = column_item.value
return value, column
def sort_header_row(cls, header_list: list) -> list:
output = []
logger.debug(cls.exclude)
for item in cls.header_order:
if item in [header for header in header_list if header not in cls.exclude]:
output.append(header_list.pop(header_list.index(item)))
return output + sorted([item for item in header_list if item not in cls.exclude])
def write_header_row(self, worksheet: Worksheet) -> Worksheet:
for iii, header in enumerate(self.header_list, start=1):
worksheet.cell(row=self.start_row, column=iii, value=header.replace("_", " ").title())
worksheet.cell(row=self.start_row, column=iii).alignment = Alignment(horizontal='center')
worksheet.cell(row=self.start_row, column=iii).fill = PatternFill(start_color='2DE733', end_color='2DE733', fill_type="solid")
return worksheet
def postwrite(self, worksheet: Worksheet) -> Worksheet:
worksheet = self.columns_best_fit(worksheet=worksheet)
return worksheet
from .clientsubmission_writer import ClientSubmissionInfoWriter, ClientSubmissionSampleWriter

View File

@@ -1,76 +1,52 @@
from __future__ import annotations
import logging
from pathlib import Path
from pprint import pformat
from openpyxl.workbook import Workbook
from openpyxl.styles import Alignment
from openpyxl.worksheet.worksheet import Worksheet
from typing import TYPE_CHECKING
from . import DefaultKEYVALUEWriter, DefaultTABLEWriter
if TYPE_CHECKING:
from backend.db.models import ProcedureType
logger = logging.getLogger(f"submissions.{__name__}")
class ClientSubmissionInfoWriter(DefaultKEYVALUEWriter):
exclude = ["name", "id", "clientlab", "filepath"]
def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, range_dict=range_dict, *args, **kwargs)
def __init__(self, pydant_obj, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, *args, **kwargs)
logger.debug(f"{self.__class__.__name__} recruited!")
def write_to_workbook(self, workbook: Workbook) -> Workbook:
# workbook = super().write_to_workbook(workbook=workbook)
logger.debug(f"Skipped super.")
for rng in self.range_dict:
worksheet = workbook[rng['sheet']]
for key, value in self.fill_dictionary.items():
logger.debug(f"Checking: key {key}, value {str(value)[:64]}")
if isinstance(value, bytes):
continue
try:
check = self.check_location(value['location'], rng['sheet'])
except TypeError:
check = False
if not check:
continue
# relevant_values[k] = v
logger.debug(f"Location passed for {value['location']}")
for location in value['location']:
if location['sheet'] != rng['sheet']:
continue
logger.debug(f"Writing {value} to row {location['row']}, column {location['value_column']}")
try:
worksheet.cell(location['row'], location['value_column'], value=value['value'])
except KeyError:
worksheet.cell(location['row'], location['value_column'], value=value['name'])
return workbook
def prewrite(self, worksheet: Worksheet, start_row: int) -> Worksheet:
# worksheet.merge_cells(start_row=start_row, start_column=1, end_row=start_row, end_column=4)
worksheet.cell(row=start_row, column=1, value="Submitter Info")
worksheet.cell(row=start_row, column=1).alignment = Alignment(horizontal="center")
return worksheet
class ClientSubmissionSampleWriter(DefaultTABLEWriter):
def write_to_workbook(self, workbook: Workbook) -> Workbook:
workbook = super().write_to_workbook(workbook=workbook)
# logger.debug(f"\n\nHello from {self.__class__.__name__} with range_dict: {pformat(self.range_dict)}")
for rng in self.range_dict:
list_worksheet = workbook[rng['sheet']]
row_count = self.get_row_count(list_worksheet, rng)
column_names = [(str(item.value).lower().replace(" ", "_"), item.column) for item in list_worksheet[rng['header_row']] if item.value]
samples = self.pad_samples_to_length(row_count=row_count, column_names=column_names)
# logger.debug(f"Samples: {pformat(samples)}")
for sample in samples:
write_row = rng['header_row'] + sample.submission_rank
# logger.debug(f"Writing sample: {sample} to row {write_row}")
for column in column_names:
# logger.debug(f"At column {column}")
if column[0].lower() in ["well", "row", "column"]:
continue
write_column = column[1]
try:
# value = sample[column[0]]
value = getattr(sample, column[0])
except AttributeError:
value = ""
# logger.debug(f"{column} Writing {value} to row {write_row}, column {write_column}")
list_worksheet.cell(row=write_row, column=write_column, value=value)
exclude = ['id', 'enabled', 'procedure_rank', "name"]
header_order = ["submission_rank", "sample_id"]
def __init__(self, pydant_obj, proceduretype: "ProcedureType" | None = None, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, proceduretype=proceduretype, *args, **kwargs)
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int | None = None, *args, **kwargs) -> Workbook:
self.pydant_obj = self.pad_samples_to_length(row_count=self.pydant_obj.max_sample_rank)#, column_names=header_list)
workbook = super().write_to_workbook(workbook=workbook, sheet=sheet, start_row=start_row, *args, **kwargs)
self.worksheet = self.postwrite(self.worksheet)
return workbook
def postwrite(self, worksheet: Worksheet) -> Worksheet:
worksheet = super().postwrite(worksheet)
for row in worksheet.iter_rows(min_row=self.start_row, max_row=self.end_row):
for cell in row:
if cell.value in [0, "0", "None"]:
cell.value = ""
cell.alignment = Alignment(horizontal="center")
return worksheet

View File

@@ -6,96 +6,83 @@ from pprint import pformat
from openpyxl.workbook import Workbook
from backend.excel.writers import DefaultKEYVALUEWriter, DefaultTABLEWriter
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from backend.db.models import ProcedureType
logger = logging.getLogger(f"submissions.{__name__}")
class ProcedureInfoWriter(DefaultKEYVALUEWriter):
default_range_dict = [dict(
start_row=1,
end_row=6,
key_column=1,
value_column=2,
sheet=""
)]
start_row = 1
header_order = []
exclude = ['control', 'equipment', 'excluded', 'id', 'misc_info', 'plate_map', 'possible_kits',
'procedureequipmentassociation', 'procedurereagentassociation', 'proceduresampleassociation', 'proceduretipsassociation', 'reagent',
'reagentrole', 'results', 'sample', 'tips']
def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, range_dict=range_dict, *args, **kwargs)
exclude = ['control', 'equipment', 'excluded', 'id', 'misc_info', 'plate_map', 'possible_kits', 'procedureequipmentassociation',
'procedurereagentassociation', 'proceduresampleassociation', 'proceduretipsassociation', 'reagent', 'reagentrole',
'results', 'sample', 'tips']
self.fill_dictionary = {k: v for k, v in self.fill_dictionary.items() if k not in exclude}
def __init__(self, pydant_obj, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, *args, **kwargs)
self.fill_dictionary = {k: v for k, v in self.fill_dictionary.items() if k not in self.__class__.exclude}
# logger.debug(pformat(self.fill_dictionary))
for rng in self.range_dict:
if "sheet" not in rng or rng['sheet'] == "":
rng['sheet'] = f"{pydant_obj.proceduretype.name} Quality"
# for rng in self.range_dict:
# if "sheet" not in rng or rng['sheet'] == "":
# rng['sheet'] = f"{pydant_obj.proceduretype.name} Quality"
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int = 1, *args, **kwargs) -> Workbook:
workbook = super().write_to_workbook(workbook=workbook, sheet=f"{self.pydant_obj.proceduretype.name} Quality")
return workbook
class ProcedureReagentWriter(DefaultTABLEWriter):
default_range_dict = [dict(
header_row=8
)]
exclude = ["id", "comments", "missing"]
header_order = ["reagentrole", "name", "lot", "expiry"]
def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, range_dict=range_dict, *args, **kwargs)
for rng in self.range_dict:
if "sheet" not in rng:
rng['sheet'] = f"{pydant_obj.proceduretype.name} Quality"
def __init__(self, pydant_obj, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, *args, **kwargs)
self.sheet = f"{self.pydant_obj.proceduretype.name} Quality"
self.pydant_obj = self.pydant_obj.reagent
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int = 1, *args, **kwargs) -> Workbook:
logger.debug(self.pydant_obj)
workbook = super().write_to_workbook(workbook=workbook, sheet=self.sheet)
return workbook
class ProcedureEquipmentWriter(DefaultTABLEWriter):
default_range_dict = [dict(
header_row=14
)]
exclude = ['id']
header_order = ['equipmentrole', 'name', 'asset_number', 'process', 'tips']
def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, range_dict=range_dict, *args, **kwargs)
for rng in self.range_dict:
if "sheet" not in rng:
rng['sheet'] = f"{pydant_obj.proceduretype.name} Quality"
self.sheet = f"{self.pydant_obj.proceduretype.name} Quality"
self.pydant_obj = self.pydant_obj.equipment
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int = 1, *args, **kwargs) -> Workbook:
logger.debug(self.pydant_obj)
workbook = super().write_to_workbook(workbook=workbook, sheet=self.sheet)
return workbook
class ProcedureSampleWriter(DefaultTABLEWriter):
default_range_dict = [dict(
header_row=21
)]
exclude = ['id', 'enabled', 'name', "submission_rank"]
header_order = ['procedure_rank', 'sample_id']
def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, range_dict=range_dict, *args, **kwargs)
for rng in self.range_dict:
if "sheet" not in rng:
rng['sheet'] = f"{pydant_obj.proceduretype.name} Quality"
self.pydant_obj = self.pydant_obj.sample
self.sheet = f"{self.pydant_obj.proceduretype.name} Quality"
# self.pydant_obj = self.pydant_obj.sample
self.pydant_obj = self.pad_samples_to_length(row_count=pydant_obj.max_sample_rank, mode="procedure")
def write_to_workbook(self, workbook: Workbook) -> Workbook:
workbook = super().write_to_workbook(workbook=workbook)
for rng in self.range_dict:
list_worksheet = workbook[rng['sheet']]
row_count = self.get_row_count(list_worksheet, rng)
column_names = [(item.value.lower().replace(" ", "_"), item.column) for item in
list_worksheet[rng['header_row']] if item.value]
samples = self.pad_samples_to_length(row_count=row_count, column_names=column_names)
samples = sorted(samples, key=lambda x: x.plate_rank)
# samples = self.pydant_obj
# logger.debug(f"Samples: {[item.submission_rank for item in samples]}")
for sample in samples:
# logger.debug(f"Writing sample: {sample}")
if sample.row == 0 or sample.column == 0:
continue
write_row = rng['header_row'] + sample.plate_rank
for column in column_names:
if column[0].lower() in ["well"]:#, "row", "column"]:
continue
write_column = column[1]
try:
value = getattr(sample, column[0])
except KeyError:
value = ""
# logger.debug(f"{column} Writing {value} to row {write_row}, column {write_column}")
list_worksheet.cell(row=write_row, column=write_column, value=value)
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int = 1, *args, **kwargs) -> Workbook:
logger.debug(self.pydant_obj)
workbook = super().write_to_workbook(workbook=workbook, sheet=self.sheet)
return workbook

View File

@@ -1,31 +1,46 @@
from __future__ import annotations
import logging
from pathlib import Path
from typing import Generator
from pprint import pformat
from typing import Generator, TYPE_CHECKING
from openpyxl import Workbook
from openpyxl.styles import Alignment
from backend.excel.writers import DefaultKEYVALUEWriter, DefaultTABLEWriter
from tools import flatten_list
if TYPE_CHECKING:
from backend.db.models import ProcedureType
logger = logging.getLogger(f"submissions.{__name__}")
class PCRInfoWriter(DefaultKEYVALUEWriter):
default_range_dict = [dict(
start_row=1,
end_row=24,
key_column=1,
value_column=2,
sheet="Results"
)]
start_row = 1
def write_to_workbook(self, workbook: Workbook) -> Workbook:
worksheet = workbook[f"{self.proceduretype.name} Results"]
for key, value in self.fill_dictionary['result'].items():
# logger.debug(f"Filling in {key} with {value}")
worksheet.cell(value['location']['row'], value['location']['key_column'], value=key.replace("_", " ").title())
worksheet.cell(value['location']['row'], value['location']['value_column'], value=value['value'])
def __init__(self, pydant_obj, proceduretype: "ProcedureType" | None = None, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, proceduretype=proceduretype, *args, **kwargs)
self.fill_dictionary = self.pydant_obj.improved_dict()['result']
logger.debug(pformat(self.fill_dictionary))
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int | None = None, *args, **kwargs) -> Workbook:
workbook = super().write_to_workbook(workbook=workbook, sheet=f"{self.proceduretype.name} Results")
# if not start_row:
# try:
# start_row = self.__class__.start_row
# except AttributeError as e:
# logger.error(f"Couldn't get start row due to {e}")
# start_row = 1
# # worksheet = workbook[f"{self.proceduretype.name} Results"]
# self.worksheet = workbook.create_sheet(f"{self.proceduretype.name} Results")
# self.worksheet = self.prewrite(self.worksheet, start_row=start_row)
# # self.start_row = self.delineate_start_row(start_row=start_row)
# # self.end_row = self.delineate_end_row(start_row=start_row)
# # for key, value in self.fill_dictionary['result'].items():
# # # logger.debug(f"Filling in {key} with {value}")
# # self.worksheet.cell(value['location']['row'], value['location']['key_column'], value=key.replace("_", " ").title())
# # self.worksheet.cell(value['location']['row'], value['location']['value_column'], value=value['value'])
return workbook
@@ -33,7 +48,7 @@ class PCRSampleWriter(DefaultTABLEWriter):
def write_to_workbook(self, workbook: Workbook) -> Workbook:
worksheet = workbook[f"{self.proceduretype.name} Results"]
header_row = self.proceduretype.allowed_result_methods['PCR']['sample']['header_row']
header_row = self.proceduretype.allowed_result_methods['PCR']['sample']['start_row']
proto_columns = [(1, "sample"), (2, "target")]
columns = []
for iii, header in enumerate(self.column_headers, start=3):

View File

@@ -14,7 +14,6 @@ class DefaultManager(object):
def __init__(self, parent, input_object: Path | str | None = None):
logger.debug(f"FName before correction: {type(input_object)}")
# if input_object != "no_file":
self.parent = parent
match input_object:
case str():

View File

@@ -1,16 +1,14 @@
from __future__ import annotations
import logging
from io import BytesIO
import sys
from typing import TYPE_CHECKING
from pathlib import Path
from openpyxl.reader.excel import load_workbook
from openpyxl.workbook import Workbook
from backend.validators import RSLNamer
from backend.managers import DefaultManager
from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser, ClientSubmissionSampleParser
from backend.excel.writers.clientsubmission_writer import ClientSubmissionInfoWriter, ClientSubmissionSampleWriter
if TYPE_CHECKING:
from backend.db.models import SubmissionType
@@ -42,24 +40,26 @@ class DefaultClientSubmissionManager(DefaultManager):
def to_pydantic(self):
self.info_parser = ClientSubmissionInfoParser(filepath=self.input_object, submissiontype=self.submissiontype)
self.sample_parser = ClientSubmissionSampleParser(filepath=self.input_object,
submissiontype=self.submissiontype)
logger.debug(f"Info Parser range dict: {self.info_parser.range_dict}")
submissiontype=self.submissiontype,
start_row=self.info_parser.end_row)
logger.debug(self.sample_parser.__dict__)
self.clientsubmission = self.info_parser.to_pydantic()
self.clientsubmission.full_batch_size = self.sample_parser.end_row - self.sample_parser.start_row
self.clientsubmission.sample = self.sample_parser.to_pydantic()
return self.clientsubmission
# def to_pydantic(self):
# self.clientsubmission = self.info_parser.to_pydantic()
# self.clientsubmission.sample = self.sample_parser.to_pydantic()
def write(self):
workbook: Workbook = load_workbook(BytesIO(self.submissiontype.template_file))
def write(self, workbook: Workbook) -> Workbook:
# workbook: Workbook = load_workbook(BytesIO(self.submissiontype.template_file))
self.info_writer = ClientSubmissionInfoWriter(pydant_obj=self.pyd)
assert isinstance(self.info_writer, ClientSubmissionInfoWriter)
logger.debug("Attempting write.")
workbook = self.info_writer.write_to_workbook(workbook)
self.sample_writer = ClientSubmissionSampleWriter(pydant_obj=self.pyd)
workbook = self.sample_writer.write_to_workbook(workbook)
# workbook.save(output_path)
workbook = self.sample_writer.write_to_workbook(workbook, start_row=self.info_writer.worksheet.max_row + 1)
return workbook

View File

@@ -56,41 +56,42 @@ class DefaultProcedureManager(DefaultManager):
self.samples = self.sample_parser.to_pydantic()
self.equipment = self.equipment_parser.to_pydantic()
def write(self, worksheet_only: bool=False) -> Workbook:
workbook = load_workbook(BytesIO(self.proceduretype.template_file))
def write(self, workbook: Workbook) -> Workbook:
# workbook = load_workbook(BytesIO(self.proceduretype.template_file))
try:
info_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}InfoWriter")
except AttributeError:
info_writer = procedure_writers.ProcedureInfoWriter
self.info_writer = info_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.info_map)
self.info_writer = info_writer(pydant_obj=self.pyd)
workbook = self.info_writer.write_to_workbook(workbook)
try:
reagent_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}ReagentWriter")
except AttributeError:
reagent_writer = procedure_writers.ProcedureReagentWriter
self.reagent_writer = reagent_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.reagent_map)
self.reagent_writer = reagent_writer(pydant_obj=self.pyd)
workbook = self.reagent_writer.write_to_workbook(workbook)
try:
equipment_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}EquipmentWriter")
except AttributeError:
equipment_writer = procedure_writers.ProcedureEquipmentWriter
self.equipment_writer = equipment_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.equipment_map)
self.equipment_writer = equipment_writer(pydant_obj=self.pyd)
workbook = self.equipment_writer.write_to_workbook(workbook)
try:
sample_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}SampleWriter")
except AttributeError:
sample_writer = procedure_writers.ProcedureSampleWriter
self.sample_writer = sample_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.sample_map)
self.sample_writer = sample_writer(pydant_obj=self.pyd)
workbook = self.sample_writer.write_to_workbook(workbook)
# logger.debug(self.pyd.result)
# TODO: Find way to group results by result_type.
# # logger.debug(self.pyd.result)
# # TODO: Find way to group results by result_type.
for result in self.pyd.result:
logger.debug(f"Writing {result.result_type}")
Writer = getattr(results_writers, f"{result.result_type}InfoWriter")
res_info_writer = Writer(pydant_obj=result, proceduretype=self.proceduretype)
workbook = res_info_writer.write_to_workbook(workbook=workbook)
# sample_results = [sample.result for sample in self.pyd.sample]
# logger.debug(pformat(self.pyd.sample_results))
Writer = getattr(results_writers, "PCRSampleWriter")
res_sample_writer = Writer(pydant_obj=self.pyd.sample_results, proceduretype=self.proceduretype)
workbook = res_sample_writer.write_to_workbook(workbook=workbook)
# # sample_results = [sample.result for sample in self.pyd.sample]
# # logger.debug(pformat(self.pyd.sample_results))
# Writer = getattr(results_writers, "PCRSampleWriter")
# res_sample_writer = Writer(pydant_obj=self.pyd.sample_results, proceduretype=self.proceduretype)
# workbook = res_sample_writer.write_to_workbook(workbook=workbook)
return workbook

View File

@@ -26,7 +26,7 @@ class PCRManager(DefaultResultsManager):
def parse(self):
self.info_parser = PCRInfoParser(filepath=self.fname, procedure=self.procedure)
self.sample_parser = PCRSampleParser(filepath=self.fname, procedure=self.procedure)
self.sample_parser = PCRSampleParser(filepath=self.fname, procedure=self.procedure, start_row=self.info_parser.end_row)
def write(self):
workbook = load_workbook(BytesIO(self.procedure.proceduretype.template_file))

View File

@@ -18,13 +18,14 @@ class DefaultRunManager(DefaultManager):
from backend.managers import DefaultClientSubmissionManager, DefaultProcedureManager
logger.debug(f"Initializing write")
clientsubmission = DefaultClientSubmissionManager(parent=self.parent, input_object=self.pyd.clientsubmission, submissiontype=self.pyd.clientsubmission.submissiontype)
workbook = clientsubmission.write()
workbook = Workbook()
workbook = clientsubmission.write(workbook=workbook)
for procedure in self.pyd.procedure:
# logger.debug(f"Running procedure: {pformat(procedure.__dict__)}")
# # logger.debug(f"Running procedure: {pformat(procedure.__dict__)}")
procedure = DefaultProcedureManager(proceduretype=procedure.proceduretype, parent=self.parent, input_object=procedure)
wb: Workbook = procedure.write()
for sheetname in wb.sheetnames:
source_sheet = wb[sheetname]
ws = workbook.create_sheet(sheetname)
copy_xl_sheet(source_sheet, ws)
workbook: Workbook = procedure.write(workbook=workbook)
# for sheetname in wb.sheetnames:
# source_sheet = wb[sheetname]
# ws = workbook.create_sheet(sheetname)
# copy_xl_sheet(source_sheet, ws)
return workbook

View File

@@ -43,22 +43,28 @@ class ClientSubmissionNamer(DefaultNamer):
logger.warning(f"Getting submissiontype from file properties failed, falling back on preparse.\nDepending on excel structure this might yield an incorrect submissiontype")
sub_type = self.get_subtype_from_preparse()
if not sub_type:
logger.warning(f"Getting submissiontype from preparse failed, falling back on filename regex.\nDepending on excel structure this might yield an incorrect submissiontype")
logger.warning(f"Getting submissiontype from preparse failed, falling back on filename regex.\nDepending on file name this might yield an incorrect submissiontype")
sub_type = self.get_subtype_from_regex()
if not sub_type:
logger.warning(f"Getting submissiontype from regex failed, using default submissiontype.")
sub_type = SubmissionType.query(name="Test")
logger.debug(f"Submission Type: {sub_type}")
sys.exit()
return sub_type
def get_subtype_from_regex(self):
def get_subtype_from_regex(self) -> SubmissionType:
regex = SubmissionType.regex
m = regex.search(self.filepath.__str__())
try:
sub_type = m.lastgroup
sub_type = SubmissionType.query(name=sub_type)
except AttributeError as e:
sub_type = None
logger.critical(f"No procedure type found or procedure type found!: {e}")
return sub_type
def get_subtype_from_preparse(self):
def get_subtype_from_preparse(self) -> SubmissionType:
from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser
parser = ClientSubmissionInfoParser(self.filepath)
sub_type = next((value for k, value in parser.parsed_info.items() if k == "submissiontype"), None)
@@ -67,7 +73,7 @@ class ClientSubmissionNamer(DefaultNamer):
sub_type = None
return sub_type
def get_subtype_from_properties(self):
def get_subtype_from_properties(self) -> SubmissionType:
wb = load_workbook(self.filepath)
# NOTE: Gets first category in the metadata.
categories = wb.properties.category.split(";")
@@ -238,11 +244,13 @@ class RSLNamer(object):
today = datetime.now()
if isinstance(today, str):
today = datetime.strptime(today, "%Y-%m-%d")
if "name" in data.keys():
plate_number = data['name'].split("-")[-1][0]
else:
previous = Run.query(start_date=today, end_date=today, submissiontype=data['submissiontype'])
plate_number = len(previous) + 1
# if "name" in data.keys():
# logger.debug(f"Found name: {data['name']}")
# plate_number = data['name'].split("-")[-1][0]
# else:
previous = Run.query(start_date=today, end_date=today, submissiontype=data['submissiontype'])
plate_number = len(previous) + 1
logger.debug(f"Using plate number: {plate_number}")
return f"RSL-{data['abbreviation']}-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}-{plate_number}"
@classmethod

View File

@@ -2,21 +2,19 @@
Contains pydantic models and accompanying validators
"""
from __future__ import annotations
import uuid, re, logging, csv, sys, string
from pydantic import BaseModel, field_validator, Field, model_validator, PrivateAttr
import re, logging, csv, sys, string
from pydantic import BaseModel, field_validator, Field, model_validator
from datetime import date, datetime, timedelta
from dateutil.parser import parse
from dateutil.parser import ParserError
from typing import List, Tuple, Literal
from types import GeneratorType
import backend
from . import RSLNamer
from pathlib import Path
from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone
from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone, sort_dict_by_list
from backend.db import models
from backend.db.models import *
from sqlalchemy.exc import StatementError, IntegrityError
from sqlalchemy.exc import StatementError
from sqlalchemy.orm.properties import ColumnProperty
from sqlalchemy.orm.relationships import _RelationshipDeclared
from sqlalchemy.orm.attributes import InstrumentedAttribute
@@ -26,9 +24,9 @@ logger = logging.getLogger(f"submission.{__name__}")
class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
_sql_object: ClassVar = None
# _misc_info: dict|None = None
_sql_object: ClassVar = None
key_value_order: ClassVar = []
@model_validator(mode="before")
@classmethod
@@ -37,7 +35,8 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
output = {}
try:
items = data.items()
except AttributeError:
except AttributeError as e:
logger.error(f"Could not prevalidate {cls.__name__} due to {e}")
return data
for key, value in items:
new_key = key.replace("_", "")
@@ -69,10 +68,6 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
def __init__(self, **data):
# NOTE: Grab the sql model for validation purposes.
self.__class__._sql_object = getattr(models, self.__class__.__name__.replace("Pyd", ""))
# try:
# self.template_file = self.__class__._sql_object.template_file
# except AttributeError:
# pass
super().__init__(**data)
def filter_field(self, key: str) -> Any:
@@ -111,13 +106,11 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
output = {k: getattr(self, k) for k in fields}
else:
output = {k: self.filter_field(k) for k in fields}
if hasattr(self, "misc_info") and "info_placement" in self.misc_info:
for k, v in output.items():
try:
output[k]['location'] = [item['location'] for item in self.misc_info['info_placement'] if
item['name'] == k]
except (TypeError, KeyError):
continue
if "misc_info" in output.keys():
for k, v in output['misc_info'].items():
if k not in output.keys():
output[k] = v
del output['misc_info']
return output
def to_sql(self):
@@ -129,6 +122,19 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
logger.warning(f"Creating new {self._sql_object} with values:\n{pformat(dicto)}")
return sql
@property
def fields(self):
output = []
for k, v in self.improved_dict().items():
match v:
case str() | int() | float() | datetime() | date():
output.append(k)
case x if issubclass(v.__class__, PydBaseClass):
output.append(k)
case _:
continue
return list(set(output))
class PydReagent(PydBaseClass):
lot: str | None
@@ -496,7 +502,7 @@ class PydEquipment(PydBaseClass):
return {k: getattr(self, k) for k in fields}
class PydRun(PydBaseClass, extra='allow'):
class PydRun(PydBaseClass): #, extra='allow'):
clientsubmission: PydClientSubmission | None = Field(default=None)
rsl_plate_number: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
@@ -1154,65 +1160,6 @@ class PydEquipmentRole(BaseModel):
return RoleComboBox(parent=parent, role=self, used=used)
# class PydPCRControl(BaseModel):
#
# name: str
# subtype: str
# target: str
# ct: float
# reagent_lot: str
# submitted_date: datetime #: Date submitted to Robotics
# procedure_id: int
# controltype_name: str
#
# @report_result
# def to_sql(self):
# report = Report
# instance = PCRControl.query(name=self.name)
# if not instance:
# instance = PCRControl()
# for key in self.model_fields:
# field_value = self.__getattribute__(key)
# if instance.__getattribute__(key) != field_value:
# instance.__setattr__(key, field_value)
# return instance, report
#
#
# class PydIridaControl(BaseModel, extra='ignore'):
#
# name: str
# contains: list | dict #: unstructured hashes in contains.tsv for each organism
# matches: list | dict #: unstructured hashes in matches.tsv for each organism
# kraken: list | dict #: unstructured output from kraken_report
# subtype: Literal["ATCC49226", "ATCC49619", "EN-NOS", "EN-SSTI", "MCS-NOS", "MCS-SSTI", "SN-NOS", "SN-SSTI"]
# refseq_version: str #: version of refseq used in fastq parsing
# kraken2_version: str
# kraken2_db_version: str
# sample_id: int
# submitted_date: datetime #: Date submitted to Robotics
# procedure_id: int
# controltype_name: str
#
# @field_validator("refseq_version", "kraken2_version", "kraken2_db_version", mode='before')
# @classmethod
# def enforce_string(cls, value):
# if not value:
# value = ""
# return value
#
# @report_result
# def to_sql(self):
# report = Report()
# instance = IridaControl.query(name=self.name)
# if not instance:
# instance = IridaControl()
# for key in self.model_fields:
# field_value = self.__getattribute__(key)
# if instance.__getattribute__(key) != field_value:
# instance.__setattr__(key, field_value)
# return instance, report
class PydProcess(PydBaseClass, extra="allow"):
name: str
version: str = Field(default="1")
@@ -1404,6 +1351,27 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
value = None
return value
@property
def rows_columns_count(self) -> tuple[int, int]:
try:
proc: ProcedureType = Procedure.query(name=self.name).proceduretype
except AttributeError as e:
logger.error(f"Can't get rows, columns due to {e}")
return 0, 0
return proc.plate_rows, proc.plate_columns
@property
def max_sample_rank(self) -> int:
rows, columns = self.rows_columns_count
output = rows * columns
if output > 0:
return output
else:
try:
return max([item.procedure_rank for item in self.sample])
except TypeError:
return len(self.sample)
def update_kittype_reagentroles(self, kittype: str | KitType):
if kittype == self.__class__.model_fields['kittype'].default['value']:
return
@@ -1471,7 +1439,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
sample.well_id = sample_dict['sample_id']
sample.row = row
sample.column = column
sample.plate_rank = sample_dict['index']
sample.procedure_rank = sample_dict['index']
logger.debug(f"Sample of interest: {sample.improved_dict()}")
# logger.debug(f"Updated samples:\n{pformat(self.sample)}")
@@ -1495,7 +1463,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
reg = reagent.to_sql()
reg.save()
def to_sql(self, new: bool=False):
def to_sql(self, new: bool = False):
from backend.db.models import RunSampleAssociation, ProcedureSampleAssociation
if new:
sql = Procedure()
@@ -1514,8 +1482,9 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
sql.repeat = self.repeat
if sql.repeat:
regex = re.compile(r".*\dR\d$")
repeats = [item for item in self.run.procedure if self.repeat_of in item.name and bool(regex.match(item.name))]
sql.name = f"{self.repeat_of}R{str(len(repeats)+1)}"
repeats = [item for item in self.run.procedure if
self.repeat_of in item.name and bool(regex.match(item.name))]
sql.name = f"{self.repeat_of}R{str(len(repeats) + 1)}"
sql.repeat_of = self.repeat_of
sql.started_date = datetime.now()
if self.run:
@@ -1572,7 +1541,8 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
logger.debug(f"sample {sample_sql} found in {sql.run.sample}")
if sample_sql not in sql.sample:
proc_assoc = ProcedureSampleAssociation(new_id=assoc_id_range[iii], procedure=sql, sample=sample_sql,
row=sample.row, column=sample.column, plate_rank=sample.plate_rank)
row=sample.row, column=sample.column,
procedure_rank=sample.procedure_rank)
if self.kittype['value'] not in ["NA", None, ""]:
kittype = KitType.query(name=self.kittype['value'], limit=1)
if kittype:
@@ -1592,11 +1562,22 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
class PydClientSubmission(PydBaseClass):
# sql_object: ClassVar = ClientSubmission
key_value_order = ["submitter_plate_id",
"submitted_date",
"client_lab",
"contact",
"contact_email",
"cost_centre",
"submission_type",
"sample_count",
"submission_category"]
filepath: Path | None = Field(default=None)
submissiontype: dict | None
submitted_date: dict | None = Field(default=dict(value=date.today(), missing=True), validate_default=True)
clientlab: dict | None
sample_count: dict | None
full_batch_size: int | dict = Field(default=0)
submission_category: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
comment: dict | None = Field(default=dict(value="", missing=True), validate_default=True)
cost_centre: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
@@ -1707,6 +1688,14 @@ class PydClientSubmission(PydBaseClass):
value = dict(value=value, missing=True)
return value
@field_validator("full_batch_size")
@classmethod
def dict_to_int(cls, value):
if isinstance(value, dict):
value = value['value']
value = int(value)
return value
def to_form(self, parent: QWidget, samples: List = [], disable: list | None = None):
"""
Converts this instance into a frontend.widgets.submission_widget.SubmissionFormWidget
@@ -1721,6 +1710,7 @@ class PydClientSubmission(PydBaseClass):
"""
from frontend.widgets.submission_widget import ClientSubmissionFormWidget
if not samples:
# samples = [sample for sample in self.sample if sample.sample_id.lower() not in ["", "blank"]]
samples = self.sample
return ClientSubmissionFormWidget(parent=parent, clientsubmission=self, samples=samples, disable=disable)
@@ -1728,9 +1718,18 @@ class PydClientSubmission(PydBaseClass):
sql = super().to_sql()
assert not any([isinstance(item, PydSample) for item in sql.sample])
sql.sample = []
if "info_placement" not in sql._misc_info:
sql._misc_info['info_placement'] = []
info_placement = []
# if "info_placement" not in sql._misc_info:
# sql._misc_info['info_placement'] = []
# info_placement = []
logger.debug(f"PYD Submission type: {self.submissiontype}")
logger.debug(f"SQL Submission Type: {sql.submissiontype}")
if not sql.submissiontype:
sql.submissiontype = SubmissionType.query(name=self.submissiontype['value'])
match sql.submissiontype:
case SubmissionType():
pass
case _:
sql.submissiontype = SubmissionType.query(name="Test")
for k in list(self.model_fields.keys()) + list(self.model_extra.keys()):
logger.debug(f"Running {k}")
attribute = getattr(self, k)
@@ -1738,41 +1737,52 @@ class PydClientSubmission(PydBaseClass):
case "filepath":
sql._misc_info[k] = attribute.__str__()
continue
# case "sample":
# sample = [item.to_sql() for item in self.sample]
# logger.debug(sample)
# for s in sample:
# logger.debug(f"adding {s}")
# sql.add_sample(sample=s)
case _:
pass
logger.debug(f"Setting {k} to {attribute}")
if isinstance(attribute, dict):
if "location" in attribute:
info_placement.append(dict(name=k, location=attribute['location']))
else:
info_placement.append(dict(name=k, location=None))
# max_row = max([value['location']['row'] for value in info_placement if value])
sql._misc_info['info_placement'] = info_placement
# logger.debug(f"Setting {k} to {attribute}")
# if isinstance(attribute, dict):
# if "location" in attribute:
# info_placement.append(dict(name=k, location=attribute['location']))
# else:
# info_placement.append(dict(name=k, location=None))
# # max_row = max([value['location']['row'] for value in info_placement if value])
# sql._misc_info['info_placement'] = info_placement
return sql
def pad_samples_to_length(self, row_count, column_names):
output_samples = []
for iii in range(1, row_count + 1):
try:
sample = next((item for item in self.samples if item.submission_rank == iii))
except StopIteration:
sample = PydSample(sample_id="")
for column in column_names:
setattr(sample, column[0], "")
sample.submission_rank = iii
output_samples.append(sample)
return sorted(output_samples, key=lambda x: x.submission_rank)
@property
def max_sample_rank(self) -> int:
output = self.full_batch_size
if output > 0:
return output
else:
return max([item.submission_rank for item in self.sample])
# def pad_samples_to_length(self, row_count, column_names):
# output_samples = []
# for iii in range(1, row_count + 1):
# try:
# sample = next((item for item in self.samples if item.submission_rank == iii))
# except StopIteration:
# sample = PydSample(sample_id="")
# for column in column_names:
# setattr(sample, column[0], "")
# sample.submission_rank = iii
# output_samples.append(sample)
# return sorted(output_samples, key=lambda x: x.submission_rank)
def improved_dict(self, dictionaries: bool = True) -> dict:
output = super().improved_dict(dictionaries=dictionaries)
output['sample'] = self.sample
return output
output['client_lab'] = output['clientlab']
try:
output['contact_email'] = output['contact']['email']
except TypeError:
pass
return sort_dict_by_list(output, self.key_value_order)
# @property
# def writable_dict(self):
# output = self.improved_dict()
@property
def filename_template(self):

View File

@@ -55,8 +55,8 @@ class SampleChecker(QDialog):
self.layout.addWidget(self.buttonBox, 11, 9, 1, 1, alignment=Qt.AlignmentFlag.AlignRight)
self.setLayout(self.layout)
with open("sample_checker_rendered.html", "w") as f:
f.write(html)
# with open("sample_checker_rendered.html", "w") as f:
# f.write(html)
logger.debug(f"HTML sample checker written!")
@pyqtSlot(str, str, str)
@@ -88,7 +88,7 @@ class SampleChecker(QDialog):
def formatted_list(self) -> List[dict]:
output = []
for sample in self.samples:
logger.debug(sample)
# logger.debug(sample)
s = sample.improved_dict(dictionaries=False)
if s['sample_id'] in [item['sample_id'] for item in output]:
s['color'] = "red"

View File

@@ -66,9 +66,9 @@ class SubmissionDetails(QDialog):
html = template.render(**d, css=[css])
self.webview.setHtml(html)
self.setWindowTitle(f"{object.__class__.__name__} Details - {object.name}")
with open(f"{object.__class__.__name__}_details_rendered.html", "w") as f:
# with open(f"{object.__class__.__name__}_details_rendered.html", "w") as f:
# f.write(html)
pass
# pass
def activate_export(self) -> None:

View File

@@ -8,24 +8,20 @@ from PyQt6.QtWidgets import (
QComboBox, QDateEdit, QLineEdit, QLabel, QCheckBox, QHBoxLayout, QGridLayout
)
from PyQt6.QtCore import pyqtSignal, Qt, QSignalBlocker
from .functions import select_open_file, select_save_file
import logging
from pathlib import Path
from tools import Report, Result, check_not_nan, main_form_style, report_result, get_application_from_parent
from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser, ClientSubmissionSampleParser
from backend.validators import PydRun, PydReagent, PydClientSubmission, PydSample
from backend.validators import PydReagent, PydClientSubmission, PydSample
from backend.db import (
ClientLab, SubmissionType, Reagent,
ReagentRole, KitTypeReagentRoleAssociation, Run
ReagentRole, KitTypeReagentRoleAssociation, Run, ClientSubmission
)
from pprint import pformat
from .pop_ups import QuestionAsker, AlertPop
from .omni_add_edit import AddEdit
from typing import List, Tuple
from datetime import date
from .sample_checker import SampleChecker
logger = logging.getLogger(f"submissions.{__name__}")
@@ -368,33 +364,33 @@ class SubmissionFormWidget(QWidget):
return report
base_submission = self.pyd.to_sql()
# NOTE: check output message for issues
try:
trigger = result.results[-1]
code = trigger.code
except IndexError as e:
logger.error(result.results)
logger.error(f"Problem getting error code: {e}")
code = 0
match code:
# NOTE: code 0: everything is fine.
case 0:
pass
# NOTE: code 1: ask for overwrite
case 1:
dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_number}?", message=trigger.msg)
if dlg.exec():
# NOTE: Do not add duplicate reagents.
pass
else:
self.app.ctx.database_session.rollback()
report.add_result(Result(msg="Overwrite cancelled", status="Information"))
return report
# NOTE: code 2: No RSL plate number given
case 2:
report.add_result(result)
return report
case _:
pass
# try:
# trigger = result.results[-1]
# code = trigger.code
# except IndexError as e:
# logger.error(result.results)
# logger.error(f"Problem getting error code: {e}")
# code = 0
# match code:
# # NOTE: code 0: everything is fine.
# case 0:
# pass
# # NOTE: code 1: ask for overwrite
# case 1:
# dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_number}?", message=trigger.msg)
# if dlg.exec():
# # NOTE: Do not add duplicate reagents.
# pass
# else:
# self.app.ctx.database_session.rollback()
# report.add_result(Result(msg="Overwrite cancelled", status="Information"))
# return report
# # NOTE: code 2: No RSL plate number given
# case 2:
# report.add_result(result)
# return report
# case _:
# pass
# NOTE: add reagents to procedure object
if base_submission is None:
return
@@ -517,7 +513,7 @@ class SubmissionFormWidget(QWidget):
def set_widget(self, parent: QWidget, key: str, value: dict,
submission_type: str | SubmissionType | None = None,
sub_obj: Run | None = None) -> QWidget:
sub_obj: ClientSubmission | None = None) -> QWidget:
"""
Creates form widget
@@ -596,7 +592,7 @@ class SubmissionFormWidget(QWidget):
add_widget.addItems(categories)
add_widget.setToolTip("Enter procedure category or select from list.")
case _:
if key in sub_obj.timestamps:
if key in ClientSubmission.timestamps:
add_widget = MyQDateEdit(calendarPopup=True, scrollWidget=parent)
# NOTE: sets submitted date based on date found in excel sheet
try:
@@ -875,6 +871,9 @@ class ClientSubmissionFormWidget(SubmissionFormWidget):
if isinstance(sample, PydSample):
sample = sample.to_sql()
assert not isinstance(sample, PydSample)
if sample.sample_id.lower() in ["", "blank"]:
continue
sample.save()
# if sample not in sql.sample:
sql.add_sample(sample=sample)
logger.debug(pformat(sql.__dict__))

View File

@@ -4,6 +4,7 @@ Contains miscellaenous functions used by both frontend and backend.
from __future__ import annotations
import builtins, importlib, time, logging, re, yaml, sys, os, stat, platform, getpass, json, numpy as np, pandas as pd
import itertools
from collections import OrderedDict
from datetime import date, datetime, timedelta
from json import JSONDecodeError
from threading import Thread
@@ -467,7 +468,6 @@ def render_details_template(template_name:str, css_in:List[str]|str=[], js_in:Li
return template.render(css=css_out, js=js_out, **kwargs)
def convert_well_to_row_column(input_str: str) -> Tuple[int, int]:
"""
Converts typical alphanumeric (i.e. "A2") to row, column
@@ -564,6 +564,19 @@ def list_str_comparator(input_str:str, listy: List[str], mode: Literal["starts_w
else:
return False
def sort_dict_by_list(dictionary: dict, order_list: list) -> dict:
output = OrderedDict()
for item in order_list:
try:
output[item] = dictionary[item]
except KeyError:
continue
for k, v in dictionary.items():
if k in output:
continue
output[k] = v
return output
def setup_lookup(func):
"""