From 087bf9bcb7d2436847b90e1e79109977cd67de37 Mon Sep 17 00:00:00 2001 From: lwark Date: Wed, 6 Aug 2025 13:26:37 -0500 Subject: [PATCH] Preparing for production testing using Bacterial Culture --- src/submissions/backend/db/models/__init__.py | 29 +- src/submissions/backend/db/models/kits.py | 6 +- .../backend/db/models/submissions.py | 51 ++-- .../backend/excel/parsers/__init__.py | 127 ++++---- .../excel/parsers/clientsubmission_parser.py | 143 +++++---- .../parsers/procedure_parsers/__init__.py | 4 +- .../excel/parsers/results_parsers/__init__.py | 38 +++ .../results_parsers/pcr_results_parser.py | 54 ++-- src/submissions/backend/excel/reports.py | 5 +- .../backend/excel/writers/__init__.py | 275 +++++++++++------- .../excel/writers/clientsubmission_writer.py | 92 +++--- .../writers/procedure_writers/__init__.py | 113 ++++--- .../results_writers/pcr_results_writer.py | 45 ++- src/submissions/backend/managers/__init__.py | 1 - .../backend/managers/clientsubmissions.py | 20 +- .../backend/managers/procedures.py | 27 +- .../managers/results/pcr_results_manager.py | 2 +- src/submissions/backend/managers/runs.py | 15 +- .../backend/validators/__init__.py | 26 +- src/submissions/backend/validators/pydant.py | 240 +++++++-------- .../frontend/widgets/sample_checker.py | 6 +- .../frontend/widgets/submission_details.py | 4 +- .../frontend/widgets/submission_widget.py | 69 +++-- src/submissions/tools/__init__.py | 15 +- 24 files changed, 757 insertions(+), 650 deletions(-) diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py index af595e8..776c7cc 100644 --- a/src/submissions/backend/db/models/__init__.py +++ b/src/submissions/backend/db/models/__init__.py @@ -4,6 +4,7 @@ Contains all models for sqlalchemy from __future__ import annotations import sys, logging, json +from collections import OrderedDict import sqlalchemy.exc from dateutil.parser import parse @@ -295,8 +296,8 @@ class BaseClass(Base): # logger.debug(f"Model: {model}") if query is None: query: Query = cls.__database_session__.query(cls) - else: - logger.debug(f"Incoming query: {query}") + # else: + # logger.debug(f"Incoming query: {query}") singles = cls.get_default_info('singles') for k, v in kwargs.items(): logger.info(f"Using key: {k} with value: {v} against {cls}") @@ -611,7 +612,7 @@ class BaseClass(Base): relevant = {k: v for k, v in self.__class__.__dict__.items() if isinstance(v, InstrumentedAttribute) or isinstance(v, AssociationProxy)} - output = {} + output = OrderedDict() output['excluded'] = ["excluded", "misc_info", "_misc_info", "id"] for k, v in relevant.items(): try: @@ -624,23 +625,11 @@ class BaseClass(Base): value = getattr(self, k) except AttributeError: continue - # match value: - # case datetime(): - # value = value.strftime("%Y-%m-%d %H:%M:%S") - # case bytes(): - # continue - # case dict(): - # try: - # value = value['name'] - # except KeyError: - # if k == "_misc_info": - # value = value - # else: - # continue - # case _: - # pass output[k.strip("_")] = value - # output = self.clean_details_dict(output) + if self._misc_info: + for key, value in self._misc_info.items(): + output[key] = value + return output @classmethod @@ -670,8 +659,6 @@ class BaseClass(Base): output[k] = value return output - - def to_pydantic(self, pyd_model_name:str|None=None, **kwargs): from backend.validators import pydant if not pyd_model_name: diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index 5a0a23f..419d06a 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -154,8 +154,8 @@ class KitType(BaseClass): ) #: Relation to SubmissionType proceduretype = association_proxy("kittypeproceduretypeassociation", "proceduretype", - creator=lambda ST: ProcedureTypeKitTypeAssociation( - submissiontype=ST)) #: Association proxy to SubmissionTypeKitTypeAssociation + creator=lambda PT: ProcedureTypeKitTypeAssociation( + proceduretype=PT)) #: Association proxy to SubmissionTypeKitTypeAssociation @@ -571,6 +571,8 @@ class ReagentRole(BaseClass): reagents = [reagent for reagent in self.reagent] if assoc: last_used = Reagent.query(name=assoc.last_used) + if isinstance(last_used, list): + last_used = None if last_used: reagents.insert(0, reagents.pop(reagents.index(last_used))) # return [f"{reagent.name} - {reagent.lot} - {reagent.expiry}" for reagent in reagents] diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index edd8aee..8c1b705 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -60,6 +60,7 @@ class ClientSubmission(BaseClass, LogMixin): name="fk_BS_sublab_id")) #: client lab id from _organizations submission_category = Column(String(64)) sample_count = Column(INTEGER) #: Number of sample in the procedure + full_batch_size = Column(INTEGER) #: Number of wells in provided plate. 0 if no plate. comment = Column(JSON) run = relationship("Run", back_populates="clientsubmission") #: many-to-one relationship contact = relationship("Contact", back_populates="clientsubmission") #: client org @@ -86,6 +87,10 @@ class ClientSubmission(BaseClass, LogMixin): def name(self): return self.submitter_plate_id + @property + def max_sample_rank(self) -> int: + return max([item.submission_rank for item in self.clientsubmissionsampleassociation]) + @classmethod @setup_lookup def query(cls, @@ -308,6 +313,7 @@ class ClientSubmission(BaseClass, LogMixin): row=row, column=column ) + # assoc.save() return assoc @property @@ -357,7 +363,9 @@ class ClientSubmission(BaseClass, LogMixin): def details_dict(self, **kwargs): output = super().details_dict(**kwargs) output['clientlab'] = output['clientlab'].details_dict() - output['contact'] = output['contact'].details_dict() + if "contact" in output and issubclass(output['contact'].__class__, BaseClass): + output['contact'] = output['contact'].details_dict() + output['contact_email'] = output['contact']['email'] output['submissiontype'] = output['submissiontype'].details_dict() output['run'] = [run.details_dict() for run in output['run']] output['sample'] = [sample.details_dict() for sample in output['clientsubmissionsampleassociation']] @@ -365,15 +373,13 @@ class ClientSubmission(BaseClass, LogMixin): output['client_lab'] = output['clientlab'] output['submission_type'] = output['submissiontype'] output['excluded'] += ['run', "sample", "clientsubmissionsampleassociation", "excluded", - "expanded", 'clientlab', 'submissiontype', 'id'] + "expanded", 'clientlab', 'submissiontype', 'id', 'info_placement', 'filepath', "name"] output['expanded'] = ["clientlab", "contact", "submissiontype"] - # output = self.clean_details_dict(output) - logger.debug(f"{self.__class__.__name__}\n\n{pformat(output)}") return output def to_pydantic(self, filepath: Path | str | None = None, **kwargs): output = super().to_pydantic(filepath=filepath, **kwargs) - output.template_file = self.template_file + # output.template_file = self.template_file return output @@ -690,7 +696,7 @@ class Run(BaseClass, LogMixin): output['procedure'] = [procedure.details_dict() for procedure in output['procedure']] output['permission'] = is_power_user() output['excluded'] += ['procedure', "runsampleassociation", 'excluded', 'expanded', 'sample', 'id', 'custom', - 'permission', "clientsubmission"] + 'permission', "clientsubmission"] output['sample_count'] = self.sample_count output['client_submission'] = self.clientsubmission.name output['started_date'] = self.started_date @@ -1337,6 +1343,10 @@ class Run(BaseClass, LogMixin): Manager = getattr(managers, f"Default{self.__class__.__name__}Manager") manager = Manager(parent=obj, input_object=self.to_pydantic()) workbook = manager.write() + try: + workbook.remove_sheet("Sheet") + except ValueError: + pass workbook.save(filename=output_filepath) def construct_filename(self): @@ -1695,8 +1705,8 @@ class ClientSubmissionSampleAssociation(BaseClass): sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated sample clientsubmission_id = Column(INTEGER, ForeignKey("_clientsubmission.id"), primary_key=True) #: id of associated procedure - row = Column(INTEGER) - column = Column(INTEGER) + # row = Column(INTEGER) + # column = Column(INTEGER) submission_rank = Column(INTEGER, primary_key=True, default=0) #: Location in sample list # NOTE: reference to the Submission object clientsubmission = relationship("ClientSubmission", @@ -1740,13 +1750,13 @@ class ClientSubmissionSampleAssociation(BaseClass): # NOTE: Get associated sample info sample = self.sample.to_sub_dict() sample['sample_id'] = self.sample.sample_id - sample['row'] = self.row - sample['column'] = self.column - try: - sample['well'] = f"{row_map[self.row]}{self.column}" - except KeyError as e: - logger.error(f"Unable to find row {self.row} in row_map.") - sample['Well'] = None + # sample['row'] = self.row + # sample['column'] = self.column + # try: + # sample['well'] = f"{row_map[self.row]}{self.column}" + # except (KeyError, AttributeError) as e: + # logger.error(f"Unable to find row {self.row} in row_map.") + # sample['Well'] = None sample['plate_name'] = self.clientsubmission.submitter_plate_id sample['positive'] = False sample['submitted_date'] = self.clientsubmission.submitted_date @@ -1760,13 +1770,9 @@ class ClientSubmissionSampleAssociation(BaseClass): # logger.debug(f"Relevant info from assoc output: {pformat(relevant)}") output = output['sample'].details_dict() misc = output['misc_info'] - # logger.debug(f"Output from sample: {pformat(output)}") + # # logger.debug(f"Output from sample: {pformat(output)}") output.update(relevant) output['misc_info'] = misc - # output['sample'] = temp - # output.update(output['sample'].details_dict()) - - # sys.exit() return output def to_pydantic(self) -> "PydSample": @@ -1777,7 +1783,7 @@ class ClientSubmissionSampleAssociation(BaseClass): PydSample: Pydantic Model """ from backend.validators import PydSample - return PydSample(**self.to_sub_dict()) + return PydSample(**self.details_dict()) @property def hitpicked(self) -> dict | None: @@ -2194,7 +2200,7 @@ class ProcedureSampleAssociation(BaseClass): sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated equipment row = Column(INTEGER) column = Column(INTEGER) - plate_rank = Column(INTEGER) + procedure_rank = Column(INTEGER) procedure = relationship(Procedure, back_populates="proceduresampleassociation") #: associated procedure @@ -2267,4 +2273,3 @@ class ProcedureSampleAssociation(BaseClass): except KeyError: logger.error(output) return output - diff --git a/src/submissions/backend/excel/parsers/__init__.py b/src/submissions/backend/excel/parsers/__init__.py index b42dc13..7f12dc1 100644 --- a/src/submissions/backend/excel/parsers/__init__.py +++ b/src/submissions/backend/excel/parsers/__init__.py @@ -4,13 +4,11 @@ from __future__ import annotations import logging, re from pathlib import Path -from typing import Generator, Tuple, TYPE_CHECKING - +from typing import Generator, TYPE_CHECKING +from openpyxl.cell import MergedCell from openpyxl.reader.excel import load_workbook -from openpyxl.worksheet.worksheet import Worksheet from pandas import DataFrame from backend.validators import pydant - if TYPE_CHECKING: from backend.db.models import ProcedureType @@ -34,8 +32,8 @@ class DefaultParser(object): instance.filepath = filepath return instance - def __init__(self, filepath: Path | str, proceduretype: ProcedureType | None = None, range_dict: dict | None = None, - *args, **kwargs): + def __init__(self, filepath: Path | str, proceduretype: ProcedureType | None = None, sheet: str | None = None, + start_row: int = 1, *args, **kwargs): """ Args: @@ -45,7 +43,6 @@ class DefaultParser(object): *args (): **kwargs (): """ - logger.debug(f"\n\nHello from {self.__class__.__name__}\n\n") self.proceduretype = proceduretype try: @@ -55,20 +52,21 @@ class DefaultParser(object): logger.error( f"Couldn't get pyd object: Pyd{self.__class__.__name__.replace('Parser', '').replace('Info', '')}, using {self.__class__.pyd_name}") self._pyd_object = getattr(pydant, self.__class__.pyd_name) + if not sheet: + sheet = self.__class__.sheet + self.sheet = sheet + if not start_row: + start_row = self.__class__.start_row self.workbook = load_workbook(self.filepath, data_only=True) - if not range_dict: - self.range_dict = self.__class__.default_range_dict - else: - self.range_dict = range_dict - logger.debug(f"Default parser range dict: {self.range_dict}") - for item in self.range_dict: - item['worksheet'] = self.workbook[item['sheet']] + self.worksheet = self.workbook[self.sheet] + self.start_row = self.delineate_start_row(start_row=start_row) + self.end_row = self.delineate_end_row(start_row=self.start_row) + logger.debug(f"Start row: {self.start_row}, End row: {self.end_row}") def to_pydantic(self): # data = {key: value['value'] for key, value in self.parsed_info.items()} data = self.parsed_info data['filepath'] = self.filepath - return self._pyd_object(**data) @classmethod @@ -78,75 +76,68 @@ class DefaultParser(object): proceduretype = ProcedureType.query(name=proceduretype) return proceduretype - @classmethod - def delineate_end_row(cls, worksheet: Worksheet, start_row: int = 1): - for iii, row in enumerate(worksheet.iter_rows(min_row=start_row), start=1): + def delineate_start_row(self, start_row: int = 1): + for iii, row in enumerate(self.worksheet.iter_rows(min_row=start_row), start=start_row): + if not all([item.value is None for item in row]): + return iii + return self.worksheet.min + + def delineate_end_row(self, start_row: int = 1): + for iii, row in enumerate(self.worksheet.iter_rows(min_row=start_row), start=start_row): if all([item.value is None for item in row]): return iii + return self.worksheet.max_row class DefaultKEYVALUEParser(DefaultParser): - # default_range_dict = [dict( - # start_row=2, - # end_row=18, - # key_column=1, - # value_column=2, - # sheet="Sample List" - # )] - # default_range_dict = [dict(sheet="Sample List", start_row=2)] + sheet = "Client Info" + start_row = 1 @property def parsed_info(self): - for item in self.range_dict: - item['end_row'] = self.delineate_end_row(item['worksheet'], start_row=item['start_row']) - rows = range(item['start_row'], item['end_row']) - # item['start_row'] = item['end_row'] - # del item['end_row'] - for row in rows: - key = item['worksheet'].cell(row, 1).value - if key: - # Note: Remove anything in brackets. - key = re.sub(r"\(.*\)", "", key) - key = key.lower().replace(":", "").strip().replace(" ", "_") - value = item['worksheet'].cell(row, 2).value - missing = False if value else True - location_map = dict(row=row, key_column=1, value_column=2, - sheet=item['sheet']) - value = dict(value=value, location=location_map, missing=missing) - logger.debug(f"Yielding {value} for {key}") - yield key, value + rows = range(self.start_row, self.end_row) + for row in rows: + check_row = [item for item in self.worksheet.rows][row-1] + logger.debug(f"Checking row {row-1}, {check_row} for merged cells.") + if any([isinstance(cell, MergedCell) for cell in check_row]): + continue + key = self.worksheet.cell(row, 1).value + if key: + # Note: Remove anything in brackets. + key = re.sub(r"\(.*\)", "", key) + key = key.lower().replace(":", "").strip().replace(" ", "_") + value = self.worksheet.cell(row, 2).value + missing = False if value else True + # location_map = dict(row=row, key_column=1, value_column=2, sheet=self.worksheet.title) + value = dict(value=value, missing=missing)#, location=location_map) + logger.debug(f"Yielding {value} for {key}") + yield key, value class DefaultTABLEParser(DefaultParser): - default_range_dict = [dict( - header_row=18, - sheet="Sample List" - )] + + sheet = "Client Info" + start_row = 18 @property def parsed_info(self) -> Generator[dict, None, None]: - for item in self.range_dict: - # list_worksheet = self.workbook[item['sheet']] - list_worksheet = item['worksheet'] - if "end_row" in item.keys(): - list_df = DataFrame( - [item for item in list_worksheet.values][item['header_row'] - 1:item['end_row'] - 1]) - else: - list_df = DataFrame([item for item in list_worksheet.values][item['header_row'] - 1:]) - list_df.columns = list_df.iloc[0] - list_df = list_df[1:] - list_df = list_df.dropna(axis=1, how='all') - for ii, row in enumerate(list_df.iterrows()): - output = {} - for key, value in row[1].to_dict().items(): - if isinstance(key, str): - key = key.lower().replace(" ", "_") - key = re.sub(r"_(\(.*\)|#)", "", key) - # logger.debug(f"Row {ii} values: {key}: {value}") - output[key] = value - yield output + logger.debug(f"creating dataframe from {self.start_row} to {self.end_row}") + df = DataFrame( + [item for item in self.worksheet.values][self.start_row - 1:self.end_row - 1]) + df.columns = df.iloc[0] + df = df[1:] + df = df.dropna(axis=1, how='all') + for ii, row in enumerate(df.iterrows()): + output = {} + for key, value in row[1].to_dict().items(): + if isinstance(key, str): + key = key.lower().replace(" ", "_") + key = re.sub(r"_(\(.*\)|#)", "", key) + # logger.debug(f"Row {ii} values: {key}: {value}") + output[key] = value + yield output def to_pydantic(self, **kwargs): return [self._pyd_object(**output) for output in self.parsed_info] diff --git a/src/submissions/backend/excel/parsers/clientsubmission_parser.py b/src/submissions/backend/excel/parsers/clientsubmission_parser.py index 8a9e2ad..c3a4e95 100644 --- a/src/submissions/backend/excel/parsers/clientsubmission_parser.py +++ b/src/submissions/backend/excel/parsers/clientsubmission_parser.py @@ -5,14 +5,12 @@ from __future__ import annotations import logging from pathlib import Path from string import ascii_lowercase -from typing import Generator - +from typing import Generator, TYPE_CHECKING, Literal from openpyxl.reader.excel import load_workbook - from tools import row_keys -# from backend.db.models import SubmissionType from . import DefaultKEYVALUEParser, DefaultTABLEParser - +if TYPE_CHECKING: + from backend.db.models import SubmissionType logger = logging.getLogger(f"submissions.{__name__}") @@ -20,7 +18,16 @@ logger = logging.getLogger(f"submissions.{__name__}") class SubmissionTyperMixin(object): @classmethod - def retrieve_submissiontype(cls, filepath: Path): + def retrieve_submissiontype(cls, filepath: Path) -> "SubmissionType": + """ + Gets the submission type from a file. + + Args: + filepath (Path): The import file + + Returns: + SubmissionType: The determined submissiontype + """ # NOTE: Attempt 1, get from form properties: sub_type = cls.get_subtype_from_properties(filepath=filepath) if not sub_type: @@ -35,7 +42,16 @@ class SubmissionTyperMixin(object): return sub_type @classmethod - def get_subtype_from_regex(cls, filepath: Path): + def get_subtype_from_regex(cls, filepath: Path) -> "SubmissionType": + """ + Uses regex of the file name to determine submissiontype + + Args: + filepath (Path): The import file + + Returns: + SubmissionType: The determined submissiontype + """ from backend.db.models import SubmissionType regex = SubmissionType.regex m = regex.search(filepath.__str__()) @@ -43,21 +59,42 @@ class SubmissionTyperMixin(object): sub_type = m.lastgroup except AttributeError as e: sub_type = None - logger.critical(f"No procedure type found or procedure type found!: {e}") + logger.critical(f"No submission type or procedure type found!: {e}") + sub_type = SubmissionType.query(name=sub_type, limit=1) + if not sub_type: + return return sub_type @classmethod - def get_subtype_from_preparse(cls, filepath: Path): + def get_subtype_from_preparse(cls, filepath: Path) -> "SubmissionType": + """ + Performs a default parse of the file in an attempt to find the submission type. + + Args: + filepath (Path): The import file + + Returns: + SubmissionType: The determined submissiontype + """ from backend.db.models import SubmissionType - parser = ClientSubmissionInfoParser(filepath) - sub_type = next((value for k, value in parser.parsed_info.items() if k == "submissiontype"), None) - sub_type = SubmissionType.query(name=sub_type) + parser = ClientSubmissionInfoParser(filepath=filepath, submissiontype=SubmissionType.query(name="Test")) + sub_type = next((value for k, value in parser.parsed_info.items() if k == "submissiontype" or k == "submission_type"), None) + sub_type = SubmissionType.query(name=sub_type.title()) if isinstance(sub_type, list): - sub_type = None + return return sub_type @classmethod - def get_subtype_from_properties(cls, filepath: Path): + def get_subtype_from_properties(cls, filepath: Path) -> "SubmissionType": + """ + Attempts to get submission type from the xl metadata. + + Args: + filepath (Path): The import file + + Returns: + SubmissionType: The determined submissiontype + """ from backend.db.models import SubmissionType wb = load_workbook(filepath) # NOTE: Gets first category in the metadata. @@ -65,62 +102,56 @@ class SubmissionTyperMixin(object): sub_type = next((item.strip().title() for item in categories), None) sub_type = SubmissionType.query(name=sub_type) if isinstance(sub_type, list): - sub_type = None + return return sub_type class ClientSubmissionInfoParser(DefaultKEYVALUEParser, SubmissionTyperMixin): """ - Object for retrieving submitter info from "sample list" sheet + Object for retrieving submitter info from "Client Info" sheet """ pyd_name = "PydClientSubmission" - default_range_dict = [dict( - start_row=2, - end_row=16, - key_column=1, - value_column=2, - sheet="Sample List" - )] - - def __init__(self, filepath: Path | str, submissiontype:"SubmissionType"|None=None, *args, **kwargs): - from frontend.widgets.pop_ups import QuestionAsker - from backend.managers import procedures as procedure_managers + def __init__(self, filepath: Path | str, submissiontype: "SubmissionType" | None = None, *args, **kwargs): + logger.debug(f"Set submission type: {submissiontype}") if not submissiontype: self.submissiontype = self.retrieve_submissiontype(filepath=filepath) else: self.submissiontype = submissiontype - # if "range_dict" not in kwargs: - # kwargs['range_dict'] = self.submissiontype.info_map - super().__init__(filepath=filepath, range_dict=[dict(sheet="Client Info")], **kwargs) - allowed_procedure_types = [item.name for item in self.submissiontype.proceduretype] - for name in allowed_procedure_types: - if name in self.workbook.sheetnames: - # TODO: check if run with name already exists - add_run = QuestionAsker(title="Add Run?", message="We've detected a sheet corresponding to an associated procedure type.\nWould you like to add a new run?") - if add_run.accepted: - # NOTE: recruit parser. - try: - manager = getattr(procedure_managers, name) - except AttributeError: - manager = procedure_managers.DefaultManager - self.manager = manager(proceduretype=name) - pass + super().__init__(filepath=filepath, sheet="Client Info", start_row=1, **kwargs) + # NOTE: move to the manager class. + # allowed_procedure_types = [item.name for item in self.submissiontype.proceduretype] + # for name in allowed_procedure_types: + # if name in self.workbook.sheetnames: + # # TODO: check if run with name already exists + # add_run = QuestionAsker(title="Add Run?", message="We've detected a sheet corresponding to an associated procedure type.\nWould you like to add a new run?") + # if add_run.accepted: + # # NOTE: recruit parser. + # try: + # manager = getattr(procedure_managers, name) + # except AttributeError: + # manager = procedure_managers.DefaultManager + # self.manager = manager(proceduretype=name) + # pass @property def parsed_info(self): - output = {k:v for k, v in super().parsed_info} + output = {k: v for k, v in super().parsed_info} try: output['clientlab'] = output['client_lab'] except KeyError: pass + # output['submissiontype'] = dict(value=self.submissiontype.name.title()) + try: + output['submissiontype'] = output['submission_type'] + output['submissiontype']['value'] = self.submissiontype.name.title() + except KeyError: + pass logger.debug(f"Data: {output}") - output['submissiontype'] = self.submissiontype.name return output - class ClientSubmissionSampleParser(DefaultTABLEParser, SubmissionTyperMixin): """ Object for retrieving submitter samples from "sample list" sheet @@ -128,32 +159,26 @@ class ClientSubmissionSampleParser(DefaultTABLEParser, SubmissionTyperMixin): pyd_name = "PydSample" - default_range_dict = [dict( - header_row=18, - end_row=114, - sheet="Sample List" - )] - - def __init__(self, filepath: Path | str, submissiontype: "SubmissionType"|None=None, *args, **kwargs): + def __init__(self, filepath: Path | str, submissiontype: "SubmissionType" | None = None, start_row: int = 1, *args, + **kwargs): if not submissiontype: self.submissiontype = self.retrieve_submissiontype(filepath=filepath) else: self.submissiontype = submissiontype - if "range_dict" not in kwargs: - kwargs['range_dict'] = self.submissiontype.sample_map - super().__init__(filepath=filepath, **kwargs) + super().__init__(filepath=filepath, sheet="Client Info", start_row=start_row, **kwargs) @property def parsed_info(self) -> Generator[dict, None, None]: output = super().parsed_info - for ii, sample in enumerate(output): - logger.debug(f"Parsed info sample: {sample}") + for ii, sample in enumerate(output, start=1): + # logger.debug(f"Parsed info sample: {sample}") + if isinstance(sample["row"], str) and sample["row"].lower() in ascii_lowercase[0:8]: try: sample["row"] = row_keys[sample["row"]] except KeyError: pass - sample['submission_rank'] = ii + 1 + sample['submission_rank'] = ii yield sample def to_pydantic(self): diff --git a/src/submissions/backend/excel/parsers/procedure_parsers/__init__.py b/src/submissions/backend/excel/parsers/procedure_parsers/__init__.py index c3d1ace..e00b897 100644 --- a/src/submissions/backend/excel/parsers/procedure_parsers/__init__.py +++ b/src/submissions/backend/excel/parsers/procedure_parsers/__init__.py @@ -1,12 +1,12 @@ from __future__ import annotations from pathlib import Path from typing import TYPE_CHECKING - - from backend.excel.parsers import DefaultTABLEParser, DefaultKEYVALUEParser +import logging if TYPE_CHECKING: from backend.db.models import ProcedureType +logger = logging.getLogger(f"submissions.{__name__}") class ProcedureInfoParser(DefaultKEYVALUEParser): diff --git a/src/submissions/backend/excel/parsers/results_parsers/__init__.py b/src/submissions/backend/excel/parsers/results_parsers/__init__.py index e69de29..3f839d5 100644 --- a/src/submissions/backend/excel/parsers/results_parsers/__init__.py +++ b/src/submissions/backend/excel/parsers/results_parsers/__init__.py @@ -0,0 +1,38 @@ +from __future__ import annotations +from pathlib import Path +from backend.excel.parsers import DefaultKEYVALUEParser, DefaultTABLEParser +from typing import TYPE_CHECKING +import logging +if TYPE_CHECKING: + from backend.db.models import ProcedureType + +logger = logging.getLogger(f"submissions.{__name__}") + + +class DefaultResultsInfoParser(DefaultKEYVALUEParser): + pyd_name = "PydResults" + + def __init__(self, filepath: Path | str, proceduretype: "ProcedureType" | None = None, + results_type: str | None = "PCR", *args, **kwargs): + if results_type: + self.results_type = results_type + sheet = proceduretype.allowed_result_methods[results_type]['info']['sheet'] + start_row = proceduretype.allowed_result_methods[results_type]['info']['start_row'] + super().__init__(filepath=filepath, proceduretype=proceduretype, sheet=sheet, start_row=start_row, *args, + **kwargs) + + +class DefaultResultsSampleParser(DefaultTABLEParser): + pyd_name = "PydResults" + + def __init__(self, filepath: Path | str, proceduretype: "ProcedureType" | None = None, + results_type: str | None = "PCR", *args, **kwargs): + if results_type: + self.results_type = results_type + sheet = proceduretype.allowed_result_methods[results_type]['sample']['sheet'] + start_row = proceduretype.allowed_result_methods[results_type]['sample']['start_row'] + super().__init__(filepath=filepath, proceduretype=proceduretype, sheet=sheet, start_row=start_row, *args, + **kwargs) + + +from .pcr_results_parser import PCRInfoParser, PCRSampleParser diff --git a/src/submissions/backend/excel/parsers/results_parsers/pcr_results_parser.py b/src/submissions/backend/excel/parsers/results_parsers/pcr_results_parser.py index 8a59e2a..07ead5b 100644 --- a/src/submissions/backend/excel/parsers/results_parsers/pcr_results_parser.py +++ b/src/submissions/backend/excel/parsers/results_parsers/pcr_results_parser.py @@ -1,57 +1,42 @@ """ """ +from __future__ import annotations import logging -from backend.db.models import Run, Sample, Procedure, ProcedureSampleAssociation -from backend.excel.parsers import DefaultKEYVALUEParser, DefaultTABLEParser +from typing import Generator, TYPE_CHECKING +from backend.db.models import ProcedureSampleAssociation +from backend.excel.parsers.results_parsers import DefaultResultsInfoParser, DefaultResultsSampleParser from pathlib import Path +if TYPE_CHECKING: + from backend.validators.pydant import PydSample logger = logging.getLogger(f"submissions.{__name__}") -# class PCRResultsParser(DefaultParser): -# pass +class PCRInfoParser(DefaultResultsInfoParser): -class PCRInfoParser(DefaultKEYVALUEParser): - pyd_name = "PydResults" - - default_range_dict = [dict( - start_row=1, - end_row=24, - key_column=1, - value_column=2, - sheet="Results" - )] - - def __init__(self, filepath: Path | str, range_dict: dict | None = None, procedure=None): - super().__init__(filepath=filepath, range_dict=range_dict) + def __init__(self, filepath: Path | str, sheet: str | None = None, start_row: int = 1, procedure=None, **kwargs): + self.results_type = "PCR" self.procedure = procedure + super().__init__(filepath=filepath, proceduretype=self.procedure.proceduretype) def to_pydantic(self): - # from backend.db.models import Procedure - data = dict(results={k:v for k, v in self.parsed_info}, filepath=self.filepath, - result_type="PCR") + data = dict(results={k: v for k, v in self.parsed_info}, filepath=self.filepath, + result_type=self.results_type) return self._pyd_object(**data, parent=self.procedure) -class PCRSampleParser(DefaultTABLEParser): +class PCRSampleParser(DefaultResultsSampleParser): """Object to pull data from Design and Analysis PCR export file.""" - pyd_name = "PydResults" - - default_range_dict = [dict( - header_row=25, - sheet="Results" - )] - - def __init__(self, filepath: Path | str, range_dict: dict | None = None, procedure=None): - super().__init__(filepath=filepath, range_dict=range_dict) + def __init__(self, filepath: Path | str, sheet: str | None = None, start_row: int = 1, procedure=None, **kwargs): + self.results_type = "PCR" self.procedure = procedure + super().__init__(filepath=filepath, proceduretype=self.procedure.proceduretype) @property - def parsed_info(self): + def parsed_info(self) -> Generator[dict, None, None]: output = [item for item in super().parsed_info] - merge_column = "sample" sample_names = list(set([item['sample'] for item in output])) for sample in sample_names: multi = dict(result_type="PCR") @@ -60,17 +45,14 @@ class PCRSampleParser(DefaultTABLEParser): multi[soi['target']] = {k: v for k, v in soi.items() if k != "target" and k != "sample"} yield {sample: multi} - def to_pydantic(self): - logger.debug("running to pydantic") + def to_pydantic(self) -> Generator["PydSample", None, None]: for item in self.parsed_info: - # sample_obj = Sample.query(sample_id=list(item.keys())[0]) # NOTE: Ensure that only samples associated with the procedure are used. try: sample_obj = next( (sample for sample in self.procedure.sample if sample.sample_id == list(item.keys())[0])) except StopIteration: continue - # logger.debug(f"Sample object {sample_obj}") assoc = ProcedureSampleAssociation.query(sample=sample_obj, procedure=self.procedure) if assoc and not isinstance(assoc, list): output = self._pyd_object(results=list(item.values())[0], parent=assoc) diff --git a/src/submissions/backend/excel/reports.py b/src/submissions/backend/excel/reports.py index 9f265fb..5edbb0c 100644 --- a/src/submissions/backend/excel/reports.py +++ b/src/submissions/backend/excel/reports.py @@ -206,7 +206,10 @@ class ConcentrationMaker(ReportArchetype): self.subs = Run.query(start_date=start_date, end_date=end_date, submissiontype_name=submission_type, page_size=0) # self.sample = flatten_list([sub.get_provisional_controls(controls_only=controls_only) for sub in self.run]) - self.samples = flatten_list([sub.get_provisional_controls(include=include) for sub in self.subs]) + try: + self.samples = flatten_list([sub.get_provisional_controls(include=include) for sub in self.subs]) + except AttributeError: + self.samples = [] self.records = [self.build_record(sample) for sample in self.samples] self.df = DataFrame.from_records(self.records) self.sheet_name = "Concentration" diff --git a/src/submissions/backend/excel/writers/__init__.py b/src/submissions/backend/excel/writers/__init__.py index 60291e6..0fea86f 100644 --- a/src/submissions/backend/excel/writers/__init__.py +++ b/src/submissions/backend/excel/writers/__init__.py @@ -1,17 +1,17 @@ -import logging -import re -from io import BytesIO -from pathlib import Path +import logging, sys +from datetime import datetime, date from pprint import pformat -from typing import Any +from types import NoneType +from typing import Any, Literal -from openpyxl.reader.excel import load_workbook +from openpyxl.styles import Alignment, PatternFill +from openpyxl.utils import get_column_letter from openpyxl.workbook.workbook import Workbook from openpyxl.worksheet.worksheet import Worksheet from pandas import DataFrame - from backend.db.models import BaseClass, ProcedureType from backend.validators.pydant import PydBaseClass +from tools import flatten_list, create_plate_grid, sort_dict_by_list logger = logging.getLogger(f"submissions.{__name__}") @@ -21,155 +21,220 @@ class DefaultWriter(object): def __repr__(self): return f"{self.__class__.__name__}<{self.filepath.stem}>" - def __init__(self, pydant_obj, proceduretype: ProcedureType|None=None, range_dict: dict | None = None, *args, **kwargs): - # self.filepath = output_filepath + def __init__(self, pydant_obj, proceduretype: ProcedureType | None = None, *args, **kwargs): self.pydant_obj = pydant_obj self.proceduretype = proceduretype - if range_dict: - self.range_dict = range_dict - else: - self.range_dict = self.__class__.default_range_dict @classmethod - def stringify_value(cls, value:Any) -> str: + def stringify_value(cls, value: Any) -> str: + if isinstance(value, dict): + try: + value = value['value'] + except (KeyError, ValueError): + try: + value = value['name'] + except (KeyError, ValueError): + return match value: case x if issubclass(value.__class__, BaseClass): value = value.name case x if issubclass(value.__class__, PydBaseClass): value = value.name - case dict(): - try: - value = value['value'] - except ValueError: - try: - value = value['name'] - except ValueError: - value = value.__str__() + case bytes() | list(): + value = None + case datetime() | date(): + value = value.strftime("%Y-%m-%d") + case _: value = str(value) + # logger.debug(f"Returning value: {value}") return value @classmethod - def prettify_key(cls, value:str) -> str: - value = value.replace("type", " type").strip() - value = value.title() - return value + def prettify_key(cls, key: str) -> str: + key = key.replace("type", " type").strip() + key = key.replace("_", " ") + key = key.title() + key = key.replace("Id", "ID") + return key - - def write_to_workbook(self, workbook: Workbook): + def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, + start_row: int | None = None, *args, **kwargs): logger.debug(f"Writing to workbook with {self.__class__.__name__}") + if not start_row: + try: + start_row = self.__class__.start_row + except AttributeError as e: + logger.error(f"Couldn't get start row due to {e}") + start_row = 1 + if not sheet: + sheet = self.__class__.sheet + self.sheet = sheet + if self.sheet not in workbook.sheetnames: + try: + self.worksheet = workbook["Sheet"] + self.worksheet.title = self.sheet + except KeyError: + self.worksheet = workbook.create_sheet(self.sheet) + else: + self.worksheet = workbook[self.sheet] + self.worksheet = self.prewrite(self.worksheet, start_row=start_row) + self.start_row = self.delineate_start_row(start_row=start_row) + self.end_row = self.delineate_end_row(start_row=start_row) + logger.debug(f"{self.__class__.__name__} Start row: {self.start_row}, end row: {self.end_row}") return workbook + def delineate_start_row(self, start_row: int = 1): + logger.debug(f"Attempting to find start row from {start_row}") + for iii, row in enumerate(self.worksheet.iter_rows(min_row=start_row), start=start_row): + if all([item.value is None for item in row]): + logger.debug(f"Returning {iii} for start row.") + return iii + if self.worksheet.max_row == 1: + return self.worksheet.max_row + 1 + else: + return self.worksheet.max_row + 2 + + def prewrite(self, worksheet: Worksheet, start_row: int) -> Worksheet: + return worksheet + + def columns_best_fit(self, worksheet: Worksheet) -> None: + """ + Make all columns best fit + """ + for col in worksheet.columns: + setlen = 0 + column = col[0].column_letter # Get the column name + for cell in col: + if len(str(cell.value)) > setlen: + setlen = len(str(cell.value)) + set_col_width = setlen + 5 + # Setting the column width + worksheet.column_dimensions[column].width = set_col_width + return worksheet + class DefaultKEYVALUEWriter(DefaultWriter): + sheet = "Client Info" + start_row = 2 + exclude = [] + key_order = [] - default_range_dict = [dict( - start_row=2, - end_row=18, - key_column=1, - value_column=2, - sheet="Sample List" - )] - - def __init__(self, pydant_obj, proceduretype: ProcedureType|None=None, range_dict: dict | None = None, *args, **kwargs): - super().__init__(pydant_obj=pydant_obj, proceduretype=proceduretype, range_dict=range_dict, *args, **kwargs) + def __init__(self, pydant_obj, proceduretype: ProcedureType | None = None, *args, **kwargs): + super().__init__(pydant_obj=pydant_obj, proceduretype=proceduretype, *args, **kwargs) self.fill_dictionary = self.pydant_obj.improved_dict() + def delineate_end_row(self, start_row: int = 1): + data_length = len(self.fill_dictionary) + return data_length + start_row + @classmethod def check_location(cls, locations: list, sheet: str): + logger.debug(f"Checking for location against {sheet}") return any([item['sheet'] == sheet for item in locations]) - def write_to_workbook(self, workbook: Workbook) -> Workbook: - workbook = super().write_to_workbook(workbook=workbook) - for rng in self.range_dict: - rows = range(rng['start_row'], rng['end_row'] + 1) - worksheet = workbook[rng['sheet']] - try: - for ii, (k, v) in enumerate(self.fill_dictionary.items(), start=rng['start_row']): - try: - worksheet.cell(column=rng['key_column'], row=rows[ii], value=self.prettify_key(k)) - worksheet.cell(column=rng['value_column'], row=rows[ii], value=self.stringify_value(v)) - except IndexError: - logger.error(f"Not enough rows: {len(rows)} for index {ii}") - except ValueError as e: - logger.error(self.fill_dictionary) - raise e + def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, + start_row: int = 1, *args, **kwargs) -> Workbook: + workbook = super().write_to_workbook(workbook=workbook, sheet=sheet, start_row=start_row) + dictionary = {k: v for k, v in self.fill_dictionary.items() if k not in self.__class__.exclude} + dictionary = sort_dict_by_list(dictionary=dictionary, order_list=self.key_order) + for ii, (k, v) in enumerate(dictionary.items(), start=self.start_row): + value = self.stringify_value(value=v) + if value is None: + continue + self.worksheet.cell(column=1, row=ii, value=self.prettify_key(k)) + self.worksheet.cell(column=2, row=ii, value=value) + self.worksheet = self.postwrite(self.worksheet) return workbook + def postwrite(self, worksheet: Worksheet) -> Worksheet: + worksheet = self.columns_best_fit(worksheet=worksheet) + return worksheet + + class DefaultTABLEWriter(DefaultWriter): + sheet = "Client Info" + start_row = 19 + header_order = [] + exclude = [] - default_range_dict = [dict( - header_row=19, - sheet="Sample List" - )] - - @classmethod - def get_row_count(cls, worksheet: Worksheet, range_dict:dict): - if "end_row" in range_dict.keys(): - list_df = DataFrame([item for item in worksheet.values][range_dict['header_row'] - 1:range_dict['end_row'] - 1]) - else: - list_df = DataFrame([item for item in worksheet.values][range_dict['header_row'] - 1:]) + def get_row_count(self, start_row: int = 1): + list_df = DataFrame([item for item in self.worksheet.values][start_row - 1:]) row_count = list_df.shape[0] return row_count - def pad_samples_to_length(self, row_count, column_names): + def delineate_end_row(self, start_row: int = 1) -> int: + return start_row + len(self.pydant_obj) + 1 + + def pad_samples_to_length(self, row_count, + mode: Literal["submission", "procedure"] = "submission"): #, column_names): from backend import PydSample output_samples = [] for iii in range(1, row_count + 1): - # logger.debug(f"Submission rank: {iii}") if isinstance(self.pydant_obj, list): iterator = self.pydant_obj else: iterator = self.pydant_obj.sample try: - sample = next((item for item in iterator if item.submission_rank == iii)) + sample = next((item for item in iterator if getattr(item, f"{mode}_rank") == iii)) except StopIteration: sample = PydSample(sample_id="") - for column in column_names: - setattr(sample, column[0], "") sample.submission_rank = iii - sample.plate_rank = iii - # logger.debug(f"Appending {sample.sample_id}") - # logger.debug(f"Iterator now: {[item.submission_rank for item in iterator]}") + sample.procedure_rank = iii + if mode == "procedure": + if all([item.row for item in self.pydant_obj.sample]): + rows, columns = self.pydant_obj.rows_columns_count + grid = create_plate_grid(rows=rows, columns=columns) + sample.row, sample.column = grid[sample.procedure_rank] output_samples.append(sample) - return sorted(output_samples, key=lambda x: x.submission_rank) + return sorted(output_samples, key=lambda x: getattr(x, f"{mode}_rank")) - def write_to_workbook(self, workbook: Workbook) -> Workbook: - workbook = super().write_to_workbook(workbook=workbook) - for rng in self.range_dict: - list_worksheet = workbook[rng['sheet']] - column_names = [(str(item.value).lower().replace(" ", "_"), item.column) for item in list_worksheet[rng['header_row']] if item.value] - for iii, object in enumerate(self.pydant_obj, start=1): - # logger.debug(f"Writing object: {object}") - write_row = rng['header_row'] + iii - for column in column_names: - if column[0].lower() in ["well", "row", "column"]: - continue - write_column = column[1] + def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, + start_row: int | None = None, *args, **kwargs) -> Workbook: + workbook = super().write_to_workbook(workbook=workbook, sheet=sheet, start_row=start_row, *args, **kwargs) + self.header_list = self.sort_header_row(list(set(flatten_list([item.fields for item in self.pydant_obj])))) + logger.debug(f"Header row: {self.header_list}") + self.worksheet = self.write_header_row(worksheet=self.worksheet) + for iii, object in enumerate(self.pydant_obj, start=1): + write_row = self.start_row + iii + for header in self.header_list: + try: + column = next((cell for cell in self.worksheet[self.start_row] if + cell.value == header.replace("_", " ").title())) + except StopIteration: + logger.warning(f'Could not find column for {header.replace("_", " ").title()}') + continue + column = column.column + try: + value = object.improved_dict()[header.lower().replace(" ", "")] + except (AttributeError, KeyError) as e: try: - value = getattr(object, column[0].lower().replace(" ", "")) - except AttributeError: - try: - value = getattr(object, column[0].lower().replace("_", "")) - except AttributeError: - value = "" - # logger.debug(f"{column} Writing {value} to row {write_row}, column {write_column}") - list_worksheet.cell(row=write_row, column=write_column, value=self.stringify_value(value)) + value = object.improved_dict()[header.lower().replace(" ", "_")] + except (AttributeError, KeyError): + value = "" + self.worksheet.cell(row=write_row, column=column, value=self.stringify_value(value)) + self.worksheet = self.postwrite(self.worksheet) return workbook @classmethod - def construct_column_names(cls, column_item): - column = column_item.column - match column_item.value: - case str(): - value = column_item.value.lower().replace(" ", "_") - case _: - value = column_item.value - return value, column + def sort_header_row(cls, header_list: list) -> list: + output = [] + logger.debug(cls.exclude) + for item in cls.header_order: + if item in [header for header in header_list if header not in cls.exclude]: + output.append(header_list.pop(header_list.index(item))) + return output + sorted([item for item in header_list if item not in cls.exclude]) + + def write_header_row(self, worksheet: Worksheet) -> Worksheet: + for iii, header in enumerate(self.header_list, start=1): + worksheet.cell(row=self.start_row, column=iii, value=header.replace("_", " ").title()) + worksheet.cell(row=self.start_row, column=iii).alignment = Alignment(horizontal='center') + worksheet.cell(row=self.start_row, column=iii).fill = PatternFill(start_color='2DE733', end_color='2DE733', fill_type="solid") + return worksheet + + def postwrite(self, worksheet: Worksheet) -> Worksheet: + worksheet = self.columns_best_fit(worksheet=worksheet) + return worksheet from .clientsubmission_writer import ClientSubmissionInfoWriter, ClientSubmissionSampleWriter - - - - diff --git a/src/submissions/backend/excel/writers/clientsubmission_writer.py b/src/submissions/backend/excel/writers/clientsubmission_writer.py index 6352c2b..4efec8a 100644 --- a/src/submissions/backend/excel/writers/clientsubmission_writer.py +++ b/src/submissions/backend/excel/writers/clientsubmission_writer.py @@ -1,76 +1,52 @@ +from __future__ import annotations import logging -from pathlib import Path from pprint import pformat - from openpyxl.workbook import Workbook - +from openpyxl.styles import Alignment +from openpyxl.worksheet.worksheet import Worksheet +from typing import TYPE_CHECKING from . import DefaultKEYVALUEWriter, DefaultTABLEWriter +if TYPE_CHECKING: + from backend.db.models import ProcedureType logger = logging.getLogger(f"submissions.{__name__}") class ClientSubmissionInfoWriter(DefaultKEYVALUEWriter): + exclude = ["name", "id", "clientlab", "filepath"] - def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs): - super().__init__(pydant_obj=pydant_obj, range_dict=range_dict, *args, **kwargs) + def __init__(self, pydant_obj, *args, **kwargs): + super().__init__(pydant_obj=pydant_obj, *args, **kwargs) logger.debug(f"{self.__class__.__name__} recruited!") - def write_to_workbook(self, workbook: Workbook) -> Workbook: - # workbook = super().write_to_workbook(workbook=workbook) - logger.debug(f"Skipped super.") - for rng in self.range_dict: - worksheet = workbook[rng['sheet']] - for key, value in self.fill_dictionary.items(): - logger.debug(f"Checking: key {key}, value {str(value)[:64]}") - if isinstance(value, bytes): - continue - try: - check = self.check_location(value['location'], rng['sheet']) - except TypeError: - check = False - if not check: - continue - # relevant_values[k] = v - logger.debug(f"Location passed for {value['location']}") - for location in value['location']: - if location['sheet'] != rng['sheet']: - continue - logger.debug(f"Writing {value} to row {location['row']}, column {location['value_column']}") - try: - worksheet.cell(location['row'], location['value_column'], value=value['value']) - except KeyError: - worksheet.cell(location['row'], location['value_column'], value=value['name']) - return workbook + def prewrite(self, worksheet: Worksheet, start_row: int) -> Worksheet: + # worksheet.merge_cells(start_row=start_row, start_column=1, end_row=start_row, end_column=4) + worksheet.cell(row=start_row, column=1, value="Submitter Info") + worksheet.cell(row=start_row, column=1).alignment = Alignment(horizontal="center") + return worksheet class ClientSubmissionSampleWriter(DefaultTABLEWriter): - def write_to_workbook(self, workbook: Workbook) -> Workbook: - workbook = super().write_to_workbook(workbook=workbook) - # logger.debug(f"\n\nHello from {self.__class__.__name__} with range_dict: {pformat(self.range_dict)}") - for rng in self.range_dict: - list_worksheet = workbook[rng['sheet']] - row_count = self.get_row_count(list_worksheet, rng) - column_names = [(str(item.value).lower().replace(" ", "_"), item.column) for item in list_worksheet[rng['header_row']] if item.value] - samples = self.pad_samples_to_length(row_count=row_count, column_names=column_names) - # logger.debug(f"Samples: {pformat(samples)}") - for sample in samples: - write_row = rng['header_row'] + sample.submission_rank - # logger.debug(f"Writing sample: {sample} to row {write_row}") - for column in column_names: - # logger.debug(f"At column {column}") - if column[0].lower() in ["well", "row", "column"]: - continue - write_column = column[1] - try: - # value = sample[column[0]] - value = getattr(sample, column[0]) - except AttributeError: - value = "" - # logger.debug(f"{column} Writing {value} to row {write_row}, column {write_column}") - list_worksheet.cell(row=write_row, column=write_column, value=value) + + exclude = ['id', 'enabled', 'procedure_rank', "name"] + header_order = ["submission_rank", "sample_id"] + + def __init__(self, pydant_obj, proceduretype: "ProcedureType" | None = None, *args, **kwargs): + super().__init__(pydant_obj=pydant_obj, proceduretype=proceduretype, *args, **kwargs) + + def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, + start_row: int | None = None, *args, **kwargs) -> Workbook: + self.pydant_obj = self.pad_samples_to_length(row_count=self.pydant_obj.max_sample_rank)#, column_names=header_list) + workbook = super().write_to_workbook(workbook=workbook, sheet=sheet, start_row=start_row, *args, **kwargs) + self.worksheet = self.postwrite(self.worksheet) return workbook - - - + def postwrite(self, worksheet: Worksheet) -> Worksheet: + worksheet = super().postwrite(worksheet) + for row in worksheet.iter_rows(min_row=self.start_row, max_row=self.end_row): + for cell in row: + if cell.value in [0, "0", "None"]: + cell.value = "" + cell.alignment = Alignment(horizontal="center") + return worksheet diff --git a/src/submissions/backend/excel/writers/procedure_writers/__init__.py b/src/submissions/backend/excel/writers/procedure_writers/__init__.py index 4748c7e..3c2eda4 100644 --- a/src/submissions/backend/excel/writers/procedure_writers/__init__.py +++ b/src/submissions/backend/excel/writers/procedure_writers/__init__.py @@ -6,96 +6,83 @@ from pprint import pformat from openpyxl.workbook import Workbook from backend.excel.writers import DefaultKEYVALUEWriter, DefaultTABLEWriter +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from backend.db.models import ProcedureType logger = logging.getLogger(f"submissions.{__name__}") class ProcedureInfoWriter(DefaultKEYVALUEWriter): - default_range_dict = [dict( - start_row=1, - end_row=6, - key_column=1, - value_column=2, - sheet="" - )] + start_row = 1 + header_order = [] + exclude = ['control', 'equipment', 'excluded', 'id', 'misc_info', 'plate_map', 'possible_kits', + 'procedureequipmentassociation', 'procedurereagentassociation', 'proceduresampleassociation', 'proceduretipsassociation', 'reagent', + 'reagentrole', 'results', 'sample', 'tips'] - def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs): - super().__init__(pydant_obj=pydant_obj, range_dict=range_dict, *args, **kwargs) - exclude = ['control', 'equipment', 'excluded', 'id', 'misc_info', 'plate_map', 'possible_kits', 'procedureequipmentassociation', - 'procedurereagentassociation', 'proceduresampleassociation', 'proceduretipsassociation', 'reagent', 'reagentrole', - 'results', 'sample', 'tips'] - self.fill_dictionary = {k: v for k, v in self.fill_dictionary.items() if k not in exclude} + def __init__(self, pydant_obj, *args, **kwargs): + + super().__init__(pydant_obj=pydant_obj, *args, **kwargs) + + self.fill_dictionary = {k: v for k, v in self.fill_dictionary.items() if k not in self.__class__.exclude} # logger.debug(pformat(self.fill_dictionary)) - for rng in self.range_dict: - if "sheet" not in rng or rng['sheet'] == "": - rng['sheet'] = f"{pydant_obj.proceduretype.name} Quality" + # for rng in self.range_dict: + # if "sheet" not in rng or rng['sheet'] == "": + # rng['sheet'] = f"{pydant_obj.proceduretype.name} Quality" + + def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, + start_row: int = 1, *args, **kwargs) -> Workbook: + workbook = super().write_to_workbook(workbook=workbook, sheet=f"{self.pydant_obj.proceduretype.name} Quality") + return workbook class ProcedureReagentWriter(DefaultTABLEWriter): - default_range_dict = [dict( - header_row=8 - )] + exclude = ["id", "comments", "missing"] + header_order = ["reagentrole", "name", "lot", "expiry"] - def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs): - super().__init__(pydant_obj=pydant_obj, range_dict=range_dict, *args, **kwargs) - for rng in self.range_dict: - if "sheet" not in rng: - rng['sheet'] = f"{pydant_obj.proceduretype.name} Quality" + def __init__(self, pydant_obj, *args, **kwargs): + super().__init__(pydant_obj=pydant_obj, *args, **kwargs) + self.sheet = f"{self.pydant_obj.proceduretype.name} Quality" self.pydant_obj = self.pydant_obj.reagent + def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, + start_row: int = 1, *args, **kwargs) -> Workbook: + logger.debug(self.pydant_obj) + workbook = super().write_to_workbook(workbook=workbook, sheet=self.sheet) + return workbook + class ProcedureEquipmentWriter(DefaultTABLEWriter): - default_range_dict = [dict( - header_row=14 - )] + exclude = ['id'] + header_order = ['equipmentrole', 'name', 'asset_number', 'process', 'tips'] def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs): super().__init__(pydant_obj=pydant_obj, range_dict=range_dict, *args, **kwargs) - for rng in self.range_dict: - if "sheet" not in rng: - rng['sheet'] = f"{pydant_obj.proceduretype.name} Quality" + self.sheet = f"{self.pydant_obj.proceduretype.name} Quality" self.pydant_obj = self.pydant_obj.equipment + def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, + start_row: int = 1, *args, **kwargs) -> Workbook: + logger.debug(self.pydant_obj) + workbook = super().write_to_workbook(workbook=workbook, sheet=self.sheet) + return workbook + class ProcedureSampleWriter(DefaultTABLEWriter): - default_range_dict = [dict( - header_row=21 - )] + exclude = ['id', 'enabled', 'name', "submission_rank"] + header_order = ['procedure_rank', 'sample_id'] def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs): super().__init__(pydant_obj=pydant_obj, range_dict=range_dict, *args, **kwargs) - for rng in self.range_dict: - if "sheet" not in rng: - rng['sheet'] = f"{pydant_obj.proceduretype.name} Quality" - self.pydant_obj = self.pydant_obj.sample + self.sheet = f"{self.pydant_obj.proceduretype.name} Quality" + # self.pydant_obj = self.pydant_obj.sample + self.pydant_obj = self.pad_samples_to_length(row_count=pydant_obj.max_sample_rank, mode="procedure") - def write_to_workbook(self, workbook: Workbook) -> Workbook: - workbook = super().write_to_workbook(workbook=workbook) - for rng in self.range_dict: - list_worksheet = workbook[rng['sheet']] - row_count = self.get_row_count(list_worksheet, rng) - column_names = [(item.value.lower().replace(" ", "_"), item.column) for item in - list_worksheet[rng['header_row']] if item.value] - samples = self.pad_samples_to_length(row_count=row_count, column_names=column_names) - samples = sorted(samples, key=lambda x: x.plate_rank) - # samples = self.pydant_obj - # logger.debug(f"Samples: {[item.submission_rank for item in samples]}") - for sample in samples: - # logger.debug(f"Writing sample: {sample}") - if sample.row == 0 or sample.column == 0: - continue - write_row = rng['header_row'] + sample.plate_rank - for column in column_names: - if column[0].lower() in ["well"]:#, "row", "column"]: - continue - write_column = column[1] - try: - value = getattr(sample, column[0]) - except KeyError: - value = "" - # logger.debug(f"{column} Writing {value} to row {write_row}, column {write_column}") - list_worksheet.cell(row=write_row, column=write_column, value=value) + def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, + start_row: int = 1, *args, **kwargs) -> Workbook: + logger.debug(self.pydant_obj) + workbook = super().write_to_workbook(workbook=workbook, sheet=self.sheet) return workbook diff --git a/src/submissions/backend/excel/writers/results_writers/pcr_results_writer.py b/src/submissions/backend/excel/writers/results_writers/pcr_results_writer.py index 95f8570..118beed 100644 --- a/src/submissions/backend/excel/writers/results_writers/pcr_results_writer.py +++ b/src/submissions/backend/excel/writers/results_writers/pcr_results_writer.py @@ -1,31 +1,46 @@ +from __future__ import annotations import logging from pathlib import Path -from typing import Generator +from pprint import pformat +from typing import Generator, TYPE_CHECKING from openpyxl import Workbook from openpyxl.styles import Alignment from backend.excel.writers import DefaultKEYVALUEWriter, DefaultTABLEWriter from tools import flatten_list +if TYPE_CHECKING: + from backend.db.models import ProcedureType logger = logging.getLogger(f"submissions.{__name__}") class PCRInfoWriter(DefaultKEYVALUEWriter): - default_range_dict = [dict( - start_row=1, - end_row=24, - key_column=1, - value_column=2, - sheet="Results" - )] + start_row = 1 - def write_to_workbook(self, workbook: Workbook) -> Workbook: - worksheet = workbook[f"{self.proceduretype.name} Results"] - for key, value in self.fill_dictionary['result'].items(): - # logger.debug(f"Filling in {key} with {value}") - worksheet.cell(value['location']['row'], value['location']['key_column'], value=key.replace("_", " ").title()) - worksheet.cell(value['location']['row'], value['location']['value_column'], value=value['value']) + def __init__(self, pydant_obj, proceduretype: "ProcedureType" | None = None, *args, **kwargs): + super().__init__(pydant_obj=pydant_obj, proceduretype=proceduretype, *args, **kwargs) + self.fill_dictionary = self.pydant_obj.improved_dict()['result'] + logger.debug(pformat(self.fill_dictionary)) + + def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, + start_row: int | None = None, *args, **kwargs) -> Workbook: + workbook = super().write_to_workbook(workbook=workbook, sheet=f"{self.proceduretype.name} Results") + # if not start_row: + # try: + # start_row = self.__class__.start_row + # except AttributeError as e: + # logger.error(f"Couldn't get start row due to {e}") + # start_row = 1 + # # worksheet = workbook[f"{self.proceduretype.name} Results"] + # self.worksheet = workbook.create_sheet(f"{self.proceduretype.name} Results") + # self.worksheet = self.prewrite(self.worksheet, start_row=start_row) + # # self.start_row = self.delineate_start_row(start_row=start_row) + # # self.end_row = self.delineate_end_row(start_row=start_row) + # # for key, value in self.fill_dictionary['result'].items(): + # # # logger.debug(f"Filling in {key} with {value}") + # # self.worksheet.cell(value['location']['row'], value['location']['key_column'], value=key.replace("_", " ").title()) + # # self.worksheet.cell(value['location']['row'], value['location']['value_column'], value=value['value']) return workbook @@ -33,7 +48,7 @@ class PCRSampleWriter(DefaultTABLEWriter): def write_to_workbook(self, workbook: Workbook) -> Workbook: worksheet = workbook[f"{self.proceduretype.name} Results"] - header_row = self.proceduretype.allowed_result_methods['PCR']['sample']['header_row'] + header_row = self.proceduretype.allowed_result_methods['PCR']['sample']['start_row'] proto_columns = [(1, "sample"), (2, "target")] columns = [] for iii, header in enumerate(self.column_headers, start=3): diff --git a/src/submissions/backend/managers/__init__.py b/src/submissions/backend/managers/__init__.py index 4b30370..e0b53e8 100644 --- a/src/submissions/backend/managers/__init__.py +++ b/src/submissions/backend/managers/__init__.py @@ -14,7 +14,6 @@ class DefaultManager(object): def __init__(self, parent, input_object: Path | str | None = None): logger.debug(f"FName before correction: {type(input_object)}") - # if input_object != "no_file": self.parent = parent match input_object: case str(): diff --git a/src/submissions/backend/managers/clientsubmissions.py b/src/submissions/backend/managers/clientsubmissions.py index ecaa508..8345993 100644 --- a/src/submissions/backend/managers/clientsubmissions.py +++ b/src/submissions/backend/managers/clientsubmissions.py @@ -1,16 +1,14 @@ from __future__ import annotations import logging -from io import BytesIO +import sys from typing import TYPE_CHECKING from pathlib import Path - from openpyxl.reader.excel import load_workbook from openpyxl.workbook import Workbook from backend.validators import RSLNamer from backend.managers import DefaultManager from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser, ClientSubmissionSampleParser from backend.excel.writers.clientsubmission_writer import ClientSubmissionInfoWriter, ClientSubmissionSampleWriter - if TYPE_CHECKING: from backend.db.models import SubmissionType @@ -42,24 +40,26 @@ class DefaultClientSubmissionManager(DefaultManager): def to_pydantic(self): self.info_parser = ClientSubmissionInfoParser(filepath=self.input_object, submissiontype=self.submissiontype) self.sample_parser = ClientSubmissionSampleParser(filepath=self.input_object, - submissiontype=self.submissiontype) - logger.debug(f"Info Parser range dict: {self.info_parser.range_dict}") + submissiontype=self.submissiontype, + start_row=self.info_parser.end_row) + logger.debug(self.sample_parser.__dict__) self.clientsubmission = self.info_parser.to_pydantic() - + self.clientsubmission.full_batch_size = self.sample_parser.end_row - self.sample_parser.start_row self.clientsubmission.sample = self.sample_parser.to_pydantic() + return self.clientsubmission # def to_pydantic(self): # self.clientsubmission = self.info_parser.to_pydantic() # self.clientsubmission.sample = self.sample_parser.to_pydantic() - def write(self): - workbook: Workbook = load_workbook(BytesIO(self.submissiontype.template_file)) + def write(self, workbook: Workbook) -> Workbook: + # workbook: Workbook = load_workbook(BytesIO(self.submissiontype.template_file)) + self.info_writer = ClientSubmissionInfoWriter(pydant_obj=self.pyd) assert isinstance(self.info_writer, ClientSubmissionInfoWriter) logger.debug("Attempting write.") workbook = self.info_writer.write_to_workbook(workbook) self.sample_writer = ClientSubmissionSampleWriter(pydant_obj=self.pyd) - workbook = self.sample_writer.write_to_workbook(workbook) - # workbook.save(output_path) + workbook = self.sample_writer.write_to_workbook(workbook, start_row=self.info_writer.worksheet.max_row + 1) return workbook \ No newline at end of file diff --git a/src/submissions/backend/managers/procedures.py b/src/submissions/backend/managers/procedures.py index d59c349..f6027c4 100644 --- a/src/submissions/backend/managers/procedures.py +++ b/src/submissions/backend/managers/procedures.py @@ -56,41 +56,42 @@ class DefaultProcedureManager(DefaultManager): self.samples = self.sample_parser.to_pydantic() self.equipment = self.equipment_parser.to_pydantic() - def write(self, worksheet_only: bool=False) -> Workbook: - workbook = load_workbook(BytesIO(self.proceduretype.template_file)) + def write(self, workbook: Workbook) -> Workbook: + # workbook = load_workbook(BytesIO(self.proceduretype.template_file)) try: info_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}InfoWriter") except AttributeError: info_writer = procedure_writers.ProcedureInfoWriter - self.info_writer = info_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.info_map) + self.info_writer = info_writer(pydant_obj=self.pyd) workbook = self.info_writer.write_to_workbook(workbook) try: reagent_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}ReagentWriter") except AttributeError: reagent_writer = procedure_writers.ProcedureReagentWriter - self.reagent_writer = reagent_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.reagent_map) + self.reagent_writer = reagent_writer(pydant_obj=self.pyd) workbook = self.reagent_writer.write_to_workbook(workbook) try: equipment_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}EquipmentWriter") except AttributeError: equipment_writer = procedure_writers.ProcedureEquipmentWriter - self.equipment_writer = equipment_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.equipment_map) + self.equipment_writer = equipment_writer(pydant_obj=self.pyd) workbook = self.equipment_writer.write_to_workbook(workbook) try: sample_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}SampleWriter") except AttributeError: sample_writer = procedure_writers.ProcedureSampleWriter - self.sample_writer = sample_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.sample_map) + self.sample_writer = sample_writer(pydant_obj=self.pyd) workbook = self.sample_writer.write_to_workbook(workbook) - # logger.debug(self.pyd.result) - # TODO: Find way to group results by result_type. + # # logger.debug(self.pyd.result) + # # TODO: Find way to group results by result_type. for result in self.pyd.result: + logger.debug(f"Writing {result.result_type}") Writer = getattr(results_writers, f"{result.result_type}InfoWriter") res_info_writer = Writer(pydant_obj=result, proceduretype=self.proceduretype) workbook = res_info_writer.write_to_workbook(workbook=workbook) - # sample_results = [sample.result for sample in self.pyd.sample] - # logger.debug(pformat(self.pyd.sample_results)) - Writer = getattr(results_writers, "PCRSampleWriter") - res_sample_writer = Writer(pydant_obj=self.pyd.sample_results, proceduretype=self.proceduretype) - workbook = res_sample_writer.write_to_workbook(workbook=workbook) + # # sample_results = [sample.result for sample in self.pyd.sample] + # # logger.debug(pformat(self.pyd.sample_results)) + # Writer = getattr(results_writers, "PCRSampleWriter") + # res_sample_writer = Writer(pydant_obj=self.pyd.sample_results, proceduretype=self.proceduretype) + # workbook = res_sample_writer.write_to_workbook(workbook=workbook) return workbook diff --git a/src/submissions/backend/managers/results/pcr_results_manager.py b/src/submissions/backend/managers/results/pcr_results_manager.py index ee42cca..fe81122 100644 --- a/src/submissions/backend/managers/results/pcr_results_manager.py +++ b/src/submissions/backend/managers/results/pcr_results_manager.py @@ -26,7 +26,7 @@ class PCRManager(DefaultResultsManager): def parse(self): self.info_parser = PCRInfoParser(filepath=self.fname, procedure=self.procedure) - self.sample_parser = PCRSampleParser(filepath=self.fname, procedure=self.procedure) + self.sample_parser = PCRSampleParser(filepath=self.fname, procedure=self.procedure, start_row=self.info_parser.end_row) def write(self): workbook = load_workbook(BytesIO(self.procedure.proceduretype.template_file)) diff --git a/src/submissions/backend/managers/runs.py b/src/submissions/backend/managers/runs.py index eb37b47..99b8727 100644 --- a/src/submissions/backend/managers/runs.py +++ b/src/submissions/backend/managers/runs.py @@ -18,13 +18,14 @@ class DefaultRunManager(DefaultManager): from backend.managers import DefaultClientSubmissionManager, DefaultProcedureManager logger.debug(f"Initializing write") clientsubmission = DefaultClientSubmissionManager(parent=self.parent, input_object=self.pyd.clientsubmission, submissiontype=self.pyd.clientsubmission.submissiontype) - workbook = clientsubmission.write() + workbook = Workbook() + workbook = clientsubmission.write(workbook=workbook) for procedure in self.pyd.procedure: - # logger.debug(f"Running procedure: {pformat(procedure.__dict__)}") + # # logger.debug(f"Running procedure: {pformat(procedure.__dict__)}") procedure = DefaultProcedureManager(proceduretype=procedure.proceduretype, parent=self.parent, input_object=procedure) - wb: Workbook = procedure.write() - for sheetname in wb.sheetnames: - source_sheet = wb[sheetname] - ws = workbook.create_sheet(sheetname) - copy_xl_sheet(source_sheet, ws) + workbook: Workbook = procedure.write(workbook=workbook) + # for sheetname in wb.sheetnames: + # source_sheet = wb[sheetname] + # ws = workbook.create_sheet(sheetname) + # copy_xl_sheet(source_sheet, ws) return workbook diff --git a/src/submissions/backend/validators/__init__.py b/src/submissions/backend/validators/__init__.py index 5c82a5f..7322732 100644 --- a/src/submissions/backend/validators/__init__.py +++ b/src/submissions/backend/validators/__init__.py @@ -43,22 +43,28 @@ class ClientSubmissionNamer(DefaultNamer): logger.warning(f"Getting submissiontype from file properties failed, falling back on preparse.\nDepending on excel structure this might yield an incorrect submissiontype") sub_type = self.get_subtype_from_preparse() if not sub_type: - logger.warning(f"Getting submissiontype from preparse failed, falling back on filename regex.\nDepending on excel structure this might yield an incorrect submissiontype") + logger.warning(f"Getting submissiontype from preparse failed, falling back on filename regex.\nDepending on file name this might yield an incorrect submissiontype") sub_type = self.get_subtype_from_regex() + if not sub_type: + logger.warning(f"Getting submissiontype from regex failed, using default submissiontype.") + sub_type = SubmissionType.query(name="Test") + logger.debug(f"Submission Type: {sub_type}") + sys.exit() return sub_type - def get_subtype_from_regex(self): + def get_subtype_from_regex(self) -> SubmissionType: regex = SubmissionType.regex m = regex.search(self.filepath.__str__()) try: sub_type = m.lastgroup + sub_type = SubmissionType.query(name=sub_type) except AttributeError as e: sub_type = None logger.critical(f"No procedure type found or procedure type found!: {e}") return sub_type - def get_subtype_from_preparse(self): + def get_subtype_from_preparse(self) -> SubmissionType: from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser parser = ClientSubmissionInfoParser(self.filepath) sub_type = next((value for k, value in parser.parsed_info.items() if k == "submissiontype"), None) @@ -67,7 +73,7 @@ class ClientSubmissionNamer(DefaultNamer): sub_type = None return sub_type - def get_subtype_from_properties(self): + def get_subtype_from_properties(self) -> SubmissionType: wb = load_workbook(self.filepath) # NOTE: Gets first category in the metadata. categories = wb.properties.category.split(";") @@ -238,11 +244,13 @@ class RSLNamer(object): today = datetime.now() if isinstance(today, str): today = datetime.strptime(today, "%Y-%m-%d") - if "name" in data.keys(): - plate_number = data['name'].split("-")[-1][0] - else: - previous = Run.query(start_date=today, end_date=today, submissiontype=data['submissiontype']) - plate_number = len(previous) + 1 + # if "name" in data.keys(): + # logger.debug(f"Found name: {data['name']}") + # plate_number = data['name'].split("-")[-1][0] + # else: + previous = Run.query(start_date=today, end_date=today, submissiontype=data['submissiontype']) + plate_number = len(previous) + 1 + logger.debug(f"Using plate number: {plate_number}") return f"RSL-{data['abbreviation']}-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}-{plate_number}" @classmethod diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py index 713a693..cbfaa2a 100644 --- a/src/submissions/backend/validators/pydant.py +++ b/src/submissions/backend/validators/pydant.py @@ -2,21 +2,19 @@ Contains pydantic models and accompanying validators """ from __future__ import annotations -import uuid, re, logging, csv, sys, string -from pydantic import BaseModel, field_validator, Field, model_validator, PrivateAttr +import re, logging, csv, sys, string +from pydantic import BaseModel, field_validator, Field, model_validator from datetime import date, datetime, timedelta from dateutil.parser import parse from dateutil.parser import ParserError from typing import List, Tuple, Literal from types import GeneratorType - -import backend from . import RSLNamer from pathlib import Path -from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone +from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone, sort_dict_by_list from backend.db import models from backend.db.models import * -from sqlalchemy.exc import StatementError, IntegrityError +from sqlalchemy.exc import StatementError from sqlalchemy.orm.properties import ColumnProperty from sqlalchemy.orm.relationships import _RelationshipDeclared from sqlalchemy.orm.attributes import InstrumentedAttribute @@ -26,9 +24,9 @@ logger = logging.getLogger(f"submission.{__name__}") class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): - _sql_object: ClassVar = None - # _misc_info: dict|None = None + _sql_object: ClassVar = None + key_value_order: ClassVar = [] @model_validator(mode="before") @classmethod @@ -37,7 +35,8 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): output = {} try: items = data.items() - except AttributeError: + except AttributeError as e: + logger.error(f"Could not prevalidate {cls.__name__} due to {e}") return data for key, value in items: new_key = key.replace("_", "") @@ -69,10 +68,6 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): def __init__(self, **data): # NOTE: Grab the sql model for validation purposes. self.__class__._sql_object = getattr(models, self.__class__.__name__.replace("Pyd", "")) - # try: - # self.template_file = self.__class__._sql_object.template_file - # except AttributeError: - # pass super().__init__(**data) def filter_field(self, key: str) -> Any: @@ -111,13 +106,11 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): output = {k: getattr(self, k) for k in fields} else: output = {k: self.filter_field(k) for k in fields} - if hasattr(self, "misc_info") and "info_placement" in self.misc_info: - for k, v in output.items(): - try: - output[k]['location'] = [item['location'] for item in self.misc_info['info_placement'] if - item['name'] == k] - except (TypeError, KeyError): - continue + if "misc_info" in output.keys(): + for k, v in output['misc_info'].items(): + if k not in output.keys(): + output[k] = v + del output['misc_info'] return output def to_sql(self): @@ -129,6 +122,19 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): logger.warning(f"Creating new {self._sql_object} with values:\n{pformat(dicto)}") return sql + @property + def fields(self): + output = [] + for k, v in self.improved_dict().items(): + match v: + case str() | int() | float() | datetime() | date(): + output.append(k) + case x if issubclass(v.__class__, PydBaseClass): + output.append(k) + case _: + continue + return list(set(output)) + class PydReagent(PydBaseClass): lot: str | None @@ -496,7 +502,7 @@ class PydEquipment(PydBaseClass): return {k: getattr(self, k) for k in fields} -class PydRun(PydBaseClass, extra='allow'): +class PydRun(PydBaseClass): #, extra='allow'): clientsubmission: PydClientSubmission | None = Field(default=None) rsl_plate_number: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) @@ -1154,65 +1160,6 @@ class PydEquipmentRole(BaseModel): return RoleComboBox(parent=parent, role=self, used=used) -# class PydPCRControl(BaseModel): -# -# name: str -# subtype: str -# target: str -# ct: float -# reagent_lot: str -# submitted_date: datetime #: Date submitted to Robotics -# procedure_id: int -# controltype_name: str -# -# @report_result -# def to_sql(self): -# report = Report -# instance = PCRControl.query(name=self.name) -# if not instance: -# instance = PCRControl() -# for key in self.model_fields: -# field_value = self.__getattribute__(key) -# if instance.__getattribute__(key) != field_value: -# instance.__setattr__(key, field_value) -# return instance, report -# -# -# class PydIridaControl(BaseModel, extra='ignore'): -# -# name: str -# contains: list | dict #: unstructured hashes in contains.tsv for each organism -# matches: list | dict #: unstructured hashes in matches.tsv for each organism -# kraken: list | dict #: unstructured output from kraken_report -# subtype: Literal["ATCC49226", "ATCC49619", "EN-NOS", "EN-SSTI", "MCS-NOS", "MCS-SSTI", "SN-NOS", "SN-SSTI"] -# refseq_version: str #: version of refseq used in fastq parsing -# kraken2_version: str -# kraken2_db_version: str -# sample_id: int -# submitted_date: datetime #: Date submitted to Robotics -# procedure_id: int -# controltype_name: str -# -# @field_validator("refseq_version", "kraken2_version", "kraken2_db_version", mode='before') -# @classmethod -# def enforce_string(cls, value): -# if not value: -# value = "" -# return value -# -# @report_result -# def to_sql(self): -# report = Report() -# instance = IridaControl.query(name=self.name) -# if not instance: -# instance = IridaControl() -# for key in self.model_fields: -# field_value = self.__getattribute__(key) -# if instance.__getattribute__(key) != field_value: -# instance.__setattr__(key, field_value) -# return instance, report - - class PydProcess(PydBaseClass, extra="allow"): name: str version: str = Field(default="1") @@ -1404,6 +1351,27 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): value = None return value + @property + def rows_columns_count(self) -> tuple[int, int]: + try: + proc: ProcedureType = Procedure.query(name=self.name).proceduretype + except AttributeError as e: + logger.error(f"Can't get rows, columns due to {e}") + return 0, 0 + return proc.plate_rows, proc.plate_columns + + @property + def max_sample_rank(self) -> int: + rows, columns = self.rows_columns_count + output = rows * columns + if output > 0: + return output + else: + try: + return max([item.procedure_rank for item in self.sample]) + except TypeError: + return len(self.sample) + def update_kittype_reagentroles(self, kittype: str | KitType): if kittype == self.__class__.model_fields['kittype'].default['value']: return @@ -1471,7 +1439,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): sample.well_id = sample_dict['sample_id'] sample.row = row sample.column = column - sample.plate_rank = sample_dict['index'] + sample.procedure_rank = sample_dict['index'] logger.debug(f"Sample of interest: {sample.improved_dict()}") # logger.debug(f"Updated samples:\n{pformat(self.sample)}") @@ -1495,7 +1463,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): reg = reagent.to_sql() reg.save() - def to_sql(self, new: bool=False): + def to_sql(self, new: bool = False): from backend.db.models import RunSampleAssociation, ProcedureSampleAssociation if new: sql = Procedure() @@ -1514,8 +1482,9 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): sql.repeat = self.repeat if sql.repeat: regex = re.compile(r".*\dR\d$") - repeats = [item for item in self.run.procedure if self.repeat_of in item.name and bool(regex.match(item.name))] - sql.name = f"{self.repeat_of}R{str(len(repeats)+1)}" + repeats = [item for item in self.run.procedure if + self.repeat_of in item.name and bool(regex.match(item.name))] + sql.name = f"{self.repeat_of}R{str(len(repeats) + 1)}" sql.repeat_of = self.repeat_of sql.started_date = datetime.now() if self.run: @@ -1572,7 +1541,8 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): logger.debug(f"sample {sample_sql} found in {sql.run.sample}") if sample_sql not in sql.sample: proc_assoc = ProcedureSampleAssociation(new_id=assoc_id_range[iii], procedure=sql, sample=sample_sql, - row=sample.row, column=sample.column, plate_rank=sample.plate_rank) + row=sample.row, column=sample.column, + procedure_rank=sample.procedure_rank) if self.kittype['value'] not in ["NA", None, ""]: kittype = KitType.query(name=self.kittype['value'], limit=1) if kittype: @@ -1592,11 +1562,22 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): class PydClientSubmission(PydBaseClass): # sql_object: ClassVar = ClientSubmission + key_value_order = ["submitter_plate_id", + "submitted_date", + "client_lab", + "contact", + "contact_email", + "cost_centre", + "submission_type", + "sample_count", + "submission_category"] + filepath: Path | None = Field(default=None) submissiontype: dict | None submitted_date: dict | None = Field(default=dict(value=date.today(), missing=True), validate_default=True) clientlab: dict | None sample_count: dict | None + full_batch_size: int | dict = Field(default=0) submission_category: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) comment: dict | None = Field(default=dict(value="", missing=True), validate_default=True) cost_centre: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) @@ -1707,6 +1688,14 @@ class PydClientSubmission(PydBaseClass): value = dict(value=value, missing=True) return value + @field_validator("full_batch_size") + @classmethod + def dict_to_int(cls, value): + if isinstance(value, dict): + value = value['value'] + value = int(value) + return value + def to_form(self, parent: QWidget, samples: List = [], disable: list | None = None): """ Converts this instance into a frontend.widgets.submission_widget.SubmissionFormWidget @@ -1721,6 +1710,7 @@ class PydClientSubmission(PydBaseClass): """ from frontend.widgets.submission_widget import ClientSubmissionFormWidget if not samples: + # samples = [sample for sample in self.sample if sample.sample_id.lower() not in ["", "blank"]] samples = self.sample return ClientSubmissionFormWidget(parent=parent, clientsubmission=self, samples=samples, disable=disable) @@ -1728,9 +1718,18 @@ class PydClientSubmission(PydBaseClass): sql = super().to_sql() assert not any([isinstance(item, PydSample) for item in sql.sample]) sql.sample = [] - if "info_placement" not in sql._misc_info: - sql._misc_info['info_placement'] = [] - info_placement = [] + # if "info_placement" not in sql._misc_info: + # sql._misc_info['info_placement'] = [] + # info_placement = [] + logger.debug(f"PYD Submission type: {self.submissiontype}") + logger.debug(f"SQL Submission Type: {sql.submissiontype}") + if not sql.submissiontype: + sql.submissiontype = SubmissionType.query(name=self.submissiontype['value']) + match sql.submissiontype: + case SubmissionType(): + pass + case _: + sql.submissiontype = SubmissionType.query(name="Test") for k in list(self.model_fields.keys()) + list(self.model_extra.keys()): logger.debug(f"Running {k}") attribute = getattr(self, k) @@ -1738,41 +1737,52 @@ class PydClientSubmission(PydBaseClass): case "filepath": sql._misc_info[k] = attribute.__str__() continue - # case "sample": - # sample = [item.to_sql() for item in self.sample] - # logger.debug(sample) - # for s in sample: - # logger.debug(f"adding {s}") - # sql.add_sample(sample=s) case _: pass - logger.debug(f"Setting {k} to {attribute}") - if isinstance(attribute, dict): - if "location" in attribute: - info_placement.append(dict(name=k, location=attribute['location'])) - else: - info_placement.append(dict(name=k, location=None)) - # max_row = max([value['location']['row'] for value in info_placement if value]) - sql._misc_info['info_placement'] = info_placement + # logger.debug(f"Setting {k} to {attribute}") + # if isinstance(attribute, dict): + # if "location" in attribute: + # info_placement.append(dict(name=k, location=attribute['location'])) + # else: + # info_placement.append(dict(name=k, location=None)) + # # max_row = max([value['location']['row'] for value in info_placement if value]) + # sql._misc_info['info_placement'] = info_placement return sql - def pad_samples_to_length(self, row_count, column_names): - output_samples = [] - for iii in range(1, row_count + 1): - try: - sample = next((item for item in self.samples if item.submission_rank == iii)) - except StopIteration: - sample = PydSample(sample_id="") - for column in column_names: - setattr(sample, column[0], "") - sample.submission_rank = iii - output_samples.append(sample) - return sorted(output_samples, key=lambda x: x.submission_rank) + @property + def max_sample_rank(self) -> int: + output = self.full_batch_size + if output > 0: + return output + else: + return max([item.submission_rank for item in self.sample]) + + # def pad_samples_to_length(self, row_count, column_names): + # output_samples = [] + # for iii in range(1, row_count + 1): + # try: + # sample = next((item for item in self.samples if item.submission_rank == iii)) + # except StopIteration: + # sample = PydSample(sample_id="") + # for column in column_names: + # setattr(sample, column[0], "") + # sample.submission_rank = iii + # output_samples.append(sample) + # return sorted(output_samples, key=lambda x: x.submission_rank) def improved_dict(self, dictionaries: bool = True) -> dict: output = super().improved_dict(dictionaries=dictionaries) output['sample'] = self.sample - return output + output['client_lab'] = output['clientlab'] + try: + output['contact_email'] = output['contact']['email'] + except TypeError: + pass + return sort_dict_by_list(output, self.key_value_order) + + # @property + # def writable_dict(self): + # output = self.improved_dict() @property def filename_template(self): diff --git a/src/submissions/frontend/widgets/sample_checker.py b/src/submissions/frontend/widgets/sample_checker.py index 0dfc948..368ec6d 100644 --- a/src/submissions/frontend/widgets/sample_checker.py +++ b/src/submissions/frontend/widgets/sample_checker.py @@ -55,8 +55,8 @@ class SampleChecker(QDialog): self.layout.addWidget(self.buttonBox, 11, 9, 1, 1, alignment=Qt.AlignmentFlag.AlignRight) self.setLayout(self.layout) - with open("sample_checker_rendered.html", "w") as f: - f.write(html) + # with open("sample_checker_rendered.html", "w") as f: + # f.write(html) logger.debug(f"HTML sample checker written!") @pyqtSlot(str, str, str) @@ -88,7 +88,7 @@ class SampleChecker(QDialog): def formatted_list(self) -> List[dict]: output = [] for sample in self.samples: - logger.debug(sample) + # logger.debug(sample) s = sample.improved_dict(dictionaries=False) if s['sample_id'] in [item['sample_id'] for item in output]: s['color'] = "red" diff --git a/src/submissions/frontend/widgets/submission_details.py b/src/submissions/frontend/widgets/submission_details.py index e76a169..bb45996 100644 --- a/src/submissions/frontend/widgets/submission_details.py +++ b/src/submissions/frontend/widgets/submission_details.py @@ -66,9 +66,9 @@ class SubmissionDetails(QDialog): html = template.render(**d, css=[css]) self.webview.setHtml(html) self.setWindowTitle(f"{object.__class__.__name__} Details - {object.name}") - with open(f"{object.__class__.__name__}_details_rendered.html", "w") as f: + # with open(f"{object.__class__.__name__}_details_rendered.html", "w") as f: # f.write(html) - pass + # pass def activate_export(self) -> None: diff --git a/src/submissions/frontend/widgets/submission_widget.py b/src/submissions/frontend/widgets/submission_widget.py index 0821193..04d6060 100644 --- a/src/submissions/frontend/widgets/submission_widget.py +++ b/src/submissions/frontend/widgets/submission_widget.py @@ -8,24 +8,20 @@ from PyQt6.QtWidgets import ( QComboBox, QDateEdit, QLineEdit, QLabel, QCheckBox, QHBoxLayout, QGridLayout ) from PyQt6.QtCore import pyqtSignal, Qt, QSignalBlocker - - from .functions import select_open_file, select_save_file import logging from pathlib import Path from tools import Report, Result, check_not_nan, main_form_style, report_result, get_application_from_parent -from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser, ClientSubmissionSampleParser -from backend.validators import PydRun, PydReagent, PydClientSubmission, PydSample +from backend.validators import PydReagent, PydClientSubmission, PydSample from backend.db import ( ClientLab, SubmissionType, Reagent, - ReagentRole, KitTypeReagentRoleAssociation, Run + ReagentRole, KitTypeReagentRoleAssociation, Run, ClientSubmission ) from pprint import pformat from .pop_ups import QuestionAsker, AlertPop from .omni_add_edit import AddEdit from typing import List, Tuple from datetime import date - from .sample_checker import SampleChecker logger = logging.getLogger(f"submissions.{__name__}") @@ -368,33 +364,33 @@ class SubmissionFormWidget(QWidget): return report base_submission = self.pyd.to_sql() # NOTE: check output message for issues - try: - trigger = result.results[-1] - code = trigger.code - except IndexError as e: - logger.error(result.results) - logger.error(f"Problem getting error code: {e}") - code = 0 - match code: - # NOTE: code 0: everything is fine. - case 0: - pass - # NOTE: code 1: ask for overwrite - case 1: - dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_number}?", message=trigger.msg) - if dlg.exec(): - # NOTE: Do not add duplicate reagents. - pass - else: - self.app.ctx.database_session.rollback() - report.add_result(Result(msg="Overwrite cancelled", status="Information")) - return report - # NOTE: code 2: No RSL plate number given - case 2: - report.add_result(result) - return report - case _: - pass + # try: + # trigger = result.results[-1] + # code = trigger.code + # except IndexError as e: + # logger.error(result.results) + # logger.error(f"Problem getting error code: {e}") + # code = 0 + # match code: + # # NOTE: code 0: everything is fine. + # case 0: + # pass + # # NOTE: code 1: ask for overwrite + # case 1: + # dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_number}?", message=trigger.msg) + # if dlg.exec(): + # # NOTE: Do not add duplicate reagents. + # pass + # else: + # self.app.ctx.database_session.rollback() + # report.add_result(Result(msg="Overwrite cancelled", status="Information")) + # return report + # # NOTE: code 2: No RSL plate number given + # case 2: + # report.add_result(result) + # return report + # case _: + # pass # NOTE: add reagents to procedure object if base_submission is None: return @@ -517,7 +513,7 @@ class SubmissionFormWidget(QWidget): def set_widget(self, parent: QWidget, key: str, value: dict, submission_type: str | SubmissionType | None = None, - sub_obj: Run | None = None) -> QWidget: + sub_obj: ClientSubmission | None = None) -> QWidget: """ Creates form widget @@ -596,7 +592,7 @@ class SubmissionFormWidget(QWidget): add_widget.addItems(categories) add_widget.setToolTip("Enter procedure category or select from list.") case _: - if key in sub_obj.timestamps: + if key in ClientSubmission.timestamps: add_widget = MyQDateEdit(calendarPopup=True, scrollWidget=parent) # NOTE: sets submitted date based on date found in excel sheet try: @@ -875,6 +871,9 @@ class ClientSubmissionFormWidget(SubmissionFormWidget): if isinstance(sample, PydSample): sample = sample.to_sql() assert not isinstance(sample, PydSample) + if sample.sample_id.lower() in ["", "blank"]: + continue + sample.save() # if sample not in sql.sample: sql.add_sample(sample=sample) logger.debug(pformat(sql.__dict__)) diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py index 53b57d5..2983645 100644 --- a/src/submissions/tools/__init__.py +++ b/src/submissions/tools/__init__.py @@ -4,6 +4,7 @@ Contains miscellaenous functions used by both frontend and backend. from __future__ import annotations import builtins, importlib, time, logging, re, yaml, sys, os, stat, platform, getpass, json, numpy as np, pandas as pd import itertools +from collections import OrderedDict from datetime import date, datetime, timedelta from json import JSONDecodeError from threading import Thread @@ -467,7 +468,6 @@ def render_details_template(template_name:str, css_in:List[str]|str=[], js_in:Li return template.render(css=css_out, js=js_out, **kwargs) - def convert_well_to_row_column(input_str: str) -> Tuple[int, int]: """ Converts typical alphanumeric (i.e. "A2") to row, column @@ -564,6 +564,19 @@ def list_str_comparator(input_str:str, listy: List[str], mode: Literal["starts_w else: return False +def sort_dict_by_list(dictionary: dict, order_list: list) -> dict: + output = OrderedDict() + for item in order_list: + try: + output[item] = dictionary[item] + except KeyError: + continue + for k, v in dictionary.items(): + if k in output: + continue + output[k] = v + return output + def setup_lookup(func): """