Writer improvements.

This commit is contained in:
lwark
2025-07-24 11:16:37 -05:00
parent 1463cf9d2d
commit 2c6a8c4cc7
16 changed files with 85 additions and 32 deletions

View File

@@ -644,7 +644,7 @@ class BaseClass(Base):
return output
@classmethod
def clean_details_dict(cls, dictionary):
def clean_details_for_render(cls, dictionary):
output = {}
for k, value in dictionary.items():
match value:
@@ -668,7 +668,6 @@ class BaseClass(Base):
case _:
pass
output[k] = value
return output

View File

@@ -1572,7 +1572,7 @@ class Procedure(BaseClass):
for reagent in output.reagent:
match reagent:
case dict():
reagent['reagentrole'] = next((reagentrole.name for reagentrole in self.kittype.reagentrole if reagentrole == reagent['reagentrole']), None)
# reagent['reagentrole'] = next((reagentrole.name for reagentrole in self.kittype.reagentrole if reagentrole == reagent['reagentrole']), None)
reagents.append(PydReagent(**reagent))
case PydReagent():
reagents.append(reagent)
@@ -2067,10 +2067,12 @@ class ProcedureReagentAssociation(BaseClass):
# NOTE: Figure out how to merge the misc_info if doing .update instead.
relevant = {k: v for k, v in output.items() if k not in ['reagent']}
output = output['reagent'].details_dict()
misc = output['misc_info']
output.update(relevant)
output['reagentrole'] = self.reagentrole
output['misc_info'] = misc
# output['results'] = [result.details_dict() for result in output['results']]
logger.debug(f"Output: {pformat(output)}")
return output
def delete(self, **kwargs):

View File

@@ -438,7 +438,10 @@ class Run(BaseClass, LogMixin):
if self._started_date:
return self._started_date
else:
value = min([proc.started_date for proc in self.procedure])
try:
value = min([proc.started_date for proc in self.procedure])
except ValueError:
value = datetime.now()
return value
@started_date.setter
@@ -2191,6 +2194,7 @@ class ProcedureSampleAssociation(BaseClass):
sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated equipment
row = Column(INTEGER)
column = Column(INTEGER)
plate_rank = Column(INTEGER)
procedure = relationship(Procedure,
back_populates="proceduresampleassociation") #: associated procedure

View File

