diff --git a/TODO.md b/TODO.md index 886c6b3..01b83bb 100644 --- a/TODO.md +++ b/TODO.md @@ -1,4 +1,5 @@ -- [ ] Revamp frontend.widgets.controls_chart to include visualizations? +- [ ] Upgrade to generators when returning lists. +- [x] Revamp frontend.widgets.controls_chart to include visualizations? - [x] Convert Parsers to using openpyxl. - The hardest part of this is going to be the sample parsing. I'm onto using the cell formulas in the plate map to suss out the location in the lookup table, but it could get a little recursive up in here. - [ ] Create a default info return function. diff --git a/src/submissions/backend/db/models/controls.py b/src/submissions/backend/db/models/controls.py index 85fc040..6595ef9 100644 --- a/src/submissions/backend/db/models/controls.py +++ b/src/submissions/backend/db/models/controls.py @@ -84,7 +84,7 @@ class ControlType(BaseClass): Returns: List[ControlType]: Control types that have targets """ - return [item for item in cls.query() if item.targets]# != []] + return [item for item in cls.query() if item.targets] @classmethod def build_positive_regex(cls) -> Pattern: @@ -141,7 +141,9 @@ class Control(BaseClass): # logger.debug("calculating kraken count total to use in percentage") kraken_cnt_total = sum([kraken[item]['kraken_count'] for item in kraken]) # logger.debug("Creating new kraken.") - new_kraken = [dict(name=item, kraken_count=kraken[item]['kraken_count'], kraken_percent="{0:.0%}".format(kraken[item]['kraken_count'] / kraken_cnt_total)) for item in kraken] + new_kraken = [dict(name=item, kraken_count=kraken[item]['kraken_count'], + kraken_percent="{0:.0%}".format(kraken[item]['kraken_count'] / kraken_cnt_total)) for item in + kraken] new_kraken = sorted(new_kraken, key=itemgetter('kraken_count'), reverse=True) # logger.debug("setting targets") if not self.controltype.targets: diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index f14efcc..ff58619 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -8,7 +8,7 @@ from sqlalchemy.ext.associationproxy import association_proxy from datetime import date import logging, re from tools import check_authorization, setup_lookup, Report, Result -from typing import List, Literal +from typing import List, Literal, Generator from pandas import ExcelFile from pathlib import Path from . import Base, BaseClass, Organization @@ -168,9 +168,9 @@ class KitType(BaseClass): return [item.reagent_role for item in relevant_associations] # TODO: Move to BasicSubmission? - def construct_xl_map_for_use(self, submission_type: str | SubmissionType) -> dict: + def construct_xl_map_for_use(self, submission_type: str | SubmissionType) -> Generator[str, str]: """ - Creates map of locations in excel workbook for a SubmissionType + Creates map of locations in Excel workbook for a SubmissionType Args: submission_type (str | SubmissionType): Submissiontype.name @@ -178,7 +178,7 @@ class KitType(BaseClass): Returns: dict: Dictionary containing information locations. """ - info_map = {} + # info_map = {} # NOTE: Account for submission_type variable type. match submission_type: case str(): @@ -193,10 +193,10 @@ class KitType(BaseClass): # logger.debug("Get all KitTypeReagentTypeAssociation for SubmissionType") for assoc in assocs: try: - info_map[assoc.reagent_role.name] = assoc.uses + yield assoc.reagent_role.name, assoc.uses except TypeError: continue - return info_map + # return info_map @classmethod @setup_lookup @@ -409,6 +409,7 @@ class Reagent(BaseClass): rtype = reagent_role.name.replace("_", " ") except AttributeError: rtype = "Unknown" + # logger.debug(f"Role for {self.name}: {rtype}") # NOTE: Calculate expiry with EOL from ReagentType try: place_holder = self.expiry + reagent_role.eol_ext @@ -611,7 +612,8 @@ class SubmissionType(BaseClass): ) #: Association of equipmentroles equipment = association_proxy("submissiontype_equipmentrole_associations", "equipment_role", - creator=lambda eq: SubmissionTypeEquipmentRoleAssociation(equipment_role=eq)) #: Proxy of equipmentrole associations + creator=lambda eq: SubmissionTypeEquipmentRoleAssociation( + equipment_role=eq)) #: Proxy of equipmentrole associations submissiontype_kit_rt_associations = relationship( "KitTypeReagentRoleAssociation", @@ -665,7 +667,7 @@ class SubmissionType(BaseClass): def construct_info_map(self, mode: Literal['read', 'write']) -> dict: """ - Make of map of where all fields are located in excel sheet + Make of map of where all fields are located in Excel sheet Args: mode (Literal["read", "write"]): Which mode to get locations for @@ -673,15 +675,16 @@ class SubmissionType(BaseClass): Returns: dict: Map of locations """ - info = {k:v for k,v in self.info_map.items() if k != "custom"} + info = {k: v for k, v in self.info_map.items() if k != "custom"} logger.debug(f"Info map: {info}") - output = {} match mode: case "read": output = {k: v[mode] for k, v in info.items() if v[mode]} case "write": output = {k: v[mode] + v['read'] for k, v in info.items() if v[mode] or v['read']} output = {k: v for k, v in output.items() if all([isinstance(item, dict) for item in v])} + case _: + output = {} output['custom'] = self.info_map['custom'] return output @@ -694,36 +697,38 @@ class SubmissionType(BaseClass): """ return self.sample_map - def construct_equipment_map(self) -> dict: + def construct_equipment_map(self) -> Generator[str, dict]: """ Constructs map of equipment to excel cells. Returns: dict: Map equipment locations in excel sheet """ - output = {} + # output = {} # logger.debug("Iterating through equipment roles") for item in self.submissiontype_equipmentrole_associations: emap = item.uses if emap is None: emap = {} - output[item.equipment_role.name] = emap - return output + # output[item.equipment_role.name] = emap + yield item.equipment_role.name, emap + # return output - def construct_tips_map(self) -> dict: + def construct_tips_map(self) -> Generator[str, dict]: """ Constructs map of tips to excel cells. Returns: dict: Tip locations in the excel sheet. """ - output = {} + # output = {} for item in self.submissiontype_tiprole_associations: tmap = item.uses if tmap is None: tmap = {} - output[item.tip_role.name] = tmap - return output + # output[item.tip_role.name] = tmap + yield item.tip_role.name, tmap + # return output def get_equipment(self, extraction_kit: str | KitType | None = None) -> List['PydEquipmentRole']: """ @@ -1280,15 +1285,16 @@ class EquipmentRole(BaseClass): Returns: dict: This EquipmentRole dict """ - output = {} - for key, value in self.__dict__.items(): - match key: - case "processes": - pass - case _: - value = value - output[key] = value - return output + # output = {} + return {key: value for key, value in self.__dict__.items() if key != "processes"} + # match key: + # case "processes": + # pass + # case _: + # value = value + # yield key, value + # # output[key] = value + # return output def to_pydantic(self, submission_type: SubmissionType, extraction_kit: str | KitType | None = None) -> "PydEquipmentRole": @@ -1668,7 +1674,6 @@ class SubmissionTipsAssociation(BaseClass): back_populates="tips_submission_associations") #: associated equipment role_name = Column(String(32), primary_key=True) #, ForeignKey("_tiprole.name")) - def to_sub_dict(self) -> dict: """ This item as a dictionary diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index 86031b6..31577e0 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -25,7 +25,7 @@ from openpyxl.worksheet.worksheet import Worksheet from openpyxl.drawing.image import Image as OpenpyxlImage from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr, Result, Report from datetime import datetime, date -from typing import List, Any, Tuple, Literal +from typing import List, Any, Tuple, Literal, Generator from dateutil.parser import parse from pathlib import Path from jinja2.exceptions import TemplateNotFound @@ -289,7 +289,7 @@ class BasicSubmission(BaseClass): try: reagents = [item.to_sub_dict(extraction_kit=self.extraction_kit) for item in self.submission_reagent_associations] - for k in self.extraction_kit.construct_xl_map_for_use(self.submission_type): + for k, v in self.extraction_kit.construct_xl_map_for_use(self.submission_type): if k == 'info': continue if not any([item['role'] == k for item in reagents]): @@ -841,6 +841,7 @@ class BasicSubmission(BaseClass): for k, v in fields.items(): sheet = xl[v['sheet']] sample[k] = sheet.cell(row=idx, column=v['column']).value + # yield sample samples.append(sample) return samples @@ -1381,7 +1382,7 @@ class Wastewater(BasicSubmission): return input_dict @classmethod - def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> list: + def parse_pcr(cls, xl: Workbook, rsl_plate_num: str) -> List[dict]: """ Parse specific to wastewater samples. """ @@ -1393,6 +1394,7 @@ class Wastewater(BasicSubmission): sample['sample'] = re.sub('-N\\d$', '', sample['sample']) # NOTE: if sample is already in output skip if sample['sample'] in [item['sample'] for item in output]: + logger.warning(f"Already have {sample['sample']}") continue # NOTE: Set ct values sample[f"ct_{sample['target'].lower()}"] = sample['ct'] if isinstance(sample['ct'], float) else 0.0 diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index 643bdd4..70ed27f 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -84,8 +84,9 @@ class SheetParser(object): if extraction_kit is None: extraction_kit = self.sub['extraction_kit'] # logger.debug(f"Parsing reagents for {extraction_kit}") - self.sub['reagents'] = ReagentParser(xl=self.xl, submission_type=self.submission_type, - extraction_kit=extraction_kit).parse_reagents() + parser = ReagentParser(xl=self.xl, submission_type=self.submission_type, + extraction_kit=extraction_kit) + self.sub['reagents'] = [item for item in parser.parse_reagents()] def parse_samples(self): """ @@ -303,21 +304,21 @@ class ReagentParser(object): if isinstance(submission_type, dict): submission_type = submission_type['value'] - reagent_map = self.kit_object.construct_xl_map_for_use(submission_type) + reagent_map = {k: v for k, v in self.kit_object.construct_xl_map_for_use(submission_type)} try: del reagent_map['info'] except KeyError: pass return reagent_map - def parse_reagents(self) -> List[dict]: + def parse_reagents(self) -> Generator[dict, None, None]: """ - Extracts reagent information from the excel form. + Extracts reagent information from the Excel form. Returns: List[PydReagent]: List of parsed reagents. """ - listo = [] + # listo = [] for sheet in self.xl.sheetnames: ws = self.xl[sheet] relevant = {k.strip(): v for k, v in self.map.items() if sheet in self.map[k]['sheet']} @@ -337,9 +338,8 @@ class ReagentParser(object): else: comment = "" except (KeyError, IndexError): - listo.append( - dict(role=item.strip(), lot=None, expiry=None, name=None, comment="", missing=True)) - continue + yield dict(role=item.strip(), lot=None, expiry=None, name=None, comment="", missing=True) + # continue # NOTE: If the cell is blank tell the PydReagent if check_not_nan(lot): missing = False @@ -355,9 +355,9 @@ class ReagentParser(object): logger.warning(f"name is not a string.") check = True if check: - listo.append(dict(role=item.strip(), lot=lot, expiry=expiry, name=name, comment=comment, - missing=missing)) - return listo + yield dict(role=item.strip(), lot=lot, expiry=expiry, name=name, comment=comment, + missing=missing) + # return listo class SampleParser(object): @@ -556,14 +556,14 @@ class EquipmentParser(object): self.xl = xl self.map = self.fetch_equipment_map() - def fetch_equipment_map(self) -> List[dict]: + def fetch_equipment_map(self) -> dict: """ Gets the map of equipment locations in the submission type's spreadsheet Returns: List[dict]: List of locations """ - return self.submission_type.construct_equipment_map() + return {k: v for k, v in self.submission_type.construct_equipment_map()} def get_asset_number(self, input: str) -> str: """ @@ -642,14 +642,14 @@ class TipParser(object): self.xl = xl self.map = self.fetch_tip_map() - def fetch_tip_map(self) -> List[dict]: + def fetch_tip_map(self) -> dict: """ Gets the map of equipment locations in the submission type's spreadsheet Returns: List[dict]: List of locations """ - return self.submission_type.construct_tips_map() + return {k:v for k,v in self.submission_type.construct_tips_map()} def parse_tips(self) -> List[dict]: """ diff --git a/src/submissions/backend/excel/reports.py b/src/submissions/backend/excel/reports.py index f996164..b5dd3d1 100644 --- a/src/submissions/backend/excel/reports.py +++ b/src/submissions/backend/excel/reports.py @@ -20,22 +20,20 @@ env = jinja_template_loading() class ReportMaker(object): def __init__(self, start_date: date, end_date: date): - subs = BasicSubmission.query(start_date=start_date, end_date=end_date) - records = [item.to_dict(report=True) for item in subs] - self.detailed_df, self.summary_df = self.make_report_xlsx(records=records) - self.html = self.make_report_html(df=self.summary_df, start_date=start_date, end_date=end_date) + self.start_date = start_date + self.end_date = end_date + self.subs = BasicSubmission.query(start_date=start_date, end_date=end_date) + self.detailed_df, self.summary_df = self.make_report_xlsx() + self.html = self.make_report_html(df=self.summary_df) - def make_report_xlsx(self, records: list[dict]) -> Tuple[DataFrame, DataFrame]: + def make_report_xlsx(self) -> Tuple[DataFrame, DataFrame]: """ create the dataframe for a report - Args: - records (list[dict]): list of dictionaries created from submissions - Returns: DataFrame: output dataframe """ - df = DataFrame.from_records(records) + df = DataFrame.from_records([item.to_dict(report=True) for item in self.subs]) # NOTE: put submissions with the same lab together df = df.sort_values("submitting_lab") # NOTE: aggregate cost and sample count columns @@ -47,7 +45,7 @@ class ReportMaker(object): df = df.sort_values(['submitting_lab', "submitted_date"]) return df, df2 - def make_report_html(self, df: DataFrame, start_date: date, end_date: date) -> str: + def make_report_html(self, df: DataFrame) -> str: """ generates html from the report dataframe @@ -84,7 +82,7 @@ class ReportMaker(object): output.append(adder) old_lab = lab # logger.debug(output) - dicto = {'start_date': start_date, 'end_date': end_date, 'labs': output} + dicto = {'start_date': self.start_date, 'end_date': self.end_date, 'labs': output} temp = env.get_template('summary_report.html') html = temp.render(input=dicto) return html diff --git a/src/submissions/backend/excel/writer.py b/src/submissions/backend/excel/writer.py index 31022d2..727a759 100644 --- a/src/submissions/backend/excel/writer.py +++ b/src/submissions/backend/excel/writer.py @@ -1,13 +1,13 @@ -''' -contains writer objects for pushing values to submission sheet templates. -''' +""" +contains writer objects for pushing values to submission sheet templates. +""" import logging from copy import copy from operator import itemgetter from pathlib import Path # from pathlib import Path from pprint import pformat -from typing import List +from typing import List, Generator from openpyxl import load_workbook, Workbook from backend.db.models import SubmissionType, KitType, BasicSubmission from backend.validators.pydant import PydSubmission @@ -30,7 +30,7 @@ class SheetWriter(object): Args: submission (PydSubmission): Object containing submission information. missing_only (bool, optional): Whether to only fill in missing values. Defaults to False. - """ + """ self.sub = OrderedDict(submission.improved_dict()) for k, v in self.sub.items(): match k: @@ -47,7 +47,6 @@ class SheetWriter(object): else: self.sub[k] = v # logger.debug(f"\n\nWriting to {submission.filepath.__str__()}\n\n") - if self.filepath.stem.startswith("tmp"): template = self.submission_type.template_file workbook = load_workbook(BytesIO(template)) @@ -124,7 +123,7 @@ class InfoWriter(object): submission_type (SubmissionType | str): Type of submission expected (Wastewater, Bacterial Culture, etc.) info_dict (dict): Dictionary of information to write. sub_object (BasicSubmission | None, optional): Submission object containing methods. Defaults to None. - """ + """ logger.debug(f"Info_dict coming into InfoWriter: {pformat(info_dict)}") if isinstance(submission_type, str): submission_type = SubmissionType.query(name=submission_type) @@ -148,7 +147,7 @@ class InfoWriter(object): Returns: dict: merged dictionary """ - output = {} + # output = {} for k, v in info_dict.items(): if v is None: continue @@ -162,9 +161,10 @@ class InfoWriter(object): pass dicto['value'] = v if len(dicto) > 0: - output[k] = dicto + # output[k] = dicto + yield k, dicto # logger.debug(f"Reconciled info: {pformat(output)}") - return output + # return output def write_info(self) -> Workbook: """ @@ -173,7 +173,7 @@ class InfoWriter(object): Returns: Workbook: workbook with info written. """ - for k, v in self.info.items(): + for k, v in self.info: # NOTE: merge all comments to fit in single cell. if k == "comment" and isinstance(v['value'], list): json_join = [item['text'] for item in v['value'] if 'text' in item.keys()] @@ -203,16 +203,17 @@ class ReagentWriter(object): submission_type (SubmissionType | str): Type of submission expected (Wastewater, Bacterial Culture, etc.) extraction_kit (KitType | str): Extraction kit used. reagent_list (list): List of reagent dicts to be written to excel. - """ + """ self.xl = xl if isinstance(submission_type, str): submission_type = SubmissionType.query(name=submission_type) if isinstance(extraction_kit, str): kit_type = KitType.query(name=extraction_kit) - reagent_map = kit_type.construct_xl_map_for_use(submission_type) + reagent_map = {k: v for k, v in kit_type.construct_xl_map_for_use(submission_type)} + # self.reagents = {k: v for k, v in self.reconcile_map(reagent_list=reagent_list, reagent_map=reagent_map)} self.reagents = self.reconcile_map(reagent_list=reagent_list, reagent_map=reagent_map) - def reconcile_map(self, reagent_list: List[dict], reagent_map: dict) -> List[dict]: + def reconcile_map(self, reagent_list: List[dict], reagent_map: dict) -> Generator[dict, None, None]: """ Merge reagents with their locations @@ -223,7 +224,7 @@ class ReagentWriter(object): Returns: List[dict]: merged dictionary """ - output = [] + # output = [] for reagent in reagent_list: try: mp_info = reagent_map[reagent['role']] @@ -238,8 +239,9 @@ class ReagentWriter(object): dicto = v placeholder[k] = dicto placeholder['sheet'] = mp_info['sheet'] - output.append(placeholder) - return output + # output.append(placeholder) + yield placeholder + # return output def write_reagents(self) -> Workbook: """ @@ -263,21 +265,24 @@ class SampleWriter(object): """ object to write sample data into excel file """ + def __init__(self, xl: Workbook, submission_type: SubmissionType | str, sample_list: list): """ Args: xl (Workbook): Openpyxl workbook from submitted excel file. submission_type (SubmissionType | str): Type of submission expected (Wastewater, Bacterial Culture, etc.) sample_list (list): List of sample dictionaries to be written to excel file. - """ + """ if isinstance(submission_type, str): submission_type = SubmissionType.query(name=submission_type) self.submission_type = submission_type self.xl = xl self.sample_map = submission_type.construct_sample_map()['lookup_table'] - self.samples = self.reconcile_map(sample_list) + # self.samples = self.reconcile_map(sample_list) + samples = [item for item in self.reconcile_map(sample_list)] + self.samples = sorted(samples, key=lambda k: k['submission_rank']) - def reconcile_map(self, sample_list: list) -> List[dict]: + def reconcile_map(self, sample_list: list) -> Generator[dict, None, None]: """ Merge sample info with locations @@ -287,7 +292,7 @@ class SampleWriter(object): Returns: List[dict]: List of merged dictionaries """ - output = [] + # output = [] multiples = ['row', 'column', 'assoc_id', 'submission_rank'] for sample in sample_list: # logger.debug(f"Writing sample: {sample}") @@ -297,8 +302,8 @@ class SampleWriter(object): if k in multiples: continue new[k] = v - output.append(new) - return sorted(output, key=lambda k: k['submission_rank']) + yield new + # return sorted(output, key=lambda k: k['submission_rank']) def write_samples(self) -> Workbook: """ @@ -331,15 +336,15 @@ class EquipmentWriter(object): xl (Workbook): Openpyxl workbook from submitted excel file. submission_type (SubmissionType | str): Type of submission expected (Wastewater, Bacterial Culture, etc.) equipment_list (list): List of equipment dictionaries to write to excel file. - """ + """ if isinstance(submission_type, str): submission_type = SubmissionType.query(name=submission_type) self.submission_type = submission_type self.xl = xl - equipment_map = self.submission_type.construct_equipment_map() + equipment_map = {k: v for k, v in self.submission_type.construct_equipment_map()} self.equipment = self.reconcile_map(equipment_list=equipment_list, equipment_map=equipment_map) - def reconcile_map(self, equipment_list: list, equipment_map: dict) -> List[dict]: + def reconcile_map(self, equipment_list: list, equipment_map: dict) -> Generator[dict, None, None]: """ Merges equipment with location data @@ -350,9 +355,9 @@ class EquipmentWriter(object): Returns: List[dict]: List of merged dictionaries """ - output = [] + # output = [] if equipment_list is None: - return output + return for ii, equipment in enumerate(equipment_list, start=1): mp_info = equipment_map[equipment['role']] # logger.debug(f"{equipment['role']} map: {mp_info}") @@ -376,8 +381,9 @@ class EquipmentWriter(object): except KeyError: placeholder['sheet'] = "Equipment" # logger.debug(f"Final output of {equipment['role']} : {placeholder}") - output.append(placeholder) - return output + yield placeholder + # output.append(placeholder) + # return output def write_equipment(self) -> Workbook: """ @@ -419,15 +425,15 @@ class TipWriter(object): xl (Workbook): Openpyxl workbook from submitted excel file. submission_type (SubmissionType | str): Type of submission expected (Wastewater, Bacterial Culture, etc.) tips_list (list): List of tip dictionaries to write to the excel file. - """ + """ if isinstance(submission_type, str): submission_type = SubmissionType.query(name=submission_type) self.submission_type = submission_type self.xl = xl - tips_map = self.submission_type.construct_tips_map() + tips_map = {k: v for k, v in self.submission_type.construct_tips_map()} self.tips = self.reconcile_map(tips_list=tips_list, tips_map=tips_map) - def reconcile_map(self, tips_list: List[dict], tips_map: dict) -> List[dict]: + def reconcile_map(self, tips_list: List[dict], tips_map: dict) -> Generator[dict, None, None]: """ Merges tips with location data @@ -438,9 +444,9 @@ class TipWriter(object): Returns: List[dict]: List of merged dictionaries """ - output = [] + # output = [] if tips_list is None: - return output + return for ii, tips in enumerate(tips_list, start=1): mp_info = tips_map[tips['role']] # logger.debug(f"{tips['role']} map: {mp_info}") @@ -462,8 +468,9 @@ class TipWriter(object): except KeyError: placeholder['sheet'] = "Tips" # logger.debug(f"Final output of {tips['role']} : {placeholder}") - output.append(placeholder) - return output + yield placeholder + # output.append(placeholder) + # return output def write_tips(self) -> Workbook: """ @@ -497,13 +504,13 @@ class TipWriter(object): class DocxWriter(object): """ Object to render - """ + """ def __init__(self, base_dict: dict): """ Args: base_dict (dict): dictionary of info to be written to template. - """ + """ self.sub_obj = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=base_dict['submission_type']) env = jinja_template_loading() temp_name = f"{base_dict['submission_type'].replace(' ', '').lower()}_subdocument.docx" @@ -530,12 +537,12 @@ class DocxWriter(object): rows = max([sample['row'] for sample in sample_list]) if columns == 0: columns = max([sample['column'] for sample in sample_list]) - output = [] + # output = [] for row in range(0, rows): contents = [''] * columns for column in range(0, columns): try: - ooi = [item for item in sample_list if item['row']==row+1 and item['column']==column+1][0] + ooi = [item for item in sample_list if item['row'] == row + 1 and item['column'] == column + 1][0] except IndexError: continue contents[column] = ooi['submitter_id'] @@ -545,8 +552,9 @@ class DocxWriter(object): contents += [''] * (columns - len(contents)) if not contents: contents = [''] * columns - output.append(contents) - return output + yield contents + # output.append(contents) + # return output def create_merged_template(self, *args) -> BytesIO: """ @@ -554,7 +562,7 @@ class DocxWriter(object): Returns: BytesIO: Merged docx template - """ + """ merged_document = Document() output = BytesIO() for index, file in enumerate(args): @@ -567,7 +575,6 @@ class DocxWriter(object): merged_document.save(output) return output - def save(self, filename: Path | str): if isinstance(filename, str): filename = Path(filename) diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py index 9fb04af..e400089 100644 --- a/src/submissions/backend/validators/pydant.py +++ b/src/submissions/backend/validators/pydant.py @@ -851,6 +851,7 @@ class PydSubmission(BaseModel, extra='allow'): # logger.debug(f"Template rendered as: {render}") return render + @report_result def check_kit_integrity(self, extraction_kit: str | dict | None = None) -> Tuple[List[PydReagent], Report]: """ Ensures all reagents expected in kit are listed in Submission @@ -873,7 +874,7 @@ class PydSubmission(BaseModel, extra='allow'): ext_kit.get_reagents(required=True, submission_type=self.submission_type['value'])] # logger.debug(f"Kit reagents: {ext_kit_rtypes}") # logger.debug(f"Submission reagents: {self.reagents}") - # Exclude any reagenttype found in this pyd not expected in kit. + # NOTE: Exclude any reagenttype found in this pyd not expected in kit. expected_check = [item.role for item in ext_kit_rtypes] output_reagents = [rt for rt in self.reagents if rt.role in expected_check] # logger.debug(f"Already have these reagent types: {output_reagents}") @@ -882,7 +883,7 @@ class PydSubmission(BaseModel, extra='allow'): missing_reagents += [rt for rt in output_reagents if rt.missing] output_reagents += [rt for rt in missing_reagents if rt not in output_reagents] # logger.debug(f"Missing reagents types: {missing_reagents}") - # if lists are equal return no problem + # NOTE: if lists are equal return no problem if len(missing_reagents) == 0: result = None else: diff --git a/src/submissions/frontend/visualizations/control_charts.py b/src/submissions/frontend/visualizations/control_charts.py index b591c5b..087c900 100644 --- a/src/submissions/frontend/visualizations/control_charts.py +++ b/src/submissions/frontend/visualizations/control_charts.py @@ -1,16 +1,12 @@ """ Functions for constructing controls graphs using plotly. -TODO: Move these functions to widgets.controls_charts """ -import re import plotly import plotly.express as px import pandas as pd -from pandas import DataFrame from plotly.graph_objects import Figure import logging -# from backend.excel import get_unique_values_in_df_column -from tools import Settings, get_unique_values_in_df_column, divide_chunks +from tools import get_unique_values_in_df_column, divide_chunks from frontend.widgets.functions import select_save_file logger = logging.getLogger(f"submissions.{__name__}") @@ -18,232 +14,164 @@ logger = logging.getLogger(f"submissions.{__name__}") class CustomFigure(Figure): - def __init__(self, ctx: Settings, df: pd.DataFrame, ytitle: str | None = None): + def __init__(self, df: pd.DataFrame, modes: list, ytitle: str | None = None): super().__init__() + self.construct_chart(df=df, modes=modes) + self.generic_figure_markers(modes=modes, ytitle=ytitle) + def construct_chart(self, df: pd.DataFrame, modes: list): + """ + Creates a plotly chart for controls from a pandas dataframe -# NOTE: Start here. -def create_charts(ctx: Settings, df: pd.DataFrame, ytitle: str | None = None) -> Figure: - """ - Constructs figures based on parsed pandas dataframe. + Args: + df (pd.DataFrame): input dataframe of controls + modes (list): analysis modes to construct charts for + ytitle (str | None, optional): title on the y-axis. Defaults to None. - Args: - ctx (Settings): settings passed down from gui - df (pd.DataFrame): input dataframe - ytitle (str | None, optional): title for the y-axis. Defaults to None. - - Returns: - Figure: Plotly figure - """ - # from backend.excel import drop_reruns_from_df - # converts starred genera to normal and splits off list of starred - genera = [] - if df.empty: - return None - for item in df['genus'].to_list(): - try: - if item[-1] == "*": - genera.append(item[-1]) - else: - genera.append("") - except IndexError: - genera.append("") - df['genus'] = df['genus'].replace({'\*': ''}, regex=True).replace({"NaN": "Unknown"}) - df['genera'] = genera - # NOTE: remove original runs, using reruns if applicable - df = drop_reruns_from_df(ctx=ctx, df=df) - # NOTE: sort by and exclude from - sorts = ['submitted_date', "target", "genus"] - exclude = ['name', 'genera'] - modes = [item for item in df.columns if item not in sorts and item not in exclude] # and "_hashes" not in item] - # NOTE: Set descending for any columns that have "{mode}" in the header. - ascending = [False if item == "target" else True for item in sorts] - df = df.sort_values(by=sorts, ascending=ascending) - # logger.debug(df[df.isna().any(axis=1)]) - # NOTE: actual chart construction is done by - fig = construct_chart(df=df, modes=modes, ytitle=ytitle) - return fig - - -def drop_reruns_from_df(ctx: Settings, df: DataFrame) -> DataFrame: - """ - Removes semi-duplicates from dataframe after finding sequencing repeats. - - Args: - settings (dict): settings passed from gui - df (DataFrame): initial dataframe - - Returns: - DataFrame: dataframe with originals removed in favour of repeats. - """ - if 'rerun_regex' in ctx: - sample_names = get_unique_values_in_df_column(df, column_name="name") - rerun_regex = re.compile(fr"{ctx.rerun_regex}") - for sample in sample_names: - if rerun_regex.search(sample): - first_run = re.sub(rerun_regex, "", sample) - df = df.drop(df[df.name == first_run].index) - return df - - -def generic_figure_markers(fig: Figure, modes: list = [], ytitle: str | None = None) -> Figure: - """ - Adds standard layout to figure. - - Args: - fig (Figure): Input figure. - modes (list, optional): List of modes included in figure. Defaults to []. - ytitle (str, optional): Title for the y-axis. Defaults to None. - - Returns: - Figure: Output figure with updated titles, rangeslider, buttons. - """ - if modes != []: - ytitle = modes[0] - # Creating visibles list for each mode. - fig.update_layout( - xaxis_title="Submitted Date (* - Date parsed from fastq file creation date)", - yaxis_title=ytitle, - showlegend=True, - barmode='stack', - updatemenus=[ - dict( - type="buttons", - direction="right", - x=0.7, - y=1.2, - showactive=True, - buttons=make_buttons(modes=modes, fig_len=len(fig.data)), - ) - ] - ) - fig.update_xaxes( - rangeslider_visible=True, - rangeselector=dict( - buttons=list([ - dict(count=1, label="1m", step="month", stepmode="backward"), - dict(count=3, label="3m", step="month", stepmode="backward"), - dict(count=6, label="6m", step="month", stepmode="backward"), - dict(count=1, label="YTD", step="year", stepmode="todate"), - dict(count=1, label="1y", step="year", stepmode="backward"), - dict(step="all") - ]) - ) - ) - assert type(fig) == Figure - return fig - - -def make_buttons(modes: list, fig_len: int) -> list: - """ - Creates list of buttons with one for each mode to be used in showing/hiding mode traces. - - Args: - modes (list): list of modes used by main parser. - fig_len (int): number of traces in the figure - - Returns: - list: list of buttons. - """ - buttons = [] - if len(modes) > 1: + Returns: + Figure: output stacked bar chart. + """ + # fig = Figure() for ii, mode in enumerate(modes): - # What I need to do is create a list of bools with the same length as the fig.data - mode_vis = [True] * fig_len - # And break it into {len(modes)} chunks - mode_vis = list(divide_chunks(mode_vis, len(modes))) - # Then, for each chunk, if the chunk index isn't equal to the index of the current mode, set to false - for jj, sublist in enumerate(mode_vis): - if jj != ii: - mode_vis[jj] = [not elem for elem in mode_vis[jj]] - # Finally, flatten list. - mode_vis = [item for sublist in mode_vis for item in sublist] - # Now, make button to add to list - buttons.append(dict(label=mode, method="update", args=[ - {"visible": mode_vis}, - {"yaxis.title.text": mode}, + if "count" in mode: + df[mode] = pd.to_numeric(df[mode], errors='coerce') + color = "genus" + color_discrete_sequence = None + elif 'percent' in mode: + color = "genus" + color_discrete_sequence = None + else: + color = "target" + match get_unique_values_in_df_column(df, 'target'): + case ['Target']: + color_discrete_sequence = ["blue"] + case ['Off-target']: + color_discrete_sequence = ['red'] + case _: + color_discrete_sequence = ['blue', 'red'] + bar = px.bar(df, + x="submitted_date", + y=mode, + color=color, + title=mode, + barmode='stack', + hover_data=["genus", "name", "target", mode], + text="genera", + color_discrete_sequence=color_discrete_sequence + ) + bar.update_traces(visible=ii == 0) + self.add_traces(bar.data) + # return generic_figure_markers(modes=modes, ytitle=ytitle) + + def generic_figure_markers(self, modes: list = [], ytitle: str | None = None): + """ + Adds standard layout to figure. + + Args: + fig (Figure): Input figure. + modes (list, optional): List of modes included in figure. Defaults to []. + ytitle (str, optional): Title for the y-axis. Defaults to None. + + Returns: + Figure: Output figure with updated titles, rangeslider, buttons. + """ + if modes: + ytitle = modes[0] + # Creating visibles list for each mode. + self.update_layout( + xaxis_title="Submitted Date (* - Date parsed from fastq file creation date)", + yaxis_title=ytitle, + showlegend=True, + barmode='stack', + updatemenus=[ + dict( + type="buttons", + direction="right", + x=0.7, + y=1.2, + showactive=True, + buttons=[button for button in self.make_buttons(modes=modes)], + ) ] - )) - return buttons + ) + self.update_xaxes( + rangeslider_visible=True, + rangeselector=dict( + buttons=list([ + dict(count=1, label="1m", step="month", stepmode="backward"), + dict(count=3, label="3m", step="month", stepmode="backward"), + dict(count=6, label="6m", step="month", stepmode="backward"), + dict(count=1, label="YTD", step="year", stepmode="todate"), + dict(count=1, label="1y", step="year", stepmode="backward"), + dict(step="all") + ]) + ) + ) + assert isinstance(self, Figure) + # return fig + def make_buttons(self, modes: list) -> list: + """ + Creates list of buttons with one for each mode to be used in showing/hiding mode traces. -def output_figures(figs: list, group_name: str): - """ - Writes plotly figure to html file. + Args: + modes (list): list of modes used by main parser. + fig_len (int): number of traces in the figure - Args: - settings (dict): settings passed down from click - fig (Figure): input figure object - group_name (str): controltype - """ - output = select_save_file(None, default_name=group_name, extension="html") - with open(output, "w") as f: - for fig in figs: + Returns: + list: list of buttons. + """ + fig_len = len(self.data) + if len(modes) > 1: + for ii, mode in enumerate(modes): + # What I need to do is create a list of bools with the same length as the fig.data + mode_vis = [True] * fig_len + # And break it into {len(modes)} chunks + mode_vis = list(divide_chunks(mode_vis, len(modes))) + # Then, for each chunk, if the chunk index isn't equal to the index of the current mode, set to false + for jj, sublist in enumerate(mode_vis): + if jj != ii: + mode_vis[jj] = [not elem for elem in mode_vis[jj]] + # Finally, flatten list. + mode_vis = [item for sublist in mode_vis for item in sublist] + # Now, yield button to add to list + yield dict(label=mode, method="update", args=[ + {"visible": mode_vis}, + {"yaxis.title.text": mode}, + ]) + + def save_figure(self, group_name: str = "plotly_output"): + """ + Writes plotly figure to html file. + + Args: + figs (): + settings (dict): settings passed down from click + fig (Figure): input figure object + group_name (str): controltype + """ + output = select_save_file(None, default_name=group_name, extension="html") + with open(output, "w") as f: try: - f.write(fig.to_html(full_html=False, include_plotlyjs='cdn')) + f.write(self.to_html()) except AttributeError: - logger.error(f"The following figure was a string: {fig}") + logger.error(f"The following figure was a string: {self}") + def to_html(self) -> str: + """ + Creates final html code from plotly -def construct_chart(df: pd.DataFrame, modes: list, ytitle: str | None = None) -> Figure: - """ - Creates a plotly chart for controls from a pandas dataframe + Args: + figure (Figure): input figure - Args: - df (pd.DataFrame): input dataframe of controls - modes (list): analysis modes to construct charts for - ytitle (str | None, optional): title on the y-axis. Defaults to None. - - Returns: - Figure: output stacked bar chart. - """ - fig = Figure() - for ii, mode in enumerate(modes): - if "count" in mode: - df[mode] = pd.to_numeric(df[mode], errors='coerce') - color = "genus" - color_discrete_sequence = None - elif 'percent' in mode: - color = "genus" - color_discrete_sequence = None + Returns: + str: html string + """ + html = '
' + if self is not None: + html += plotly.offline.plot(self, output_type='div', + include_plotlyjs='cdn') #, image = 'png', auto_open=True, image_filename='plot_image') else: - color = "target" - match get_unique_values_in_df_column(df, 'target'): - case ['Target']: - color_discrete_sequence = ["blue"] - case ['Off-target']: - color_discrete_sequence = ['red'] - case _: - color_discrete_sequence = ['blue', 'red'] - bar = px.bar(df, x="submitted_date", - y=mode, - color=color, - title=mode, - barmode='stack', - hover_data=["genus", "name", "target", mode], - text="genera", - color_discrete_sequence=color_discrete_sequence - ) - bar.update_traces(visible=ii == 0) - fig.add_traces(bar.data) - return generic_figure_markers(fig=fig, modes=modes, ytitle=ytitle) - - -def construct_html(figure: Figure) -> str: - """ - Creates final html code from plotly - - Args: - figure (Figure): input figure - - Returns: - str: html string - """ - html = '' - if figure is not None: - html += plotly.offline.plot(figure, output_type='div', - include_plotlyjs='cdn') #, image = 'png', auto_open=True, image_filename='plot_image') - else: - html += "