New docx templates

This commit is contained in:
lwark
2024-06-20 13:31:36 -05:00
parent 337112a27d
commit 384bd567ae
12 changed files with 175 additions and 80 deletions

View File

@@ -4,6 +4,7 @@ Models for the main submission and sample types.
from __future__ import annotations
import sys
from copy import deepcopy
from getpass import getuser
import logging, uuid, tempfile, re, yaml, base64
from zipfile import ZipFile
@@ -728,6 +729,11 @@ class BasicSubmission(BaseClass):
logger.info(f"Hello from {cls.__mapper_args__['polymorphic_identity']} autofill")
return input_excel
@classmethod
def custom_docx_writer(cls, input_dict):
return input_dict
@classmethod
def enforce_name(cls, instr: str, data: dict | None = {}) -> str:
"""
@@ -1439,7 +1445,7 @@ class Wastewater(BasicSubmission):
dummy_samples = []
for item in input_dict['samples']:
# logger.debug(f"Sample dict: {item}")
thing = item
thing = deepcopy(item)
try:
thing['row'] = thing['source_row']
thing['column'] = thing['source_column']
@@ -1486,6 +1492,28 @@ class Wastewater(BasicSubmission):
self.update_subsampassoc(sample=sample, input_dict=sample_dict)
# self.report.add_result(Result(msg=f"We added PCR info to {sub.rsl_plate_num}.", status='Information'))
@classmethod
def custom_docx_writer(cls, input_dict):
from backend.excel.writer import DocxWriter
input_dict = super().custom_docx_writer(input_dict)
well_24 = []
samples_copy = deepcopy(input_dict['samples'])
for sample in sorted(samples_copy, key=itemgetter('column', 'row')):
# for sample in input_dict['samples']:
try:
row = sample['source_row']
except KeyError:
continue
try:
column = sample['source_column']
except KeyError:
continue
copy = dict(submitter_id=sample['submitter_id'], row=row, column=column)
well_24.append(copy)
input_dict['origin_plate'] = DocxWriter.create_plate_map(sample_list=well_24, rows=4, columns=6)
return input_dict
class WastewaterArtic(BasicSubmission):
"""

View File

@@ -1,10 +1,11 @@
import logging
from copy import copy
from operator import itemgetter
from pathlib import Path
# from pathlib import Path
from pprint import pformat
from typing import List
from collections import OrderedDict
from jinja2 import TemplateNotFound
from openpyxl import load_workbook, Workbook
from backend.db.models import SubmissionType, KitType, BasicSubmission
@@ -13,6 +14,7 @@ from io import BytesIO
from collections import OrderedDict
from tools import jinja_template_loading
from docxtpl import DocxTemplate
from docx import Document
logger = logging.getLogger(f"submissions.{__name__}")
@@ -460,14 +462,65 @@ class TipWriter(object):
class DocxWriter(object):
def __init__(self, base_dict: dict):
self.sub_obj = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=base_dict['submission_type'])
env = jinja_template_loading()
temp_name = f"{base_dict['submission_type'].replace(' ', '').lower()}_document.docx"
path = Path(env.loader.__getattribute__("searchpath")[0]).joinpath(temp_name)
template = DocxTemplate(path)
temp_name = f"{base_dict['submission_type'].replace(' ', '').lower()}_subdocument.docx"
path = Path(env.loader.__getattribute__("searchpath")[0])
main_template = path.joinpath("basicsubmission_document.docx")
subdocument = path.joinpath(temp_name)
if subdocument.exists():
main_template = self.create_merged_template(main_template, subdocument)
self.template = DocxTemplate(main_template)
base_dict['platemap'] = self.create_plate_map(base_dict['samples'], rows=8, columns=12)
# logger.debug(pformat(base_dict['plate_map']))
try:
template.render(base_dict)
except FileNotFoundError:
template = DocxTemplate(
Path(env.loader.__getattribute__("searchpath")[0]).joinpath("basicsubmission_document.docx"))
template.render({"sub": base_dict})
template.save("test.docx")
base_dict['excluded'] += ["platemap"]
except KeyError:
base_dict['excluded'] = ["platemap"]
base_dict = self.sub_obj.custom_docx_writer(base_dict)
# logger.debug(f"Base dict: {pformat(base_dict)}")
self.template.render({"sub": base_dict})
@classmethod
def create_plate_map(self, sample_list: List[dict], rows: int = 0, columns: int = 0) -> List[list]:
sample_list = sorted(sample_list, key=itemgetter('column', 'row'))
if rows == 0:
rows = max([sample['row'] for sample in sample_list])
if columns == 0:
columns = max([sample['column'] for sample in sample_list])
output = []
for row in range(0, rows):
contents = [''] * columns
for column in range(0, columns):
try:
ooi = [item for item in sample_list if item['row']==row+1 and item['column']==column+1][0]
except IndexError:
continue
contents[column] = ooi['submitter_id']
# contents = [sample['submitter_id'] for sample in sample_list if sample['row'] == row + 1]
# contents = [f"{sample['row']},{sample['column']}" for sample in sample_list if sample['row'] == row + 1]
if len(contents) < columns:
contents += [''] * (columns - len(contents))
if not contents:
contents = [''] * columns
output.append(contents)
return output
def create_merged_template(self, *args):
merged_document = Document()
output = BytesIO()
for index, file in enumerate(args):
sub_doc = Document(file)
# Don't add a page break if you've reached the last file.
# if index < len(args) - 1:
# sub_doc.add_page_break()
for element in sub_doc.element.body:
merged_document.element.body.append(element)
merged_document.save(output)
return output
def save(self, filename: Path | str):
if isinstance(filename, str):
filename = Path(filename)
self.template.save(filename)

View File

@@ -327,17 +327,17 @@ class PydEquipment(BaseModel, extra='ignore'):
process = Process.query(name=self.processes[0])
if process is None:
logger.error(f"Found unknown process: {process}.")
from frontend.widgets.pop_ups import QuestionAsker
dlg = QuestionAsker(title="Add Process?",
message=f"Unable to find {self.processes[0]} in the database.\nWould you like to add it?")
if dlg.exec():
kit = submission.extraction_kit
submission_type = submission.submission_type
process = Process(name=self.processes[0])
process.kit_types.append(kit)
process.submission_types.append(submission_type)
process.equipment.append(equipment)
process.save()
# from frontend.widgets.pop_ups import QuestionAsker
# dlg = QuestionAsker(title="Add Process?",
# message=f"Unable to find {self.processes[0]} in the database.\nWould you like to add it?")
# if dlg.exec():
# kit = submission.extraction_kit
# submission_type = submission.submission_type
# process = Process(name=self.processes[0])
# process.kit_types.append(kit)
# process.submission_types.append(submission_type)
# process.equipment.append(equipment)
# process.save()
assoc.process = process
assoc.role = self.role
else:
@@ -727,7 +727,7 @@ class PydSubmission(BaseModel, extra='allow'):
if equip is None:
continue
equip, association = equip.toSQL(submission=instance)
if association is not None:
if association is not None and association not in instance.submission_equipment_associations:
# association.save()
# logger.debug(
# f"Equipment association SQL object to be added to submission: {association.__dict__}")
@@ -738,7 +738,7 @@ class PydSubmission(BaseModel, extra='allow'):
continue
logger.debug(f"Converting tips: {tips} to sql.")
association = tips.to_sql(submission=instance)
if association is not None:
if association is not None and association not in instance.submission_tips_associations:
# association.save()
instance.submission_tips_associations.append(association)
case item if item in instance.jsons():