This commit is contained in:
lwark
2025-07-11 13:25:04 -05:00
parent 2386c7a8ff
commit 02c88256f2
7 changed files with 178 additions and 25 deletions

View File

@@ -596,6 +596,7 @@ class BaseClass(Base):
return output_date
def details_dict(self, **kwargs):
relevant = {k: v for k, v in self.__class__.__dict__.items() if
isinstance(v, InstrumentedAttribute) or isinstance(v, AssociationProxy)}
output = {}
@@ -618,14 +619,16 @@ class BaseClass(Base):
output[k.strip("_")] = value
return output
def to_pydantic(self, **kwargs):
def to_pydantic(self, pyd_model_name:str|None=None, **kwargs):
from backend.validators import pydant
if not pyd_model_name:
pyd_model_name = f"Pyd{self.__class__.__name__}"
logger.debug(f"Looking for pydant model {pyd_model_name}")
try:
pyd = getattr(pydant, pyd_model_name)
except AttributeError:
raise AttributeError(f"Could not get pydantic class {pyd_model_name}")
logger.debug(f"Kwargs: {kwargs}")
return pyd(**self.details_dict(**kwargs))
def show_details(self, obj):

View File

@@ -1212,7 +1212,7 @@ class ProcedureType(BaseClass):
self.save()
def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]:
def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]:
"""
Make a map of all locations for tips or equipment.
@@ -1524,6 +1524,7 @@ class Procedure(BaseClass):
sample['active'] = False
# output['sample'] = [sample.details_dict() for sample in output['runsampleassociation']]
output['sample'] = active_samples + inactive_samples
logger.debug(f"Procedure samples: \n\n{pformat(output['sample'])}\n\n")
# output['sample'] = [sample.details_dict() for sample in output['sample']]
output['reagent'] = [reagent.details_dict() for reagent in output['procedurereagentassociation']]
output['equipment'] = [equipment.details_dict() for equipment in output['procedureequipmentassociation']]
@@ -1538,18 +1539,22 @@ class Procedure(BaseClass):
def to_pydantic(self, **kwargs):
from backend.validators.pydant import PydResults, PydReagent
output = super().to_pydantic()
logger.debug(f"Pydantic output: \n\n{pformat(output.__dict__)}\n\n")
output.kittype = dict(value=output.kittype['name'], missing=False)
output.sample = [item.to_pydantic() for item in output.proceduresampleassociation]
reagents = []
for reagent in output.reagent:
match reagent:
case dict():
reagent['reagentrole'] = next((reagentrole.name for reagentrole in self.kittype.reagentrole if reagentrole in reagent['reagentrole']), None)
reagents.append(PydResults(**reagent))
reagents.append(PydReagent(**reagent))
case PydReagent():
reagents.append(reagent)
case _:
pass
output.reagent = [PydReagent(**item) for item in output.reagent]
# output.reagent = [PydReagent(**item) for item in output.reagent]
output.reagent = reagents
results = []
for result in output.results:
match result:
@@ -2438,6 +2443,9 @@ class ProcedureEquipmentAssociation(BaseClass):
equipment = relationship(Equipment, back_populates="equipmentprocedureassociation") #: associated equipment
tips_id = Column(INTEGER, ForeignKey("_tips.id", ondelete="SET NULL",
name="SEA_Process_id"))
def __repr__(self) -> str:
try:
return f"<ProcedureEquipmentAssociation({self.procedure.name} & {self.equipment.name})>"
@@ -2472,6 +2480,13 @@ class ProcedureEquipmentAssociation(BaseClass):
def process(self):
return Process.query(id=self.process_id)
@property
def tips(self):
try:
return Tips.query(id=self.tips_id, limit=1)
except AttributeError:
return None
def to_sub_dict(self) -> dict:
"""
This RunEquipmentAssociation as a dictionary
@@ -2495,7 +2510,7 @@ class ProcedureEquipmentAssociation(BaseClass):
PydEquipment: pydantic equipment model
"""
from backend.validators import PydEquipment
return PydEquipment(**self.to_sub_dict())
return PydEquipment(**self.details_dict())
@classmethod
@setup_lookup
@@ -2533,6 +2548,10 @@ class ProcedureEquipmentAssociation(BaseClass):
output.update(relevant)
output['misc_info'] = misc
output['process'] = self.process.details_dict()
try:
output['tips'] = self.tips.details_dict()
except AttributeError:
output['tips'] = None
return output

View File

@@ -2197,8 +2197,20 @@ class ProcedureSampleAssociation(BaseClass):
# NOTE: Figure out how to merge the misc_info if doing .update instead.
relevant = {k: v for k, v in output.items() if k not in ['sample']}
output = output['sample'].details_dict()
logger.debug(f"Output: {pformat(output)}")
logger.debug(f"Relevant: {pformat(relevant)}")
# relevant['submission_rank'] = output['misc_info']['submission_rank']
misc = output['misc_info']
output.update(relevant)
output['misc_info'] = misc
output['results'] = [result.details_dict() for result in output['results']]
return output
def to_pydantic(self, **kwargs):
output = super().to_pydantic(pyd_model_name="PydSample")
try:
output.submission_rank = output.misc_info['submission_rank']
except KeyError:
logger.error(output)
return output

View File

@@ -8,11 +8,11 @@ if TYPE_CHECKING:
from backend.db.models import ProcedureType
class DefaultInfoParser(DefaultKEYVALUEParser):
class ProcedureInfoParser(DefaultKEYVALUEParser):
default_range_dict = [dict(
start_row=1,
end_row=14,
end_row=6,
key_column=1,
value_column=2,
sheet=""
@@ -31,7 +31,7 @@ class DefaultInfoParser(DefaultKEYVALUEParser):
self._pyd_object = PydProcedure
class DefaultSampleParser(DefaultTABLEParser):
class ProcedureSampleParser(DefaultTABLEParser):
default_range_dict = [dict(
header_row=41,
@@ -51,7 +51,7 @@ class DefaultSampleParser(DefaultTABLEParser):
self._pyd_object = PydSample
class DefaultReagentParser(DefaultTABLEParser):
class ProcedureReagentParser(DefaultTABLEParser):
default_range_dict = [dict(
header_row=17,
@@ -80,7 +80,7 @@ class DefaultReagentParser(DefaultTABLEParser):
item['reagentrole'] = item['reagent_role']
yield item
class DefaultEquipmentParser(DefaultTABLEParser):
class ProcedureEquipmentParser(DefaultTABLEParser):
default_range_dict = [dict(
header_row=32,

View File

@@ -1,9 +1,15 @@
from __future__ import annotations
import logging
from io import BytesIO
from openpyxl.reader.excel import load_workbook
from openpyxl.workbook import Workbook
from backend.managers import DefaultManager
from typing import TYPE_CHECKING
from pathlib import Path
from backend.excel.parsers import procedure_parsers
from backend.excel.writers import procedure_writers
if TYPE_CHECKING:
from backend.db.models import ProcedureType
@@ -13,6 +19,7 @@ logger = logging.getLogger(f"submissions.{__name__}")
class DefaultProcedureManager(DefaultManager):
def __init__(self, proceduretype: "ProcedureType"|str, parent, input_object: Path | str | None = None):
from backend.db.models import ProcedureType
if isinstance(proceduretype, str):
proceduretype = ProcedureType.query(name=proceduretype)
self.proceduretype = proceduretype
@@ -23,22 +30,22 @@ class DefaultProcedureManager(DefaultManager):
try:
info_parser = getattr(procedure_parsers, f"{self.proceduretype.name.replace(' ', '')}InfoParser")
except AttributeError:
info_parser = procedure_parsers.DefaultInfoParser
info_parser = procedure_parsers.ProcedureInfoParser
self.info_parser = info_parser(filepath=self.fname, proceduretype=self.proceduretype)
try:
reagent_parser = getattr(procedure_parsers, f"{self.proceduretype.name.replace(' ', '')}ReagentParser")
except AttributeError:
reagent_parser = procedure_parsers.DefaultReagentParser
reagent_parser = procedure_parsers.ProcedureReagentParser
self.reagent_parser = reagent_parser(filepath=self.fname, proceduretype=self.proceduretype)
try:
sample_parser = getattr(procedure_parsers, f"{self.proceduretype.name.replace(' ', '')}SampleParser")
except AttributeError:
sample_parser = procedure_parsers.DefaultSampleParser
sample_parser = procedure_parsers.ProcedureSampleParser
self.sample_parser = sample_parser(filepath=self.fname, proceduretype=self.proceduretype)
try:
equipment_parser = getattr(procedure_parsers, f"{self.proceduretype.name.replace(' ', '')}EquipmentParser")
except AttributeError:
equipment_parser = procedure_parsers.DefaultEquipmentParser
equipment_parser = procedure_parsers.ProcedureEquipmentParser
self.equipment_parser = equipment_parser(filepath=self.fname, proceduretype=self.proceduretype)
self.to_pydantic()
@@ -47,3 +54,32 @@ class DefaultProcedureManager(DefaultManager):
self.reagents = self.reagent_parser.to_pydantic()
self.samples = self.sample_parser.to_pydantic()
self.equipment = self.equipment_parser.to_pydantic()
def write(self, worksheet_only: bool=False) -> Workbook:
workbook = load_workbook(BytesIO(self.proceduretype.template_file))
try:
info_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}InfoWriter")
except AttributeError:
info_writer = procedure_writers.ProcedureInfoWriter
self.info_writer = info_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.info_map)
workbook = self.info_writer.write_to_workbook(workbook)
try:
reagent_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}ReagentWriter")
except AttributeError:
reagent_writer = procedure_writers.ProcedureReagentWriter
self.reagent_writer = reagent_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.reagent_map)
workbook = self.reagent_writer.write_to_workbook(workbook)
try:
equipment_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}EquipmentWriter")
except AttributeError:
equipment_writer = procedure_writers.ProcedureEquipmentWriter
self.equipment_writer = equipment_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.equipment_map)
workbook = self.equipment_writer.write_to_workbook(workbook)
try:
sample_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}SampleWriter")
except AttributeError:
sample_writer = procedure_writers.ProcedureSampleWriter
self.sample_writer = sample_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.sample_map)
workbook = self.sample_writer.write_to_workbook(workbook)
return workbook

View File

@@ -363,14 +363,14 @@ class PydTips(PydBaseClass):
return assoc, report
class PydEquipment(PydBaseClass, extra='ignore'):
class PydEquipment(PydBaseClass):
asset_number: str
name: str
nickname: str | None
# process: List[dict] | None
process: PydProcess | None
equipmentrole: str | PydEquipmentRole | None
tips: List[PydTips] | None = Field(default=[])
tips: List[PydTips] | PydTips | None = Field(default=[])
@field_validator('equipmentrole', mode='before')
@classmethod
@@ -407,13 +407,27 @@ class PydEquipment(PydBaseClass, extra='ignore'):
@field_validator('tips', mode='before')
@classmethod
def tips_to_pydantic(cls, value):
match value:
case list():
output = []
for tips in value:
if isinstance(tips, Tips):
match tips:
case Tips():
tips = tips.to_pydantic()
case dict():
tips = PydTips(**tips)
case _:
continue
output.append(tips)
case _:
output = value
return output
@field_validator('tips')
@classmethod
def single_out_tips(cls, value, values):
return value
@report_result
def to_sql(self, procedure: Procedure | str = None, kittype: KitType | str = None) -> Tuple[
Equipment, ProcedureEquipmentAssociation]:
@@ -1402,7 +1416,7 @@ class PydEquipmentRole(BaseModel):
# return instance, report
class PydProcess(BaseModel, extra="allow"):
class PydProcess(PydBaseClass, extra="allow"):
name: str
version: str = Field(default="1")
proceduretype: List[str]
@@ -1744,6 +1758,7 @@ class PydClientSubmission(PydBaseClass):
cost_centre: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
contact: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
submitter_plate_id: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
sample: List[PydSample] | None = Field(default=[])
@field_validator("submitted_date", mode="before")
@classmethod
@@ -1869,7 +1884,7 @@ class PydClientSubmission(PydBaseClass):
output_samples = []
for iii in range(1, row_count + 1):
try:
sample = next((item for item in self.model_extra['samples'] if item.submission_rank == iii))
sample = next((item for item in self.samples if item.submission_rank == iii))
except StopIteration:
sample = PydSample(sample_id="")
for column in column_names:
@@ -1878,6 +1893,11 @@ class PydClientSubmission(PydBaseClass):
output_samples.append(sample)
return sorted(output_samples, key=lambda x: x.submission_rank)
def improved_dict(self, dictionaries: bool = True) -> dict:
output = super().improved_dict(dictionaries=dictionaries)
output['sample'] = self.sample
return output
@property
def filename_template(self):
submissiontype = SubmissionType.query(name=self.submissiontype['value'])

View File

@@ -486,6 +486,69 @@ def convert_well_to_row_column(input_str: str) -> Tuple[int, int]:
return None, None
return row, column
# Copy a sheet with style, format, layout, ect. from one Excel file to another Excel file
# Please add the ..path\\+\\file.. and ..sheet_name.. according to your desire.
import openpyxl
from copy import copy
def copy_xl_sheet(source_sheet, target_sheet):
copy_cells(source_sheet, target_sheet) # copy all the cel values and styles
copy_sheet_attributes(source_sheet, target_sheet)
def copy_sheet_attributes(source_sheet, target_sheet):
if isinstance(source_sheet, openpyxl.worksheet._read_only.ReadOnlyWorksheet):
return
target_sheet.sheet_format = copy(source_sheet.sheet_format)
target_sheet.sheet_properties = copy(source_sheet.sheet_properties)
target_sheet.merged_cells = copy(source_sheet.merged_cells)
target_sheet.page_margins = copy(source_sheet.page_margins)
target_sheet.freeze_panes = copy(source_sheet.freeze_panes)
# set row dimensions
# So you cannot copy the row_dimensions attribute. Does not work (because of meta data in the attribute I think). So we copy every row's row_dimensions. That seems to work.
for rn in range(len(source_sheet.row_dimensions)):
target_sheet.row_dimensions[rn] = copy(source_sheet.row_dimensions[rn])
if source_sheet.sheet_format.defaultColWidth is None:
print('Unable to copy default column wide')
else:
target_sheet.sheet_format.defaultColWidth = copy(source_sheet.sheet_format.defaultColWidth)
# set specific column width and hidden property
# we cannot copy the entire column_dimensions attribute so we copy selected attributes
for key, value in source_sheet.column_dimensions.items():
target_sheet.column_dimensions[key].min = copy(source_sheet.column_dimensions[key].min) # Excel actually groups multiple columns under 1 key. Use the min max attribute to also group the columns in the targetSheet
target_sheet.column_dimensions[key].max = copy(source_sheet.column_dimensions[key].max) # https://stackoverflow.com/questions/36417278/openpyxl-can-not-read-consecutive-hidden-columns discussed the issue. Note that this is also the case for the width, not onl;y the hidden property
target_sheet.column_dimensions[key].width = copy(source_sheet.column_dimensions[key].width) # set width for every column
target_sheet.column_dimensions[key].hidden = copy(source_sheet.column_dimensions[key].hidden)
def copy_cells(source_sheet, target_sheet):
for r, row in enumerate(source_sheet.iter_rows()):
for c, cell in enumerate(row):
source_cell = cell
if isinstance(source_cell, openpyxl.cell.read_only.EmptyCell):
continue
target_cell = target_sheet.cell(column=c+1, row=r+1)
target_cell._value = source_cell._value
target_cell.data_type = source_cell.data_type
if source_cell.has_style:
target_cell.font = copy(source_cell.font)
target_cell.border = copy(source_cell.border)
target_cell.fill = copy(source_cell.fill)
target_cell.number_format = copy(source_cell.number_format)
target_cell.protection = copy(source_cell.protection)
target_cell.alignment = copy(source_cell.alignment)
if not isinstance(source_cell, openpyxl.cell.ReadOnlyCell) and source_cell.hyperlink:
target_cell._hyperlink = copy(source_cell.hyperlink)
if not isinstance(source_cell, openpyxl.cell.ReadOnlyCell) and source_cell.comment:
target_cell.comment = copy(source_cell.comment)
def setup_lookup(func):
"""