@@ -78,7 +78,7 @@ class ClientSubmissionInfoParser(DefaultKEYVALUEParser, SubmissionTyperMixin):
default_range_dict = [dict(
start_row=2,
end_row=18,
end_row=16,
key_column=1,
value_column=2,
sheet="Sample List"
@@ -117,8 +117,8 @@ class ClientSubmissionSampleParser(DefaultTABLEParser, SubmissionTyperMixin):
pyd_name = "PydSample"
default_range_dict = [dict(
header_row=19,
end_row=115,
header_row=18,
end_row=114,
sheet="Sample List"
)]

View File

@@ -61,7 +61,7 @@ class PCRSampleParser(DefaultTABLEParser):
yield {sample: multi}
def to_pydantic(self):
logger.debug(f"running to pydantic")
logger.debug("running to pydantic")
for item in self.parsed_info:
# sample_obj = Sample.query(sample_id=list(item.keys())[0])
# NOTE: Ensure that only samples associated with the procedure are used.
@@ -70,12 +70,15 @@ class PCRSampleParser(DefaultTABLEParser):
(sample for sample in self.procedure.sample if sample.sample_id == list(item.keys())[0]))
except StopIteration:
continue
logger.debug(f"Sample object {sample_obj}")
# logger.debug(f"Sample object {sample_obj}")
assoc = ProcedureSampleAssociation.query(sample=sample_obj, procedure=self.procedure)
if assoc and not isinstance(assoc, list):
output = self._pyd_object(results=list(item.values())[0], parent=assoc)
output.result_type = "PCR"
del output.result['result_type']
try:
del output.result['result_type']
except KeyError:
pass
yield output
else:
continue

View File

@@ -128,6 +128,7 @@ class DefaultTABLEWriter(DefaultWriter):
for column in column_names:
setattr(sample, column[0], "")
sample.submission_rank = iii
sample.plate_rank = iii
# logger.debug(f"Appending {sample.sample_id}")
# logger.debug(f"Iterator now: {[item.submission_rank for item in iterator]}")
output_samples.append(sample)
@@ -137,7 +138,7 @@ class DefaultTABLEWriter(DefaultWriter):
workbook = super().write_to_workbook(workbook=workbook)
for rng in self.range_dict:
list_worksheet = workbook[rng['sheet']]
column_names = [(item.value.lower().replace(" ", "_"), item.column) for item in list_worksheet[rng['header_row']] if item.value]
column_names = [(str(item.value).lower().replace(" ", "_"), item.column) for item in list_worksheet[rng['header_row']] if item.value]
for iii, object in enumerate(self.pydant_obj, start=1):
# logger.debug(f"Writing object: {object}")
write_row = rng['header_row'] + iii
@@ -156,6 +157,16 @@ class DefaultTABLEWriter(DefaultWriter):
list_worksheet.cell(row=write_row, column=write_column, value=self.stringify_value(value))
return workbook
@classmethod
def construct_column_names(cls, column_item):
column = column_item.column
match column_item.value:
case str():
value = column_item.value.lower().replace(" ", "_")
case _:
value = column_item.value
return value, column
from .clientsubmission_writer import ClientSubmissionInfoWriter, ClientSubmissionSampleWriter

View File

@@ -47,15 +47,18 @@ class ClientSubmissionSampleWriter(DefaultTABLEWriter):
def write_to_workbook(self, workbook: Workbook) -> Workbook:
workbook = super().write_to_workbook(workbook=workbook)
# logger.debug(f"\n\nHello from {self.__class__.__name__} with range_dict: {pformat(self.range_dict)}")
for rng in self.range_dict:
list_worksheet = workbook[rng['sheet']]
row_count = self.get_row_count(list_worksheet, rng)
column_names = [(item.value.lower().replace(" ", "_"), item.column) for item in list_worksheet[rng['header_row']] if item.value]
column_names = [(str(item.value).lower().replace(" ", "_"), item.column) for item in list_worksheet[rng['header_row']] if item.value]
samples = self.pad_samples_to_length(row_count=row_count, column_names=column_names)
# logger.debug(f"Samples: {pformat(samples)}")
for sample in samples:
# logger.debug(f"Writing sample: {sample}")
write_row = rng['header_row'] + sample.submission_rank
# logger.debug(f"Writing sample: {sample} to row {write_row}")
for column in column_names:
# logger.debug(f"At column {column}")
if column[0].lower() in ["well", "row", "column"]:
continue
write_column = column[1]

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import logging
import sys
from pprint import pformat
from openpyxl.workbook import Workbook
@@ -24,7 +25,7 @@ class ProcedureInfoWriter(DefaultKEYVALUEWriter):
'procedurereagentassociation', 'proceduresampleassociation', 'proceduretipsassociation', 'reagent', 'reagentrole',
'results', 'sample', 'tips']
self.fill_dictionary = {k: v for k, v in self.fill_dictionary.items() if k not in exclude}
logger.debug(pformat(self.fill_dictionary))
# logger.debug(pformat(self.fill_dictionary))
for rng in self.range_dict:
if "sheet" not in rng or rng['sheet'] == "":
rng['sheet'] = f"{pydant_obj.proceduretype.name} Quality"
@@ -79,11 +80,14 @@ class ProcedureSampleWriter(DefaultTABLEWriter):
column_names = [(item.value.lower().replace(" ", "_"), item.column) for item in
list_worksheet[rng['header_row']] if item.value]
samples = self.pad_samples_to_length(row_count=row_count, column_names=column_names)
samples = sorted(samples, key=lambda x: x.plate_rank)
# samples = self.pydant_obj
# logger.debug(f"Samples: {[item.submission_rank for item in samples]}")
for sample in samples:
# logger.debug(f"Writing sample: {sample}")
write_row = rng['header_row'] + sample.submission_rank
if sample.row == 0 or sample.column == 0:
continue
write_row = rng['header_row'] + sample.plate_rank
for column in column_names:
if column[0].lower() in ["well"]:#, "row", "column"]:
continue

View File

