diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py index da7b273..27490b0 100644 --- a/src/submissions/backend/db/models/__init__.py +++ b/src/submissions/backend/db/models/__init__.py @@ -596,6 +596,7 @@ class BaseClass(Base): return output_date def details_dict(self, **kwargs): + relevant = {k: v for k, v in self.__class__.__dict__.items() if isinstance(v, InstrumentedAttribute) or isinstance(v, AssociationProxy)} output = {} @@ -618,14 +619,16 @@ class BaseClass(Base): output[k.strip("_")] = value return output - def to_pydantic(self, **kwargs): + def to_pydantic(self, pyd_model_name:str|None=None, **kwargs): from backend.validators import pydant - pyd_model_name = f"Pyd{self.__class__.__name__}" + if not pyd_model_name: + pyd_model_name = f"Pyd{self.__class__.__name__}" logger.debug(f"Looking for pydant model {pyd_model_name}") try: pyd = getattr(pydant, pyd_model_name) except AttributeError: raise AttributeError(f"Could not get pydantic class {pyd_model_name}") + logger.debug(f"Kwargs: {kwargs}") return pyd(**self.details_dict(**kwargs)) def show_details(self, obj): diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index 0ecc093..bbe796c 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -1212,7 +1212,7 @@ class ProcedureType(BaseClass): self.save() -def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]: + def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]: """ Make a map of all locations for tips or equipment. @@ -1524,6 +1524,7 @@ class Procedure(BaseClass): sample['active'] = False # output['sample'] = [sample.details_dict() for sample in output['runsampleassociation']] output['sample'] = active_samples + inactive_samples + logger.debug(f"Procedure samples: \n\n{pformat(output['sample'])}\n\n") # output['sample'] = [sample.details_dict() for sample in output['sample']] output['reagent'] = [reagent.details_dict() for reagent in output['procedurereagentassociation']] output['equipment'] = [equipment.details_dict() for equipment in output['procedureequipmentassociation']] @@ -1538,18 +1539,22 @@ class Procedure(BaseClass): def to_pydantic(self, **kwargs): from backend.validators.pydant import PydResults, PydReagent output = super().to_pydantic() + logger.debug(f"Pydantic output: \n\n{pformat(output.__dict__)}\n\n") output.kittype = dict(value=output.kittype['name'], missing=False) + output.sample = [item.to_pydantic() for item in output.proceduresampleassociation] reagents = [] for reagent in output.reagent: match reagent: case dict(): reagent['reagentrole'] = next((reagentrole.name for reagentrole in self.kittype.reagentrole if reagentrole in reagent['reagentrole']), None) - reagents.append(PydResults(**reagent)) + reagents.append(PydReagent(**reagent)) case PydReagent(): reagents.append(reagent) case _: pass - output.reagent = [PydReagent(**item) for item in output.reagent] + # output.reagent = [PydReagent(**item) for item in output.reagent] + output.reagent = reagents + results = [] for result in output.results: match result: @@ -2438,6 +2443,9 @@ class ProcedureEquipmentAssociation(BaseClass): equipment = relationship(Equipment, back_populates="equipmentprocedureassociation") #: associated equipment + tips_id = Column(INTEGER, ForeignKey("_tips.id", ondelete="SET NULL", + name="SEA_Process_id")) + def __repr__(self) -> str: try: return f"" @@ -2472,6 +2480,13 @@ class ProcedureEquipmentAssociation(BaseClass): def process(self): return Process.query(id=self.process_id) + @property + def tips(self): + try: + return Tips.query(id=self.tips_id, limit=1) + except AttributeError: + return None + def to_sub_dict(self) -> dict: """ This RunEquipmentAssociation as a dictionary @@ -2495,7 +2510,7 @@ class ProcedureEquipmentAssociation(BaseClass): PydEquipment: pydantic equipment model """ from backend.validators import PydEquipment - return PydEquipment(**self.to_sub_dict()) + return PydEquipment(**self.details_dict()) @classmethod @setup_lookup @@ -2533,6 +2548,10 @@ class ProcedureEquipmentAssociation(BaseClass): output.update(relevant) output['misc_info'] = misc output['process'] = self.process.details_dict() + try: + output['tips'] = self.tips.details_dict() + except AttributeError: + output['tips'] = None return output diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index 2281c08..366fc41 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -2197,8 +2197,20 @@ class ProcedureSampleAssociation(BaseClass): # NOTE: Figure out how to merge the misc_info if doing .update instead. relevant = {k: v for k, v in output.items() if k not in ['sample']} output = output['sample'].details_dict() + logger.debug(f"Output: {pformat(output)}") + logger.debug(f"Relevant: {pformat(relevant)}") + # relevant['submission_rank'] = output['misc_info']['submission_rank'] misc = output['misc_info'] output.update(relevant) output['misc_info'] = misc output['results'] = [result.details_dict() for result in output['results']] return output + + def to_pydantic(self, **kwargs): + output = super().to_pydantic(pyd_model_name="PydSample") + try: + output.submission_rank = output.misc_info['submission_rank'] + except KeyError: + logger.error(output) + return output + diff --git a/src/submissions/backend/excel/parsers/procedure_parsers/__init__.py b/src/submissions/backend/excel/parsers/procedure_parsers/__init__.py index 4107001..c3d1ace 100644 --- a/src/submissions/backend/excel/parsers/procedure_parsers/__init__.py +++ b/src/submissions/backend/excel/parsers/procedure_parsers/__init__.py @@ -8,11 +8,11 @@ if TYPE_CHECKING: from backend.db.models import ProcedureType -class DefaultInfoParser(DefaultKEYVALUEParser): +class ProcedureInfoParser(DefaultKEYVALUEParser): default_range_dict = [dict( start_row=1, - end_row=14, + end_row=6, key_column=1, value_column=2, sheet="" @@ -31,7 +31,7 @@ class DefaultInfoParser(DefaultKEYVALUEParser): self._pyd_object = PydProcedure -class DefaultSampleParser(DefaultTABLEParser): +class ProcedureSampleParser(DefaultTABLEParser): default_range_dict = [dict( header_row=41, @@ -51,7 +51,7 @@ class DefaultSampleParser(DefaultTABLEParser): self._pyd_object = PydSample -class DefaultReagentParser(DefaultTABLEParser): +class ProcedureReagentParser(DefaultTABLEParser): default_range_dict = [dict( header_row=17, @@ -80,7 +80,7 @@ class DefaultReagentParser(DefaultTABLEParser): item['reagentrole'] = item['reagent_role'] yield item -class DefaultEquipmentParser(DefaultTABLEParser): +class ProcedureEquipmentParser(DefaultTABLEParser): default_range_dict = [dict( header_row=32, diff --git a/src/submissions/backend/managers/procedures.py b/src/submissions/backend/managers/procedures.py index 6df90f6..ac9ded0 100644 --- a/src/submissions/backend/managers/procedures.py +++ b/src/submissions/backend/managers/procedures.py @@ -1,9 +1,15 @@ from __future__ import annotations import logging +from io import BytesIO + +from openpyxl.reader.excel import load_workbook +from openpyxl.workbook import Workbook + from backend.managers import DefaultManager from typing import TYPE_CHECKING from pathlib import Path from backend.excel.parsers import procedure_parsers +from backend.excel.writers import procedure_writers if TYPE_CHECKING: from backend.db.models import ProcedureType @@ -13,6 +19,7 @@ logger = logging.getLogger(f"submissions.{__name__}") class DefaultProcedureManager(DefaultManager): def __init__(self, proceduretype: "ProcedureType"|str, parent, input_object: Path | str | None = None): + from backend.db.models import ProcedureType if isinstance(proceduretype, str): proceduretype = ProcedureType.query(name=proceduretype) self.proceduretype = proceduretype @@ -23,22 +30,22 @@ class DefaultProcedureManager(DefaultManager): try: info_parser = getattr(procedure_parsers, f"{self.proceduretype.name.replace(' ', '')}InfoParser") except AttributeError: - info_parser = procedure_parsers.DefaultInfoParser + info_parser = procedure_parsers.ProcedureInfoParser self.info_parser = info_parser(filepath=self.fname, proceduretype=self.proceduretype) try: reagent_parser = getattr(procedure_parsers, f"{self.proceduretype.name.replace(' ', '')}ReagentParser") except AttributeError: - reagent_parser = procedure_parsers.DefaultReagentParser + reagent_parser = procedure_parsers.ProcedureReagentParser self.reagent_parser = reagent_parser(filepath=self.fname, proceduretype=self.proceduretype) try: sample_parser = getattr(procedure_parsers, f"{self.proceduretype.name.replace(' ', '')}SampleParser") except AttributeError: - sample_parser = procedure_parsers.DefaultSampleParser + sample_parser = procedure_parsers.ProcedureSampleParser self.sample_parser = sample_parser(filepath=self.fname, proceduretype=self.proceduretype) try: equipment_parser = getattr(procedure_parsers, f"{self.proceduretype.name.replace(' ', '')}EquipmentParser") except AttributeError: - equipment_parser = procedure_parsers.DefaultEquipmentParser + equipment_parser = procedure_parsers.ProcedureEquipmentParser self.equipment_parser = equipment_parser(filepath=self.fname, proceduretype=self.proceduretype) self.to_pydantic() @@ -46,4 +53,33 @@ class DefaultProcedureManager(DefaultManager): self.procedure = self.info_parser.to_pydantic() self.reagents = self.reagent_parser.to_pydantic() self.samples = self.sample_parser.to_pydantic() - self.equipment = self.equipment_parser.to_pydantic() \ No newline at end of file + self.equipment = self.equipment_parser.to_pydantic() + + def write(self, worksheet_only: bool=False) -> Workbook: + workbook = load_workbook(BytesIO(self.proceduretype.template_file)) + try: + info_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}InfoWriter") + except AttributeError: + info_writer = procedure_writers.ProcedureInfoWriter + self.info_writer = info_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.info_map) + workbook = self.info_writer.write_to_workbook(workbook) + try: + reagent_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}ReagentWriter") + except AttributeError: + reagent_writer = procedure_writers.ProcedureReagentWriter + self.reagent_writer = reagent_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.reagent_map) + workbook = self.reagent_writer.write_to_workbook(workbook) + try: + equipment_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}EquipmentWriter") + except AttributeError: + equipment_writer = procedure_writers.ProcedureEquipmentWriter + self.equipment_writer = equipment_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.equipment_map) + workbook = self.equipment_writer.write_to_workbook(workbook) + try: + sample_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}SampleWriter") + except AttributeError: + sample_writer = procedure_writers.ProcedureSampleWriter + self.sample_writer = sample_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.sample_map) + workbook = self.sample_writer.write_to_workbook(workbook) + return workbook + diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py index 45380f6..5b9e457 100644 --- a/src/submissions/backend/validators/pydant.py +++ b/src/submissions/backend/validators/pydant.py @@ -363,14 +363,14 @@ class PydTips(PydBaseClass): return assoc, report -class PydEquipment(PydBaseClass, extra='ignore'): +class PydEquipment(PydBaseClass): asset_number: str name: str nickname: str | None # process: List[dict] | None process: PydProcess | None equipmentrole: str | PydEquipmentRole | None - tips: List[PydTips] | None = Field(default=[]) + tips: List[PydTips] | PydTips | None = Field(default=[]) @field_validator('equipmentrole', mode='before') @classmethod @@ -407,13 +407,27 @@ class PydEquipment(PydBaseClass, extra='ignore'): @field_validator('tips', mode='before') @classmethod def tips_to_pydantic(cls, value): - output = [] - for tips in value: - if isinstance(tips, Tips): - tips = tips.to_pydantic() - output.append(tips) + match value: + case list(): + output = [] + for tips in value: + match tips: + case Tips(): + tips = tips.to_pydantic() + case dict(): + tips = PydTips(**tips) + case _: + continue + output.append(tips) + case _: + output = value return output + @field_validator('tips') + @classmethod + def single_out_tips(cls, value, values): + return value + @report_result def to_sql(self, procedure: Procedure | str = None, kittype: KitType | str = None) -> Tuple[ Equipment, ProcedureEquipmentAssociation]: @@ -1402,7 +1416,7 @@ class PydEquipmentRole(BaseModel): # return instance, report -class PydProcess(BaseModel, extra="allow"): +class PydProcess(PydBaseClass, extra="allow"): name: str version: str = Field(default="1") proceduretype: List[str] @@ -1744,6 +1758,7 @@ class PydClientSubmission(PydBaseClass): cost_centre: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) contact: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) submitter_plate_id: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) + sample: List[PydSample] | None = Field(default=[]) @field_validator("submitted_date", mode="before") @classmethod @@ -1869,7 +1884,7 @@ class PydClientSubmission(PydBaseClass): output_samples = [] for iii in range(1, row_count + 1): try: - sample = next((item for item in self.model_extra['samples'] if item.submission_rank == iii)) + sample = next((item for item in self.samples if item.submission_rank == iii)) except StopIteration: sample = PydSample(sample_id="") for column in column_names: @@ -1878,6 +1893,11 @@ class PydClientSubmission(PydBaseClass): output_samples.append(sample) return sorted(output_samples, key=lambda x: x.submission_rank) + def improved_dict(self, dictionaries: bool = True) -> dict: + output = super().improved_dict(dictionaries=dictionaries) + output['sample'] = self.sample + return output + @property def filename_template(self): submissiontype = SubmissionType.query(name=self.submissiontype['value']) diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py index 97571b1..6b86716 100644 --- a/src/submissions/tools/__init__.py +++ b/src/submissions/tools/__init__.py @@ -486,6 +486,69 @@ def convert_well_to_row_column(input_str: str) -> Tuple[int, int]: return None, None return row, column +# Copy a sheet with style, format, layout, ect. from one Excel file to another Excel file +# Please add the ..path\\+\\file.. and ..sheet_name.. according to your desire. +import openpyxl +from copy import copy + + +def copy_xl_sheet(source_sheet, target_sheet): + copy_cells(source_sheet, target_sheet) # copy all the cel values and styles + copy_sheet_attributes(source_sheet, target_sheet) + + +def copy_sheet_attributes(source_sheet, target_sheet): + if isinstance(source_sheet, openpyxl.worksheet._read_only.ReadOnlyWorksheet): + return + target_sheet.sheet_format = copy(source_sheet.sheet_format) + target_sheet.sheet_properties = copy(source_sheet.sheet_properties) + target_sheet.merged_cells = copy(source_sheet.merged_cells) + target_sheet.page_margins = copy(source_sheet.page_margins) + target_sheet.freeze_panes = copy(source_sheet.freeze_panes) + + # set row dimensions + # So you cannot copy the row_dimensions attribute. Does not work (because of meta data in the attribute I think). So we copy every row's row_dimensions. That seems to work. + for rn in range(len(source_sheet.row_dimensions)): + target_sheet.row_dimensions[rn] = copy(source_sheet.row_dimensions[rn]) + + if source_sheet.sheet_format.defaultColWidth is None: + print('Unable to copy default column wide') + else: + target_sheet.sheet_format.defaultColWidth = copy(source_sheet.sheet_format.defaultColWidth) + + # set specific column width and hidden property + # we cannot copy the entire column_dimensions attribute so we copy selected attributes + for key, value in source_sheet.column_dimensions.items(): + target_sheet.column_dimensions[key].min = copy(source_sheet.column_dimensions[key].min) # Excel actually groups multiple columns under 1 key. Use the min max attribute to also group the columns in the targetSheet + target_sheet.column_dimensions[key].max = copy(source_sheet.column_dimensions[key].max) # https://stackoverflow.com/questions/36417278/openpyxl-can-not-read-consecutive-hidden-columns discussed the issue. Note that this is also the case for the width, not onl;y the hidden property + target_sheet.column_dimensions[key].width = copy(source_sheet.column_dimensions[key].width) # set width for every column + target_sheet.column_dimensions[key].hidden = copy(source_sheet.column_dimensions[key].hidden) + + +def copy_cells(source_sheet, target_sheet): + for r, row in enumerate(source_sheet.iter_rows()): + for c, cell in enumerate(row): + source_cell = cell + if isinstance(source_cell, openpyxl.cell.read_only.EmptyCell): + continue + target_cell = target_sheet.cell(column=c+1, row=r+1) + + target_cell._value = source_cell._value + target_cell.data_type = source_cell.data_type + + if source_cell.has_style: + target_cell.font = copy(source_cell.font) + target_cell.border = copy(source_cell.border) + target_cell.fill = copy(source_cell.fill) + target_cell.number_format = copy(source_cell.number_format) + target_cell.protection = copy(source_cell.protection) + target_cell.alignment = copy(source_cell.alignment) + + if not isinstance(source_cell, openpyxl.cell.ReadOnlyCell) and source_cell.hyperlink: + target_cell._hyperlink = copy(source_cell.hyperlink) + + if not isinstance(source_cell, openpyxl.cell.ReadOnlyCell) and source_cell.comment: + target_cell.comment = copy(source_cell.comment) def setup_lookup(func): """