Various bug fixes and streamlining.

This commit is contained in:
lwark
2024-10-03 15:09:41 -05:00
parent acab9d0f4c
commit c5470b9062
22 changed files with 222 additions and 380 deletions

View File

@@ -27,7 +27,7 @@ from openpyxl.drawing.image import Image as OpenpyxlImage
from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr, Result, Report, \
report_result
from datetime import datetime, date
from typing import List, Any, Tuple, Literal
from typing import List, Any, Tuple, Literal, Generator
from dateutil.parser import parse
from pathlib import Path
from jinja2.exceptions import TemplateNotFound
@@ -592,8 +592,8 @@ class BasicSubmission(BaseClass):
case "ctx" | "csv" | "filepath" | "equipment":
return
case item if item in self.jsons():
match value:
case dict():
match key:
case "custom":
existing = value
case _:
# logger.debug(f"Setting JSON attribute.")
@@ -611,10 +611,7 @@ class BasicSubmission(BaseClass):
existing += value
else:
if value is not None:
if key == "custom":
existing = value
else:
existing.append(value)
existing.append(value)
self.__setattr__(key, existing)
flag_modified(self, key)
return
@@ -889,19 +886,6 @@ class BasicSubmission(BaseClass):
ws.cell(row=item['row'], column=item['column'], value=item['value'])
return input_excel
@classmethod
def custom_docx_writer(cls, input_dict: dict, tpl_obj=None):
"""
Adds custom fields to docx template writer for exported details.
Args:
input_dict (dict): Incoming default dictionary.
tpl_obj (_type_, optional): Template object. Defaults to None.
Returns:
dict: Dictionary with information added.
"""
return input_dict
@classmethod
def enforce_name(cls, instr: str, data: dict | None = {}) -> str:
@@ -962,7 +946,7 @@ class BasicSubmission(BaseClass):
return re.sub(rf"{data['abbreviation']}(\d)", rf"{data['abbreviation']}-\1", outstr)
@classmethod
def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> list:
def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> Generator[dict, None, None]:
"""
Perform parsing of pcr info. Since most of our PC outputs are the same format, this should work for most.
@@ -977,7 +961,7 @@ class BasicSubmission(BaseClass):
pcr_sample_map = cls.get_submission_type().sample_map['pcr_samples']
# logger.debug(f'sample map: {pcr_sample_map}')
main_sheet = xl[pcr_sample_map['main_sheet']]
samples = []
# samples = []
fields = {k: v for k, v in pcr_sample_map.items() if k not in ['main_sheet', 'start_row']}
for row in main_sheet.iter_rows(min_row=pcr_sample_map['start_row']):
idx = row[0].row
@@ -985,8 +969,9 @@ class BasicSubmission(BaseClass):
for k, v in fields.items():
sheet = xl[v['sheet']]
sample[k] = sheet.cell(row=idx, column=v['column']).value
samples.append(sample)
return samples
yield sample
# samples.append(sample)
# return samples
@classmethod
def filename_template(cls) -> str:
@@ -1533,17 +1518,17 @@ class Wastewater(BasicSubmission):
return input_dict
@classmethod
def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> List[dict]:
def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> Generator[dict, None, None]:
"""
Parse specific to wastewater samples.
"""
samples = super().parse_pcr(xl=xl, rsl_plate_num=rsl_plate_num)
samples = [item for item in super().parse_pcr(xl=xl, rsl_plate_num=rsl_plate_num)]
# logger.debug(f'Samples from parent pcr parser: {pformat(samples)}')
output = []
for sample in samples:
# NOTE: remove '-{target}' from controls
sample['sample'] = re.sub('-N\\d$', '', sample['sample'])
# NOTE: if sample is already in output skip
# # NOTE: if sample is already in output skip
if sample['sample'] in [item['sample'] for item in output]:
logger.warning(f"Already have {sample['sample']}")
continue
@@ -1564,8 +1549,10 @@ class Wastewater(BasicSubmission):
del sample['assessment']
except KeyError:
pass
# yield sample
output.append(sample)
return output
for sample in output:
yield sample
@classmethod
def enforce_name(cls, instr: str, data: dict | None = {}) -> str:
@@ -1677,49 +1664,18 @@ class Wastewater(BasicSubmission):
return report
parser = PCRParser(filepath=fname)
self.set_attribute("pcr_info", parser.pcr)
pcr_samples = [sample for sample in parser.samples]
self.save(original=False)
# logger.debug(f"Got {len(parser.samples)} samples to update!")
# logger.debug(f"Parser samples: {parser.samples}")
for sample in self.samples:
# logger.debug(f"Running update on: {sample}")
try:
sample_dict = next(item for item in parser.samples if item['sample'] == sample.rsl_number)
sample_dict = next(item for item in pcr_samples if item['sample'] == sample.rsl_number)
except StopIteration:
continue
self.update_subsampassoc(sample=sample, input_dict=sample_dict)
@classmethod
def custom_docx_writer(cls, input_dict: dict, tpl_obj=None) -> dict:
"""
Adds custom fields to docx template writer for exported details. Extends parent.
Args:
input_dict (dict): Incoming default dictionary.
tpl_obj (_type_, optional): Template object. Defaults to None.
Returns:
dict: Dictionary with information added.
"""
from backend.excel.writer import DocxWriter
input_dict = super().custom_docx_writer(input_dict)
well_24 = []
input_dict['samples'] = [item for item in input_dict['samples']]
samples_copy = deepcopy(input_dict['samples'])
for sample in sorted(samples_copy, key=itemgetter('column', 'row')):
try:
row = sample['source_row']
except KeyError:
continue
try:
column = sample['source_column']
except KeyError:
continue
copy = dict(submitter_id=sample['submitter_id'], row=row, column=column)
well_24.append(copy)
input_dict['origin_plate'] = [item for item in
DocxWriter.create_plate_map(sample_list=well_24, rows=4, columns=6)]
return input_dict
class WastewaterArtic(BasicSubmission):
"""
@@ -2038,11 +1994,17 @@ class WastewaterArtic(BasicSubmission):
"""
input_dict = super().custom_validation(pyd)
# logger.debug(f"Incoming input_dict: {pformat(input_dict)}")
exclude_plates = [None, "", "none", "na"]
pyd.source_plates = [plate for plate in pyd.source_plates if plate['plate'].lower() not in exclude_plates]
for sample in pyd.samples:
# logger.debug(f"Sample: {sample}")
if re.search(r"^NTC", sample.submitter_id):
sample.submitter_id = f"{sample.submitter_id}-WWG-{pyd.rsl_plate_num}"
# input_dict['csv'] = xl["hitpicks_csv_to_export"]
if isinstance(pyd.rsl_plate_num, dict):
placeholder = pyd.rsl_plate_num['value']
else:
placeholder = pyd.rsl_plate_num
sample.submitter_id = f"{sample.submitter_id}-WWG-{placeholder}"
# logger.debug(f"sample id: {sample.submitter_id}")
return input_dict
@classmethod
@@ -2075,6 +2037,7 @@ class WastewaterArtic(BasicSubmission):
for iii, plate in enumerate(info['source_plates']['value']):
# logger.debug(f"Plate: {plate}")
row = start_row + iii
logger.debug(f"Writing {plate} to row {iii}")
try:
worksheet.cell(row=row, column=source_plates_section['plate_column'], value=plate['plate'])
except TypeError:
@@ -2209,30 +2172,6 @@ class WastewaterArtic(BasicSubmission):
zipf.write(img_path, self.gel_image)
self.save()
@classmethod
def custom_docx_writer(cls, input_dict: dict, tpl_obj=None) -> dict:
"""
Adds custom fields to docx template writer for exported details.
Args:
input_dict (dict): Incoming default dictionary/
tpl_obj (_type_, optional): Template object. Defaults to None.
Returns:
dict: Dictionary with information added.
"""
input_dict = super().custom_docx_writer(input_dict)
# NOTE: if there's a gel image, extract it.
if check_key_or_attr(key='gel_image_path', interest=input_dict, check_none=True):
with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped:
img = zipped.read(input_dict['gel_image_path'])
with tempfile.TemporaryFile(mode="wb", suffix=".jpg", delete=False) as tmp:
tmp.write(img)
# logger.debug(f"Tempfile: {tmp.name}")
img = InlineImage(tpl_obj, image_descriptor=tmp.name, width=Inches(5.5)) #, width=5.5)#, height=400)
input_dict['gel_image'] = img
return input_dict
# Sample Classes
@@ -2493,6 +2432,8 @@ class BasicSample(BaseClass):
model = cls.find_polymorphic_subclass(polymorphic_identity=sample_type)
case BasicSample():
model = sample_type
case None:
model = cls
case _:
model = cls.find_polymorphic_subclass(attrs=kwargs)
# logger.debug(f"Length of kwargs: {len(kwargs)}")
@@ -2514,7 +2455,7 @@ class BasicSample(BaseClass):
raise AttributeError(f"Delete not implemented for {self.__class__}")
@classmethod
def get_searchables(cls):
def get_searchables(cls) -> List[dict]:
"""
Delivers a list of fields that can be used in fuzzy search.