@@ -23,7 +23,7 @@ class PCRInfoWriter(DefaultKEYVALUEWriter):
def write_to_workbook(self, workbook: Workbook) -> Workbook:
worksheet = workbook[f"{self.proceduretype.name} Results"]
for key, value in self.fill_dictionary['result'].items():
logger.debug(f"Filling in {key} with {value}")
# logger.debug(f"Filling in {key} with {value}")
worksheet.cell(value['location']['row'], value['location']['key_column'], value=key.replace("_", " ").title())
worksheet.cell(value['location']['row'], value['location']['value_column'], value=value['value'])
return workbook
@@ -41,7 +41,7 @@ class PCRSampleWriter(DefaultTABLEWriter):
columns.append((iii, header))
columns = sorted(columns, key=lambda x: x[0])
columns = proto_columns + columns
logger.debug(columns)
# logger.debug(columns)
all_results = flatten_list([[item for item in self.rearrange_results(result)] for result in self.pydant_obj])
if len(all_results) > 0 :
worksheet.cell(row=header_row, column=1, value="Sample")
@@ -58,6 +58,8 @@ class PCRSampleWriter(DefaultTABLEWriter):
@classmethod
def rearrange_results(cls, result) -> Generator[dict, None, None]:
for target, values in result.result.items():
if not isinstance(values, dict):
continue
values['target'] = target
values['sample'] = result.sample_id
yield values
@@ -66,9 +68,12 @@ class PCRSampleWriter(DefaultTABLEWriter):
def column_headers(self):
output = []
for item in self.pydant_obj:
logger.debug(item)
# logger.debug(item)
dicto: dict = item.result
for value in dicto.values():
if not isinstance(value, dict):
# logger.debug(f"Will not include {value} in column headers.")
continue
for key in value.keys():
output.append(key)
return sorted(list(set(output)))

View File

@@ -24,10 +24,10 @@ class DefaultManager(object):
self.input_object = input_object
self.pyd = self.parse()
case x if issubclass(input_object.__class__, PydBaseClass):
logger.debug("Subclass of PydBaseClass")
# logger.debug("Subclass of PydBaseClass")
self.pyd = input_object
case x if issubclass(input_object.__class__, BaseClass):
logger.debug("Subclass of BaseClass")
# logger.debug("Subclass of BaseClass")
self.pyd = input_object.to_pydantic()
case _:
self.input_object = select_open_file(file_extension="xlsx", obj=get_application_from_parent(parent))

View File

@@ -82,14 +82,14 @@ class DefaultProcedureManager(DefaultManager):
sample_writer = procedure_writers.ProcedureSampleWriter
self.sample_writer = sample_writer(pydant_obj=self.pyd, range_dict=self.proceduretype.sample_map)
workbook = self.sample_writer.write_to_workbook(workbook)
logger.debug(self.pyd.result)
# logger.debug(self.pyd.result)
# TODO: Find way to group results by result_type.
for result in self.pyd.result:
Writer = getattr(results_writers, f"{result.result_type}InfoWriter")
res_info_writer = Writer(pydant_obj=result, proceduretype=self.proceduretype)
workbook = res_info_writer.write_to_workbook(workbook=workbook)
# sample_results = [sample.result for sample in self.pyd.sample]
logger.debug(pformat(self.pyd.sample_results))
# logger.debug(pformat(self.pyd.sample_results))
Writer = getattr(results_writers, "PCRSampleWriter")
res_sample_writer = Writer(pydant_obj=self.pyd.sample_results, proceduretype=self.proceduretype)
workbook = res_sample_writer.write_to_workbook(workbook=workbook)

View File

@@ -20,7 +20,7 @@ class DefaultRunManager(DefaultManager):
clientsubmission = DefaultClientSubmissionManager(parent=self.parent, input_object=self.pyd.clientsubmission, submissiontype=self.pyd.clientsubmission.submissiontype)
workbook = clientsubmission.write()
for procedure in self.pyd.procedure:
logger.debug(f"Running procedure: {pformat(procedure.__dict__)}")
# logger.debug(f"Running procedure: {pformat(procedure.__dict__)}")
procedure = DefaultProcedureManager(proceduretype=procedure.proceduretype, parent=self.parent, input_object=procedure)
wb: Workbook = procedure.write()
for sheetname in wb.sheetnames:

View File

@@ -1449,7 +1449,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
def update_samples(self, sample_list: List[dict]):
logger.debug(f"Incoming sample_list:\n{pformat(sample_list)}")
for sample_dict in sample_list:
for iii, sample_dict in enumerate(sample_list, start=1):
if sample_dict['sample_id'].startswith("blank_"):
sample_dict['sample_id'] = ""
row, column = self.proceduretype.ranked_plate[sample_dict['index']]
@@ -1467,11 +1467,12 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
except StopIteration:
logger.error(f"Couldn't find sample: {pformat(sample_dict)}")
continue
logger.debug(f"Sample of interest: {sample.improved_dict()}")
sample.sample_id = sample_dict['sample_id']
sample.well_id = sample_dict['sample_id']
sample.row = row
sample.column = column
sample.plate_rank = sample_dict['index']
logger.debug(f"Sample of interest: {sample.improved_dict()}")
# logger.debug(f"Updated samples:\n{pformat(self.sample)}")
def update_reagents(self, reagentrole: str, name: str, lot: str, expiry: str):
@@ -1502,6 +1503,14 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
sql = super().to_sql()
logger.debug(f"Initial PYD: {pformat(self.__dict__)}")
# sql.results = [result.to_sql() for result in self.results]
if isinstance(self.name, dict):
sql.name = self.name['value']
else:
sql.name = self.name
if isinstance(self.technician, dict):
sql.technician = self.technician['value']
else:
sql.technician = self.technician
sql.repeat = self.repeat
if sql.repeat:
regex = re.compile(r".*\dR\d$")
@@ -1563,7 +1572,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
logger.debug(f"sample {sample_sql} found in {sql.run.sample}")
if sample_sql not in sql.sample:
proc_assoc = ProcedureSampleAssociation(new_id=assoc_id_range[iii], procedure=sql, sample=sample_sql,
row=sample.row, column=sample.column)
row=sample.row, column=sample.column, plate_rank=sample.plate_rank)
if self.kittype['value'] not in ["NA", None, ""]:
kittype = KitType.query(name=self.kittype['value'], limit=1)
if kittype:
@@ -1675,6 +1684,13 @@ class PydClientSubmission(PydBaseClass):
value['value'] = "NA"
return value
@field_validator("comment", mode="before")
@classmethod
def convert_comment_string(cls, value):
if isinstance(value, str):
value = dict(value=value, missing=True)
return value
def to_form(self, parent: QWidget, samples: List = [], disable: list | None = None):
"""
Converts this instance into a frontend.widgets.submission_widget.SubmissionFormWidget
@@ -1763,6 +1779,8 @@ class PydResults(PydBaseClass, arbitrary_types_allowed=True):
value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
case datetime():
pass
case date():
value = datetime.combine(value, datetime.max.time())
case _:
value = datetime.now()
return value

View File

@@ -15,7 +15,7 @@ from PyQt6.QtGui import QContextMenuEvent, QAction
from PyQt6.QtWebChannel import QWebChannel
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWidgets import QDialog, QGridLayout, QMenu, QDialogButtonBox
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, List
if TYPE_CHECKING:
from backend.db.models import Run, Procedure
@@ -36,9 +36,12 @@ class ProcedureCreation(QDialog):
self.setWindowTitle(f"New {self.proceduretype.name} for {self.run.rsl_plate_number}")
# self.created_procedure = self.proceduretype.construct_dummy_procedure(run=self.run)
self.procedure.update_kittype_reagentroles(kittype=self.procedure.possible_kits[0])
# self.created_procedure.samples = self.run.constuct_sample_dicts_for_proceduretype(proceduretype=self.proceduretype)
# logger.debug(f"Samples to map\n{pformat(self.created_procedure.samples)}")
self.plate_map = self.proceduretype.construct_plate_map(sample_dicts=self.procedure.sample)
self.procedure.update_samples(sample_list=[dict(sample_id=sample.sample_id, index=iii) for iii, sample in
enumerate(self.procedure.sample, start=1)])
# logger.debug(f"Plate map: {self.plate_map}")
# logger.debug(f"Created dummy: {self.created_procedure}")
self.app = get_application_from_parent(parent)
@@ -64,6 +67,7 @@ class ProcedureCreation(QDialog):
self.buttonBox.rejected.connect(self.reject)
self.layout.addWidget(self.buttonBox, 11, 1, 1, 1)
def set_html(self):
from .equipment_usage_2 import EquipmentUsage
logger.debug(f"Edit: {self.edit}")
@@ -151,7 +155,7 @@ class ProcedureCreation(QDialog):
self.set_html()
@pyqtSlot(list)
def rearrange_plate(self, sample_list: list):
def rearrange_plate(self, sample_list: List[dict]):
self.procedure.update_samples(sample_list=sample_list)
@pyqtSlot(str)

View File

@@ -55,7 +55,7 @@ class SubmissionDetails(QDialog):
self.webview.page().setWebChannel(self.channel)
def object_details(self, object):
details = object.clean_details_dict(object.details_dict())
details = object.clean_details_for_render(object.details_dict())
template = object.details_template
template_path = Path(template.environment.loader.__getattribute__("searchpath")[0])
with open(template_path.joinpath("css", "styles.css"), "r") as f:

View File

@@ -49,8 +49,8 @@
{% if procedure['results'] %}
<button type="button"><h3><u>Results:</u></h3></button>
{% for result in procedure['results'] %}
<p>{% for k, v in result['result'].items() %}
<b>{{ key | replace("_", " ") | title | replace("Rsl", "RSL") }}:</b> {{ value }}<br>
<p>{% for key, value in result['result'].items() %}
<b>{{ key | replace("_", " ") | title | replace("Rsl", "RSL") }}:</b> {{ value['value'] }}<br>
{% endfor %}</p>
{% endfor %}
{% endif %}