diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py index 1f8ad0d..fb5c396 100644 --- a/src/submissions/backend/db/models/__init__.py +++ b/src/submissions/backend/db/models/__init__.py @@ -147,10 +147,6 @@ class BaseClass(Base): case _: return query.limit(limit).all() - @classmethod - def default_info_return(cls, info, *args): - return info - def save(self): """ Add the object to the database and commit @@ -191,7 +187,7 @@ class ConfigItem(BaseClass): from .controls import * -# import order must go: orgs, kit, subs due to circular import issues +# NOTE: import order must go: orgs, kit, subs due to circular import issues from .organizations import * from .kits import * from .submissions import * diff --git a/src/submissions/backend/db/models/controls.py b/src/submissions/backend/db/models/controls.py index a60b6a5..423dd29 100644 --- a/src/submissions/backend/db/models/controls.py +++ b/src/submissions/backend/db/models/controls.py @@ -84,7 +84,7 @@ class ControlType(BaseClass): Returns: List[ControlType]: Control types that have targets """ - return [item for item in cls.query() if item.targets != []] + return [item for item in cls.query() if item.targets]# != []] @classmethod def build_positive_regex(cls) -> Pattern: diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index b8e14d7..061e32d 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -743,7 +743,7 @@ class SubmissionType(BaseClass): item.equipment_role == equipment_role] case _: raise TypeError(f"Type {type(equipment_role)} is not allowed") - return list(set([item for items in relevant for item in items if item != None])) + return list(set([item for items in relevant for item in items if item is not None])) def get_submission_class(self) -> "BasicSubmission": """ @@ -982,7 +982,7 @@ class KitTypeReagentRoleAssociation(BaseClass): query = query.join(ReagentRole).filter(ReagentRole.name == reagent_role) case _: pass - if kit_type != None and reagent_role != None: + if kit_type is not None and reagent_role is not None: limit = 1 return cls.execute_query(query=query, limit=limit) @@ -1339,7 +1339,7 @@ class EquipmentRole(BaseClass): if isinstance(submission_type, str): # logger.debug(f"Checking if str {submission_type} exists") submission_type = SubmissionType.query(name=submission_type) - if submission_type != None: + if submission_type is not None: # logger.debug("Getting all processes for this EquipmentRole") processes = [process for process in self.processes if submission_type in process.submission_types] else: @@ -1421,7 +1421,7 @@ class SubmissionTypeEquipmentRoleAssociation(BaseClass): back_populates="equipmentrole_submissiontype_associations") #: associated equipment @validates('static') - def validate_age(self, key, value): + def validate_static(self, key, value): """ Ensures only 1 & 0 used in 'static' @@ -1451,7 +1451,7 @@ class SubmissionTypeEquipmentRoleAssociation(BaseClass): """ processes = [equipment.get_processes(self.submission_type) for equipment in self.equipment_role.instances] # flatten list - processes = [item for items in processes for item in items if item != None] + processes = [item for items in processes for item in items if item is not None] match extraction_kit: case str(): # logger.debug(f"Filtering Processes by extraction_kit str {extraction_kit}") @@ -1474,7 +1474,7 @@ class Process(BaseClass): """ id = Column(INTEGER, primary_key=True) #: Process id, primary key - name = Column(String(64)) #: Process name + name = Column(String(64), unique=True) #: Process name submission_types = relationship("SubmissionType", back_populates='processes', secondary=submissiontypes_processes) #: relation to SubmissionType equipment = relationship("Equipment", back_populates='processes', @@ -1497,7 +1497,10 @@ class Process(BaseClass): @classmethod @setup_lookup - def query(cls, name: str | None = None, limit: int = 0) -> Process | List[Process]: + def query(cls, + name: str | None = None, + id: int = 1, + limit: int = 0) -> Process | List[Process]: """ Lookup Processes @@ -1516,9 +1519,19 @@ class Process(BaseClass): limit = 1 case _: pass + match id: + case int(): + query = query.filter(cls.id == id) + limit = 1 + case _: + pass return cls.execute_query(query=query, limit=limit) + @check_authorization + def save(self): + super().save() + class TipRole(BaseClass): """ An abstract role that a tip fills during a process @@ -1539,6 +1552,10 @@ class TipRole(BaseClass): def __repr__(self): return f"" + + @check_authorization + def save(self): + super().save() class Tips(BaseClass): @@ -1593,6 +1610,10 @@ class Tips(BaseClass): case _: pass return cls.execute_query(query=query, limit=limit) + + @check_authorization + def save(self): + super().save() class SubmissionTypeTipRoleAssociation(BaseClass): @@ -1609,6 +1630,10 @@ class SubmissionTypeTipRoleAssociation(BaseClass): back_populates="submissiontype_tiprole_associations") #: associated submission tip_role = relationship(TipRole, back_populates="tiprole_submissiontype_associations") #: associated equipment + + @check_authorization + def save(self): + super().save() class SubmissionTipsAssociation(BaseClass): diff --git a/src/submissions/backend/db/models/organizations.py b/src/submissions/backend/db/models/organizations.py index 7c70f5b..ec352bd 100644 --- a/src/submissions/backend/db/models/organizations.py +++ b/src/submissions/backend/db/models/organizations.py @@ -70,7 +70,6 @@ class Organization(BaseClass): case _: pass return cls.execute_query(query=query, limit=limit) - # return query.first() @check_authorization def save(self): @@ -117,7 +116,6 @@ class Contact(BaseClass): Returns: Contact|List[Contact]: Contact(s) of interest. """ - # super().query(session) query: Query = cls.__database_session__.query(cls) match name: case str(): diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index 06c09f8..3419f28 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -22,7 +22,7 @@ import pandas as pd from openpyxl import Workbook from openpyxl.worksheet.worksheet import Worksheet from openpyxl.drawing.image import Image as OpenpyxlImage -from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys +from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr from datetime import datetime, date from typing import List, Any, Tuple, Literal from dateutil.parser import parse @@ -145,7 +145,6 @@ class BasicSubmission(BaseClass): output += BasicSubmission.timestamps() return output - # TODO: Beef up this to include info_map from DB @classmethod def get_default_info(cls, *args): # NOTE: Create defaults for all submission_types @@ -443,7 +442,7 @@ class BasicSubmission(BaseClass): """ # logger.debug(f"Querying Type: {submission_type}") # logger.debug(f"Using limit: {limit}") - # use lookup function to create list of dicts + # NOTE: use lookup function to create list of dicts subs = [item.to_dict() for item in cls.query(submission_type=submission_type, limit=limit, chronologic=chronologic)] # logger.debug(f"Got {len(subs)} submissions.") @@ -498,7 +497,7 @@ class BasicSubmission(BaseClass): case "submission_type": field_value = SubmissionType.query(name=value) case "sample_count": - if value == None: + if value is None: field_value = len(self.samples) else: field_value = value @@ -607,7 +606,7 @@ class BasicSubmission(BaseClass): super().save() @classmethod - def get_regex(cls): + def get_regex(cls) -> str: return cls.construct_regex() # Polymorphic functions @@ -742,7 +741,6 @@ class BasicSubmission(BaseClass): str: Updated name. """ # logger.info(f"Hello from {cls.__mapper_args__['polymorphic_identity']} Enforcer!") - # return instr from backend.validators import RSLNamer # logger.debug(f"instr coming into {cls}: {instr}") # logger.debug(f"data coming into {cls}: {data}") @@ -773,7 +771,7 @@ class BasicSubmission(BaseClass): # logger.debug(f"After addition of plate number the plate name is: {outstr}") try: repeat = re.search(r"-\dR(?P\d)?", outstr).groupdict()['repeat'] - if repeat == None: + if repeat is None: repeat = "1" except AttributeError as e: repeat = "" @@ -835,6 +833,15 @@ class BasicSubmission(BaseClass): @classmethod def adjust_autofill_samples(cls, samples: List[Any]) -> List[Any]: + """ + Makes adjustments to samples before writing to excel. + + Args: + samples (List[Any]): List of Samples + + Returns: + List[Any]: Updated list of samples + """ logger.info(f"Hello from {cls.__mapper_args__['polymorphic_identity']} sampler") return samples @@ -953,7 +960,7 @@ class BasicSubmission(BaseClass): query = query.filter(model.submitted_date == start_date) else: query = query.filter(model.submitted_date.between(start_date, end_date)) - # by reagent (for some reason) + # NOTE: by reagent (for some reason) match reagent: case str(): # logger.debug(f"Looking up BasicSubmission with reagent: {reagent}") @@ -965,7 +972,7 @@ class BasicSubmission(BaseClass): SubmissionSampleAssociation.reagent).filter(Reagent.lot == reagent) case _: pass - # by rsl number (returns only a single value) + # NOTE: by rsl number (returns only a single value) match rsl_plate_num: case str(): query = query.filter(model.rsl_plate_num == rsl_plate_num) @@ -973,7 +980,7 @@ class BasicSubmission(BaseClass): limit = 1 case _: pass - # by id (returns only a single value) + # NOTE: by id (returns only a single value) match id: case int(): # logger.debug(f"Looking up BasicSubmission with id: {id}") @@ -1051,7 +1058,7 @@ class BasicSubmission(BaseClass): Performs backup and deletes this instance from database. Args: - obj (_type_, optional): Parent Widget. Defaults to None. + obj (_type_, optional): Parent widget. Defaults to None. Raises: e: _description_ @@ -1075,7 +1082,7 @@ class BasicSubmission(BaseClass): Creates Widget for showing submission details. Args: - obj (_type_): parent widget + obj (_type_): Parent widget """ # logger.debug("Hello from details") from frontend.widgets.submission_details import SubmissionDetails @@ -1084,6 +1091,12 @@ class BasicSubmission(BaseClass): pass def edit(self, obj): + """ + Return submission to form widget for updating + + Args: + obj (Widget): Parent widget + """ from frontend.widgets.submission_widget import SubmissionFormWidget for widg in obj.app.table_widget.formwidget.findChildren(SubmissionFormWidget): # logger.debug(widg) @@ -1224,9 +1237,9 @@ class BacterialCulture(BasicSubmission): """ from . import ControlType input_dict = super().finalize_parse(input_dict, xl, info_map) - # build regex for all control types that have targets + # NOTE: build regex for all control types that have targets regex = ControlType.build_positive_regex() - # search samples for match + # NOTE: search samples for match for sample in input_dict['samples']: matched = regex.match(sample['submitter_id']) if bool(matched): @@ -1311,7 +1324,7 @@ class Wastewater(BasicSubmission): dict: Updated sample dictionary """ input_dict = super().custom_info_parser(input_dict) - if xl != None: + if xl is not None: input_dict['csv'] = xl["Copy to import file"] return input_dict @@ -1355,7 +1368,7 @@ class Wastewater(BasicSubmission): Extends parent """ try: - # Deal with PCR file. + # NOTE: Deal with PCR file. instr = re.sub(r"PCR(-|_)", "", instr) except (AttributeError, TypeError) as e: logger.error(f"Problem using regex: {e}") @@ -1413,6 +1426,15 @@ class Wastewater(BasicSubmission): @classmethod def finalize_details(cls, input_dict: dict) -> dict: + """ + Makes changes to information before display + + Args: + input_dict (dict): Input information + + Returns: + dict: Updated information + """ input_dict = super().finalize_details(input_dict) dummy_samples = [] for item in input_dict['samples']: @@ -1430,11 +1452,23 @@ class Wastewater(BasicSubmission): return input_dict def custom_context_events(self) -> dict: + """ + Sets context events for main widget + + Returns: + dict: Context menu items for this instance. + """ events = super().custom_context_events() events['Link PCR'] = self.link_pcr return events def link_pcr(self, obj): + """ + Adds PCR info to this submission + + Args: + obj (_type_): Parent widget + """ from backend.excel import PCRParser from frontend.widgets import select_open_file fname = select_open_file(obj=obj, file_extension="xlsx") @@ -1562,7 +1596,7 @@ class WastewaterArtic(BasicSubmission): """ input_dict = super().parse_samples(input_dict) input_dict['sample_type'] = "Wastewater Sample" - # Because generate_sample_object needs the submitter_id and the artic has the "({origin well})" + # NOTE: Because generate_sample_object needs the submitter_id and the artic has the "({origin well})" # at the end, this has to be done here. No moving to sqlalchemy object :( input_dict['submitter_id'] = re.sub(r"\s\(.+\)\s?$", "", str(input_dict['submitter_id'])).strip() try: @@ -1576,9 +1610,10 @@ class WastewaterArtic(BasicSubmission): except KeyError: logger.error(f"Unable to set ww_processing_num for sample {input_dict['submitter_id']}") year = str(date.today().year)[-2:] - # if "ENC" in input_dict['submitter_id']: + # NOTE: Check for extraction negative control (Enterics) if re.search(rf"^{year}-(ENC)", input_dict['submitter_id']): input_dict['rsl_number'] = cls.en_adapter(input_str=input_dict['submitter_id']) + # NOTE: Check for extraction negative control (Robotics) if re.search(rf"^{year}-(RSL)", input_dict['submitter_id']): input_dict['rsl_number'] = cls.pbs_adapter(input_str=input_dict['submitter_id']) return input_dict @@ -1595,11 +1630,11 @@ class WastewaterArtic(BasicSubmission): str: output name """ # logger.debug(f"input string raw: {input_str}") - # Remove letters. + # NOTE: Remove letters. processed = input_str.replace("RSL", "") processed = re.sub(r"\(.*\)$", "", processed).strip() processed = re.sub(r"[A-QS-Z]+\d*", "", processed) - # Remove trailing '-' if any + # NOTE: Remove trailing '-' if any processed = processed.strip("-") # logger.debug(f"Processed after stripping letters: {processed}") try: @@ -1632,7 +1667,7 @@ class WastewaterArtic(BasicSubmission): @classmethod def pbs_adapter(cls, input_str): """ - Stopgap solution because WW names their ENs different + Stopgap solution because WW names their controls different Args: input_str (str): input name @@ -1641,20 +1676,13 @@ class WastewaterArtic(BasicSubmission): str: output name """ # logger.debug(f"input string raw: {input_str}") - # Remove letters. + # NOTE: Remove letters. processed = input_str.replace("RSL", "") processed = re.sub(r"\(.*\)$", "", processed).strip() processed = re.sub(r"[A-QS-Z]+\d*", "", processed) - # Remove trailing '-' if any + # NOTE: Remove trailing '-' if any processed = processed.strip("-") # logger.debug(f"Processed after stripping letters: {processed}") - # try: - # en_num = re.search(r"\-\d{1}$", processed).group() - # processed = rreplace(processed, en_num, "") - # except AttributeError: - # en_num = "1" - # en_num = en_num.strip("-") - # logger.debug(f"Processed after en_num: {processed}") try: plate_num = re.search(r"\-\d{1}R?\d?$", processed).group() processed = rreplace(processed, plate_num, "") @@ -1728,13 +1756,15 @@ class WastewaterArtic(BasicSubmission): Workbook: Updated workbook """ input_excel = super().custom_info_writer(input_excel, info, backup) - logger.debug(f"Info:\n{pformat(info)}") - check = 'source_plates' in info.keys() and info['source_plates'] is not None - if check: + # logger.debug(f"Info:\n{pformat(info)}") + # NOTE: check for source plate information + # check = 'source_plates' in info.keys() and info['source_plates'] is not None + if check_key_or_attr(key='source_plates', interest=info, check_none=True): worksheet = input_excel['First Strand List'] start_row = 8 + # NOTE: write source plates to First strand list for iii, plate in enumerate(info['source_plates']['value']): - logger.debug(f"Plate: {plate}") + # logger.debug(f"Plate: {plate}") row = start_row + iii try: worksheet.cell(row=row, column=3, value=plate['plate']) @@ -1744,41 +1774,45 @@ class WastewaterArtic(BasicSubmission): worksheet.cell(row=row, column=4, value=plate['starting_sample']) except TypeError: pass - check = 'gel_info' in info.keys() and info['gel_info']['value'] is not None - if check: + # NOTE: check for gel information + # check = 'gel_info' in info.keys() and info['gel_info']['value'] is not None + if check_key_or_attr(key='gel_info', interest=info, check_none=True): # logger.debug(f"Gel info check passed.") - if info['gel_info'] != None: - # logger.debug(f"Gel info not none.") - worksheet = input_excel['Egel results'] - start_row = 21 - start_column = 15 - for row, ki in enumerate(info['gel_info']['value'], start=1): - # logger.debug(f"ki: {ki}") - # logger.debug(f"vi: {vi}") - row = start_row + row - worksheet.cell(row=row, column=start_column, value=ki['name']) - for jjj, kj in enumerate(ki['values'], start=1): - # logger.debug(f"kj: {kj}") - # logger.debug(f"vj: {vj}") - column = start_column + 2 + jjj - worksheet.cell(row=start_row, column=column, value=kj['name']) - # logger.debug(f"Writing {kj['name']} with value {kj['value']} to row {row}, column {column}") - try: - worksheet.cell(row=row, column=column, value=kj['value']) - except AttributeError: - logger.error(f"Failed {kj['name']} with value {kj['value']} to row {row}, column {column}") - check = 'gel_image' in info.keys() and info['gel_image']['value'] is not None - if check: - if info['gel_image'] != None: - worksheet = input_excel['Egel results'] - # logger.debug(f"We got an image: {info['gel_image']}") - with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped: - z = zipped.extract(info['gel_image']['value'], Path(TemporaryDirectory().name)) - img = OpenpyxlImage(z) - img.height = 400 # insert image height in pixels as float or int (e.g. 305.5) - img.width = 600 - img.anchor = 'B9' - worksheet.add_image(img) + # if info['gel_info'] is not None: + # logger.debug(f"Gel info not none.") + # NOTE: print json field gel results to Egel results + worksheet = input_excel['Egel results'] + # TODO: Move all this into a seperate function? + # + start_row = 21 + start_column = 15 + for row, ki in enumerate(info['gel_info']['value'], start=1): + # logger.debug(f"ki: {ki}") + # logger.debug(f"vi: {vi}") + row = start_row + row + worksheet.cell(row=row, column=start_column, value=ki['name']) + for jjj, kj in enumerate(ki['values'], start=1): + # logger.debug(f"kj: {kj}") + # logger.debug(f"vj: {vj}") + column = start_column + 2 + jjj + worksheet.cell(row=start_row, column=column, value=kj['name']) + # logger.debug(f"Writing {kj['name']} with value {kj['value']} to row {row}, column {column}") + try: + worksheet.cell(row=row, column=column, value=kj['value']) + except AttributeError: + logger.error(f"Failed {kj['name']} with value {kj['value']} to row {row}, column {column}") + # check = 'gel_image' in info.keys() and info['gel_image']['value'] is not None + if check_key_or_attr(key='gel_image', interest=info, check_none=True): + # if info['gel_image'] is not None: + worksheet = input_excel['Egel results'] + # logger.debug(f"We got an image: {info['gel_image']}") + with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped: + z = zipped.extract(info['gel_image']['value'], Path(TemporaryDirectory().name)) + img = OpenpyxlImage(z) + img.height = 400 # insert image height in pixels as float or int (e.g. 305.5) + img.width = 600 + img.anchor = 'B9' + worksheet.add_image(img) return input_excel @classmethod @@ -1796,55 +1830,35 @@ class WastewaterArtic(BasicSubmission): base_dict['excluded'] += ['gel_info', 'gel_image', 'headers', "dna_core_submission_number", "source_plates", "gel_controls"] base_dict['DNA Core ID'] = base_dict['dna_core_submission_number'] - check = 'gel_info' in base_dict.keys() and base_dict['gel_info'] != None - if check: + # check = 'gel_info' in base_dict.keys() and base_dict['gel_info'] is not None + if check_key_or_attr(key='gel_info', interest=base_dict, check_none=True): headers = [item['name'] for item in base_dict['gel_info'][0]['values']] base_dict['headers'] = [''] * (4 - len(headers)) base_dict['headers'] += headers # logger.debug(f"Gel info: {pformat(base_dict['headers'])}") - check = 'gel_image' in base_dict.keys() and base_dict['gel_image'] != None - if check: + # check = 'gel_image' in base_dict.keys() and base_dict['gel_image'] is not None + if check_key_or_attr(key='gel_image', interest=base_dict, check_none=True): with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped: base_dict['gel_image'] = base64.b64encode(zipped.read(base_dict['gel_image'])).decode('utf-8') return base_dict, template - def adjust_to_dict_samples(self, backup: bool = False) -> List[dict]: - """ - Updates sample dictionaries with custom values - - Args: - backup (bool, optional): Whether to perform backup. Defaults to False. - - Returns: - List[dict]: Updated dictionaries - """ - # logger.debug(f"Hello from {self.__class__.__name__} dictionary sample adjuster.") - output = [] - # set_plate = None - for assoc in self.submission_sample_associations: - dicto = assoc.to_sub_dict() - # if self.source_plates is None: - # output.append(dicto) - # continue - # for item in self.source_plates: - # if assoc.sample.id is None: - # old_plate = None - # else: - # old_plate = WastewaterAssociation.query(submission=item['plate'], sample=assoc.sample, limit=1) - # if old_plate is not None: - # set_plate = old_plate.submission.rsl_plate_num - # # logger.debug(f"Dictionary: {pformat(dicto)}") - # if dicto['ww_processing_num'].startswith("NTC"): - # dicto['well'] = dicto['ww_processing_num'] - # else: - # dicto['well'] = f"{row_map[old_plate.row]}{old_plate.column}" - # break - # elif dicto['ww_processing_num'].startswith("NTC"): - # dicto['well'] = dicto['ww_processing_num'] - # dicto['plate_name'] = set_plate - # logger.debug(f"Here is our raw sample: {pformat(dicto)}") - output.append(dicto) - return output + # def adjust_to_dict_samples(self, backup: bool = False) -> List[dict]: + # """ + # Updates sample dictionaries with custom values + # + # Args: + # backup (bool, optional): Whether to perform backup. Defaults to False. + # + # Returns: + # List[dict]: Updated dictionaries + # """ + # # logger.debug(f"Hello from {self.__class__.__name__} dictionary sample adjuster.") + # output = [] + # + # for assoc in self.submission_sample_associations: + # dicto = assoc.to_sub_dict() + # output.append(dicto) + # return output def custom_context_events(self) -> dict: """ @@ -1880,7 +1894,7 @@ class WastewaterArtic(BasicSubmission): self.gel_info = output dt = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") com = dict(text=comment, name=getuser(), time=dt) - if com['text'] != None and com['text'] != "": + if com['text'] is not None and com['text'] != "": if self.comment is not None: self.comment.append(com) else: @@ -1938,7 +1952,7 @@ class BasicSample(BaseClass): Returns: str: new (or unchanged) submitter id """ - if value == None: + if value is None: return uuid.uuid4().hex.upper() else: return value @@ -2334,7 +2348,7 @@ class BacterialCultureSample(BasicSample): sample['name'] = self.submitter_id sample['organism'] = self.organism sample['concentration'] = self.concentration - if self.control != None: + if self.control is not None: sample['colour'] = [0, 128, 0] sample['tooltip'] = f"Control: {self.control.controltype.name} - {self.control.controltype.targets}" # logger.debug(f"Done converting to {self} to dict after {time()-start}") @@ -2480,7 +2494,7 @@ class SubmissionSampleAssociation(BaseClass): """ if isinstance(polymorphic_identity, dict): polymorphic_identity = polymorphic_identity['value'] - if polymorphic_identity == None: + if polymorphic_identity is None: output = cls else: try: diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index 001a89d..16e74bc 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -15,7 +15,7 @@ import logging, re from collections import OrderedDict from datetime import date from dateutil.parser import parse, ParserError -from tools import check_not_nan, convert_nans_to_nones, is_missing, remove_key_from_list_of_dicts +from tools import check_not_nan, convert_nans_to_nones, is_missing, remove_key_from_list_of_dicts, check_key_or_attr logger = logging.getLogger(f"submissions.{__name__}") @@ -83,7 +83,7 @@ class SheetParser(object): Args: extraction_kit (str | None, optional): Relevant extraction kit for reagent map. Defaults to None. """ - if extraction_kit == None: + if extraction_kit is None: extraction_kit = self.sub['extraction_kit'] # logger.debug(f"Parsing reagents for {extraction_kit}") self.sub['reagents'] = ReagentParser(xl=self.xl, submission_type=self.submission_type, @@ -491,11 +491,11 @@ class SampleParser(object): break else: new = psample - try: - check = new['submitter_id'] is None - except KeyError: - check = True - if check: + # try: + # check = new['submitter_id'] is None + # except KeyError: + # check = True + if not check_key_or_attr(key='submitter_id', interest=new, check_none=True): new['submitter_id'] = psample['id'] new = self.sub_object.parse_samples(new) samples.append(new) diff --git a/src/submissions/backend/excel/reports.py b/src/submissions/backend/excel/reports.py index a8f25d2..b637e27 100644 --- a/src/submissions/backend/excel/reports.py +++ b/src/submissions/backend/excel/reports.py @@ -1,17 +1,138 @@ ''' Contains functions for generating summary reports ''' -from pandas import DataFrame +from pandas import DataFrame, ExcelWriter import logging, re +from pathlib import Path from datetime import date, timedelta -from typing import List, Tuple -from tools import jinja_template_loading, Settings +from typing import List, Tuple, Any +from backend.db.models import BasicSubmission +from tools import jinja_template_loading, Settings, get_unique_values_in_df_column, html_to_pdf, get_first_blank_df_row, \ + row_map +from PyQt6.QtWidgets import QWidget +from openpyxl.worksheet.worksheet import Worksheet logger = logging.getLogger(f"submissions.{__name__}") env = jinja_template_loading() -def make_report_xlsx(records:list[dict]) -> Tuple[DataFrame, DataFrame]: + +class ReportMaker(object): + + def __init__(self, start_date: date, end_date: date): + subs = BasicSubmission.query(start_date=start_date, end_date=end_date) + records = [item.to_dict(report=True) for item in subs] + self.detailed_df, self.summary_df = self.make_report_xlsx(records=records) + self.html = self.make_report_html(df=self.summary_df, start_date=start_date, end_date=end_date) + + def make_report_xlsx(self, records: list[dict]) -> Tuple[DataFrame, DataFrame]: + """ + create the dataframe for a report + + Args: + records (list[dict]): list of dictionaries created from submissions + + Returns: + DataFrame: output dataframe + """ + df = DataFrame.from_records(records) + # NOTE: put submissions with the same lab together + df = df.sort_values("submitting_lab") + # NOTE: aggregate cost and sample count columns + df2 = df.groupby(["submitting_lab", "extraction_kit"]).agg( + {'extraction_kit': 'count', 'cost': 'sum', 'sample_count': 'sum'}) + df2 = df2.rename(columns={"extraction_kit": 'run_count'}) + # logger.debug(f"Output daftaframe for xlsx: {df2.columns}") + df = df.drop('id', axis=1) + df = df.sort_values(['submitting_lab', "submitted_date"]) + return df, df2 + + def make_report_html(self, df: DataFrame, start_date: date, end_date: date) -> str: + + """ + generates html from the report dataframe + + Args: + df (DataFrame): input dataframe generated from 'make_report_xlsx' above + start_date (date): starting date of the report period + end_date (date): ending date of the report period + + Returns: + str: html string + """ + old_lab = "" + output = [] + # logger.debug(f"Report DataFrame: {df}") + for ii, row in enumerate(df.iterrows()): + # logger.debug(f"Row {ii}: {row}") + lab = row[0][0] + # logger.debug(type(row)) + # logger.debug(f"Old lab: {old_lab}, Current lab: {lab}") + # logger.debug(f"Name: {row[0][1]}") + data = [item for item in row[1]] + kit = dict(name=row[0][1], cost=data[1], run_count=int(data[0]), sample_count=int(data[2])) + # if this is the same lab as before add together + if lab == old_lab: + output[-1]['kits'].append(kit) + output[-1]['total_cost'] += kit['cost'] + output[-1]['total_samples'] += kit['sample_count'] + output[-1]['total_runs'] += kit['run_count'] + # if not the same lab, make a new one + else: + adder = dict(lab=lab, kits=[kit], total_cost=kit['cost'], total_samples=kit['sample_count'], + total_runs=kit['run_count']) + output.append(adder) + old_lab = lab + # logger.debug(output) + dicto = {'start_date': start_date, 'end_date': end_date, 'labs': output} # , "table":table} + temp = env.get_template('summary_report.html') + html = temp.render(input=dicto) + return html + + def write_report(self, filename: Path | str, obj: QWidget | None = None): + if isinstance(filename, str): + filename = Path(filename) + filename = filename.absolute() + # NOTE: html_to_pdf doesn't function without a PyQt6 app + if isinstance(obj, QWidget): + logger.info(f"We're in PyQt environment, writing PDF to: {filename}") + html_to_pdf(html=self.html, output_file=filename) + else: + logger.info("Not in PyQt. Skipping PDF writing.") + # logger.debug("Finished writing.") + self.writer = ExcelWriter(filename.with_suffix(".xlsx"), engine='openpyxl') + self.summary_df.to_excel(self.writer, sheet_name="Report") + self.detailed_df.to_excel(self.writer, sheet_name="Details", index=False) + self.fix_up_xl() + # logger.debug(f"Writing report to: {filename}") + self.writer.close() + + def fix_up_xl(self): + # logger.debug(f"Updating worksheet") + worksheet: Worksheet = self.writer.sheets['Report'] + for idx, col in enumerate(self.summary_df, start=1): # loop through all columns + series = self.summary_df[col] + max_len = max(( + series.astype(str).map(len).max(), # len of largest item + len(str(series.name)) # len of column name/header + )) + 20 # NOTE: adding a little extra space + try: + # NOTE: Convert idx to letter + col_letter = chr(ord('@') + idx) + worksheet.column_dimensions[col_letter].width = max_len + except ValueError as e: + logger.error(f"Couldn't resize column {col} due to {e}") + blank_row = get_first_blank_df_row(self.summary_df) + 1 + # logger.debug(f"Blank row index = {blank_row}") + for col in range(3, 6): + col_letter = row_map[col] + worksheet.cell(row=blank_row, column=col, value=f"=SUM({col_letter}2:{col_letter}{str(blank_row - 1)})") + for cell in worksheet['D']: + if cell.row > 1: + cell.style = 'Currency' + + +def make_report_xlsx(records: list[dict]) -> Tuple[DataFrame, DataFrame]: """ create the dataframe for a report @@ -20,20 +141,21 @@ def make_report_xlsx(records:list[dict]) -> Tuple[DataFrame, DataFrame]: Returns: DataFrame: output dataframe - """ + """ df = DataFrame.from_records(records) - # put submissions with the same lab together + # NOTE: put submissions with the same lab together df = df.sort_values("submitting_lab") - # aggregate cost and sample count columns - df2 = df.groupby(["submitting_lab", "extraction_kit"]).agg({'extraction_kit':'count', 'cost': 'sum', 'sample_count':'sum'}) + # NOTE: aggregate cost and sample count columns + df2 = df.groupby(["submitting_lab", "extraction_kit"]).agg( + {'extraction_kit': 'count', 'cost': 'sum', 'sample_count': 'sum'}) df2 = df2.rename(columns={"extraction_kit": 'run_count'}) # logger.debug(f"Output daftaframe for xlsx: {df2.columns}") df = df.drop('id', axis=1) df = df.sort_values(['submitting_lab', "submitted_date"]) return df, df2 -def make_report_html(df:DataFrame, start_date:date, end_date:date) -> str: - + +def make_report_html(df: DataFrame, start_date: date, end_date: date) -> str: """ generates html from the report dataframe @@ -44,7 +166,7 @@ def make_report_html(df:DataFrame, start_date:date, end_date:date) -> str: Returns: str: html string - """ + """ old_lab = "" output = [] # logger.debug(f"Report DataFrame: {df}") @@ -64,16 +186,19 @@ def make_report_html(df:DataFrame, start_date:date, end_date:date) -> str: output[-1]['total_runs'] += kit['run_count'] # if not the same lab, make a new one else: - adder = dict(lab=lab, kits=[kit], total_cost=kit['cost'], total_samples=kit['sample_count'], total_runs=kit['run_count']) + adder = dict(lab=lab, kits=[kit], total_cost=kit['cost'], total_samples=kit['sample_count'], + total_runs=kit['run_count']) output.append(adder) old_lab = lab # logger.debug(output) - dicto = {'start_date':start_date, 'end_date':end_date, 'labs':output}#, "table":table} + dicto = {'start_date': start_date, 'end_date': end_date, 'labs': output} #, "table":table} temp = env.get_template('summary_report.html') html = temp.render(input=dicto) return html -def convert_data_list_to_df(input:list[dict], subtype:str|None=None) -> DataFrame: + +# TODO: move this into a classmethod of Controls? +def convert_data_list_to_df(input: list[dict], subtype: str | None = None) -> DataFrame: """ Convert list of control records to dataframe @@ -84,8 +209,8 @@ def convert_data_list_to_df(input:list[dict], subtype:str|None=None) -> DataFram Returns: DataFrame: dataframe of controls - """ - + """ + df = DataFrame.from_records(input) safe = ['name', 'submitted_date', 'genus', 'target'] for column in df.columns: @@ -94,7 +219,7 @@ def convert_data_list_to_df(input:list[dict], subtype:str|None=None) -> DataFram # NOTE: The actual percentage from kraken was off due to exclusion of NaN, recalculating. df[column] = 100 * df[count_col] / df.groupby('name')[count_col].transform('sum') if column not in safe: - if subtype != None and column != subtype: + if subtype is not None and column != subtype: del df[column] # NOTE: move date of sample submitted on same date as previous ahead one. df = displace_date(df) @@ -102,7 +227,8 @@ def convert_data_list_to_df(input:list[dict], subtype:str|None=None) -> DataFram df = df_column_renamer(df=df) return df -def df_column_renamer(df:DataFrame) -> DataFrame: + +def df_column_renamer(df: DataFrame) -> DataFrame: """ Ad hoc function I created to clarify some fields @@ -111,16 +237,17 @@ def df_column_renamer(df:DataFrame) -> DataFrame: Returns: DataFrame: dataframe with 'clarified' column names - """ - df = df[df.columns.drop(list(df.filter(regex='_hashes')))] - return df.rename(columns = { - "contains_ratio":"contains_shared_hashes_ratio", - "matches_ratio":"matches_shared_hashes_ratio", - "kraken_count":"kraken2_read_count_(top_50)", - "kraken_percent":"kraken2_read_percent_(top_50)" + """ + df = df[df.columns.drop(list(df.filter(regex='_hashes')))] + return df.rename(columns={ + "contains_ratio": "contains_shared_hashes_ratio", + "matches_ratio": "matches_shared_hashes_ratio", + "kraken_count": "kraken2_read_count_(top_50)", + "kraken_percent": "kraken2_read_percent_(top_50)" }) -def displace_date(df:DataFrame) -> DataFrame: + +def displace_date(df: DataFrame) -> DataFrame: """ This function serves to split samples that were submitted on the same date by incrementing dates. It will shift the date forward by one day if it is the same day as an existing date in a list. @@ -130,16 +257,18 @@ def displace_date(df:DataFrame) -> DataFrame: Returns: DataFrame: output dataframe with dates incremented. - """ + """ # logger.debug(f"Unique items: {df['name'].unique()}") # NOTE: get submitted dates for each control - dict_list = [dict(name=item, date=df[df.name == item].iloc[0]['submitted_date']) for item in sorted(df['name'].unique())] + dict_list = [dict(name=item, date=df[df.name == item].iloc[0]['submitted_date']) for item in + sorted(df['name'].unique())] previous_dates = [] for _, item in enumerate(dict_list): df, previous_dates = check_date(df=df, item=item, previous_dates=previous_dates) return df -def check_date(df:DataFrame, item:dict, previous_dates:list) -> Tuple[DataFrame, list]: + +def check_date(df: DataFrame, item: dict, previous_dates: list) -> Tuple[DataFrame, list]: """ Checks if an items date is already present in df and adjusts df accordingly @@ -150,7 +279,7 @@ def check_date(df:DataFrame, item:dict, previous_dates:list) -> Tuple[DataFrame, Returns: Tuple[DataFrame, list]: Output dataframe and appended list of previous dates - """ + """ try: check = item['date'] in previous_dates except IndexError: @@ -177,21 +306,23 @@ def check_date(df:DataFrame, item:dict, previous_dates:list) -> Tuple[DataFrame, logger.warning(f"Date check failed, running recursion") df, previous_dates = check_date(df, item, previous_dates) return df, previous_dates - -def get_unique_values_in_df_column(df: DataFrame, column_name: str) -> list: - """ - get all unique values in a dataframe column by name - Args: - df (DataFrame): input dataframe - column_name (str): name of column of interest - Returns: - list: sorted list of unique values - """ - return sorted(df[column_name].unique()) +# def get_unique_values_in_df_column(df: DataFrame, column_name: str) -> list: +# """ +# get all unique values in a dataframe column by name +# +# Args: +# df (DataFrame): input dataframe +# column_name (str): name of column of interest +# +# Returns: +# list: sorted list of unique values +# """ +# return sorted(df[column_name].unique()) -def drop_reruns_from_df(ctx:Settings, df: DataFrame) -> DataFrame: + +def drop_reruns_from_df(ctx: Settings, df: DataFrame) -> DataFrame: """ Removes semi-duplicates from dataframe after finding sequencing repeats. @@ -201,7 +332,7 @@ def drop_reruns_from_df(ctx:Settings, df: DataFrame) -> DataFrame: Returns: DataFrame: dataframe with originals removed in favour of repeats. - """ + """ if 'rerun_regex' in ctx: sample_names = get_unique_values_in_df_column(df, column_name="name") rerun_regex = re.compile(fr"{ctx.rerun_regex}") @@ -210,15 +341,15 @@ def drop_reruns_from_df(ctx:Settings, df: DataFrame) -> DataFrame: first_run = re.sub(rerun_regex, "", sample) df = df.drop(df[df.name == first_run].index) return df - -def make_hitpicks(input:List[dict]) -> DataFrame: - """ - Converts list of dictionaries constructed by hitpicking to dataframe - Args: - input (List[dict]): list of hitpicked dictionaries - - Returns: - DataFrame: constructed dataframe. - """ - return DataFrame.from_records(input) +# def make_hitpicks(input:List[dict]) -> DataFrame: +# """ +# Converts list of dictionaries constructed by hitpicking to dataframe +# +# Args: +# input (List[dict]): list of hitpicked dictionaries +# +# Returns: +# DataFrame: constructed dataframe. +# """ +# return DataFrame.from_records(input) diff --git a/src/submissions/backend/excel/writer.py b/src/submissions/backend/excel/writer.py index 520acdc..8bbe4ec 100644 --- a/src/submissions/backend/excel/writer.py +++ b/src/submissions/backend/excel/writer.py @@ -1,14 +1,18 @@ import logging from copy import copy +from pathlib import Path # from pathlib import Path from pprint import pformat from typing import List + +from jinja2 import TemplateNotFound from openpyxl import load_workbook, Workbook from backend.db.models import SubmissionType, KitType, BasicSubmission from backend.validators.pydant import PydSubmission from io import BytesIO from collections import OrderedDict - +from tools import jinja_template_loading +from docxtpl import DocxTemplate logger = logging.getLogger(f"submissions.{__name__}") @@ -31,7 +35,8 @@ class SheetWriter(object): case 'submission_type': self.sub[k] = v['value'] self.submission_type = SubmissionType.query(name=v['value']) - self.sub_object = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type) + self.sub_object = BasicSubmission.find_polymorphic_subclass( + polymorphic_identity=self.submission_type) case _: if isinstance(v, dict): self.sub[k] = v['value'] @@ -62,7 +67,7 @@ class SheetWriter(object): def write_info(self): """ Calls info writer - """ + """ disallowed = ['filepath', 'reagents', 'samples', 'equipment', 'controls'] info_dict = {k: v for k, v in self.sub.items() if k not in disallowed} writer = InfoWriter(xl=self.xl, submission_type=self.submission_type, info_dict=info_dict) @@ -71,7 +76,7 @@ class SheetWriter(object): def write_reagents(self): """ Calls reagent writer - """ + """ reagent_list = self.sub['reagents'] writer = ReagentWriter(xl=self.xl, submission_type=self.submission_type, extraction_kit=self.sub['extraction_kit'], reagent_list=reagent_list) @@ -80,7 +85,7 @@ class SheetWriter(object): def write_samples(self): """ Calls sample writer - """ + """ sample_list = self.sub['samples'] writer = SampleWriter(xl=self.xl, submission_type=self.submission_type, sample_list=sample_list) self.xl = writer.write_samples() @@ -88,7 +93,7 @@ class SheetWriter(object): def write_equipment(self): """ Calls equipment writer - """ + """ equipment_list = self.sub['equipment'] writer = EquipmentWriter(xl=self.xl, submission_type=self.submission_type, equipment_list=equipment_list) self.xl = writer.write_equipment() @@ -96,7 +101,7 @@ class SheetWriter(object): def write_tips(self): """ Calls tip writer - """ + """ tips_list = self.sub['tips'] writer = TipWriter(xl=self.xl, submission_type=self.submission_type, tips_list=tips_list) self.xl = writer.write_tips() @@ -106,7 +111,9 @@ class InfoWriter(object): """ object to write general submission info into excel file """ - def __init__(self, xl: Workbook, submission_type: SubmissionType | str, info_dict: dict, sub_object:BasicSubmission|None=None): + + def __init__(self, xl: Workbook, submission_type: SubmissionType | str, info_dict: dict, + sub_object: BasicSubmission | None = None): logger.debug(f"Info_dict coming into InfoWriter: {pformat(info_dict)}") if isinstance(submission_type, str): submission_type = SubmissionType.query(name=submission_type) @@ -129,7 +136,7 @@ class InfoWriter(object): Returns: dict: merged dictionary - """ + """ output = {} for k, v in info_dict.items(): if v is None: @@ -152,7 +159,7 @@ class InfoWriter(object): Returns: Workbook: workbook with info written. - """ + """ for k, v in self.info.items(): # NOTE: merge all comments to fit in single cell. if k == "comment" and isinstance(v['value'], list): @@ -174,6 +181,7 @@ class ReagentWriter(object): """ object to write reagent data into excel file """ + def __init__(self, xl: Workbook, submission_type: SubmissionType | str, extraction_kit: KitType | str, reagent_list: list): self.xl = xl @@ -184,7 +192,7 @@ class ReagentWriter(object): reagent_map = kit_type.construct_xl_map_for_use(submission_type) self.reagents = self.reconcile_map(reagent_list=reagent_list, reagent_map=reagent_map) - def reconcile_map(self, reagent_list:List[dict], reagent_map:dict) -> List[dict]: + def reconcile_map(self, reagent_list: List[dict], reagent_map: dict) -> List[dict]: """ Merge reagents with their locations @@ -194,7 +202,7 @@ class ReagentWriter(object): Returns: List[dict]: merged dictionary - """ + """ output = [] for reagent in reagent_list: try: @@ -219,14 +227,14 @@ class ReagentWriter(object): Returns: Workbook: Workbook with reagents written - """ + """ for reagent in self.reagents: sheet = self.xl[reagent['sheet']] for k, v in reagent.items(): if not isinstance(v, dict): continue # logger.debug( - # f"Writing {reagent['type']} {k} to {reagent['sheet']}, row: {v['row']}, column: {v['column']}") + # f"Writing {reagent['type']} {k} to {reagent['sheet']}, row: {v['row']}, column: {v['column']}") sheet.cell(row=v['row'], column=v['column'], value=v['value']) return self.xl @@ -235,6 +243,7 @@ class SampleWriter(object): """ object to write sample data into excel file """ + def __init__(self, xl: Workbook, submission_type: SubmissionType | str, sample_list: list): if isinstance(submission_type, str): submission_type = SubmissionType.query(name=submission_type) @@ -252,7 +261,7 @@ class SampleWriter(object): Returns: List[dict]: List of merged dictionaries - """ + """ output = [] multiples = ['row', 'column', 'assoc_id', 'submission_rank'] for sample in sample_list: @@ -272,7 +281,7 @@ class SampleWriter(object): Returns: Workbook: Workbook with samples written - """ + """ sheet = self.xl[self.sample_map['sheet']] columns = self.sample_map['sample_columns'] for sample in self.samples: @@ -290,6 +299,7 @@ class EquipmentWriter(object): """ object to write equipment data into excel file """ + def __init__(self, xl: Workbook, submission_type: SubmissionType | str, equipment_list: list): if isinstance(submission_type, str): submission_type = SubmissionType.query(name=submission_type) @@ -308,7 +318,7 @@ class EquipmentWriter(object): Returns: List[dict]: List of merged dictionaries - """ + """ output = [] if equipment_list is None: return output @@ -344,7 +354,7 @@ class EquipmentWriter(object): Returns: Workbook: Workbook with equipment written - """ + """ for equipment in self.equipment: try: sheet = self.xl[equipment['sheet']] @@ -371,6 +381,7 @@ class TipWriter(object): """ object to write tips data into excel file """ + def __init__(self, xl: Workbook, submission_type: SubmissionType | str, tips_list: list): if isinstance(submission_type, str): submission_type = SubmissionType.query(name=submission_type) @@ -389,7 +400,7 @@ class TipWriter(object): Returns: List[dict]: List of merged dictionaries - """ + """ output = [] if tips_list is None: return output @@ -423,7 +434,7 @@ class TipWriter(object): Returns: Workbook: Workbook with tips written - """ + """ for tips in self.tips: try: sheet = self.xl[tips['sheet']] @@ -444,3 +455,19 @@ class TipWriter(object): logger.error(f"Couldn't write to {tips['sheet']}, row: {v['row']}, column: {v['column']}") logger.error(e) return self.xl + + +class DocxWriter(object): + + def __init__(self, base_dict: dict): + env = jinja_template_loading() + temp_name = f"{base_dict['submission_type'].replace(' ', '').lower()}_document.docx" + path = Path(env.loader.__getattribute__("searchpath")[0]).joinpath(temp_name) + template = DocxTemplate(path) + try: + template.render(base_dict) + except FileNotFoundError: + template = DocxTemplate( + Path(env.loader.__getattribute__("searchpath")[0]).joinpath("basicsubmission_document.docx")) + template.render({"sub": base_dict}) + template.save("test.docx") diff --git a/src/submissions/backend/validators/__init__.py b/src/submissions/backend/validators/__init__.py index 1dd5927..556793d 100644 --- a/src/submissions/backend/validators/__init__.py +++ b/src/submissions/backend/validators/__init__.py @@ -141,7 +141,7 @@ class RSLNamer(object): """ if "submitted_date" in data.keys(): if isinstance(data['submitted_date'], dict): - if data['submitted_date']['value'] != None: + if data['submitted_date']['value'] is not None: today = data['submitted_date']['value'] else: today = datetime.now() diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py index 94e6306..2ef4951 100644 --- a/src/submissions/backend/validators/pydant.py +++ b/src/submissions/backend/validators/pydant.py @@ -33,7 +33,7 @@ class PydReagent(BaseModel): @field_validator('comment', mode='before') @classmethod def create_comment(cls, value): - if value == None: + if value is None: return "" return value @@ -49,7 +49,7 @@ class PydReagent(BaseModel): @field_validator("role") @classmethod def rescue_type_with_lookup(cls, value, values): - if value == None and values.data['lot'] != None: + if value is None and values.data['lot'] is not None: try: # return lookup_reagents(ctx=values.data['ctx'], lot_number=values.data['lot']).name return Reagent.query(lot_number=values.data['lot'].name) @@ -60,21 +60,21 @@ class PydReagent(BaseModel): @field_validator("lot", mode='before') @classmethod def rescue_lot_string(cls, value): - if value != None: + if value is not None: return convert_nans_to_nones(str(value)) return value @field_validator("lot") @classmethod def enforce_lot_string(cls, value): - if value != None: + if value is not None: return value.upper() return value @field_validator("expiry", mode="before") @classmethod def enforce_date(cls, value): - if value != None: + if value is not None: match value: case int(): return datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value - 2).date() @@ -86,7 +86,7 @@ class PydReagent(BaseModel): return value case _: return convert_nans_to_nones(str(value)) - if value == None: + if value is None: value = date.today() return value @@ -100,7 +100,7 @@ class PydReagent(BaseModel): @field_validator("name", mode="before") @classmethod def enforce_name(cls, value, values): - if value != None: + if value is not None: return convert_nans_to_nones(str(value)) else: return values.data['role'] @@ -131,7 +131,7 @@ class PydReagent(BaseModel): """ report = Report() # logger.debug("Adding extra fields.") - if self.model_extra != None: + if self.model_extra is not None: self.__dict__.update(self.model_extra) # logger.debug(f"Reagent SQL constructor is looking up type: {self.type}, lot: {self.lot}") reagent = Reagent.query(lot_number=self.lot, name=self.name) @@ -181,6 +181,7 @@ class PydReagent(BaseModel): return reagent, assoc, report + class PydSample(BaseModel, extra='allow'): submitter_id: str sample_type: str @@ -299,10 +300,11 @@ class PydEquipment(BaseModel, extra='ignore'): def make_empty_list(cls, value): # logger.debug(f"Pydantic value: {value}") value = convert_nans_to_nones(value) - if value == None: + if value is None: value = [''] if len(value) == 0: value = [''] + value = [item.strip() for item in value] return value def toSQL(self, submission: BasicSubmission | str = None) -> Tuple[Equipment, SubmissionEquipmentAssociation]: @@ -318,13 +320,13 @@ class PydEquipment(BaseModel, extra='ignore'): if isinstance(submission, str): submission = BasicSubmission.query(rsl_number=submission) equipment = Equipment.query(asset_number=self.asset_number) - if equipment == None: + if equipment is None: return - if submission != None: + if submission is not None: assoc = SubmissionEquipmentAssociation(submission=submission, equipment=equipment) process = Process.query(name=self.processes[0]) - if process == None: - # logger.debug("Adding in unknown process.") + if process is None: + logger.error(f"Found unknown process: {process}.") from frontend.widgets.pop_ups import QuestionAsker dlg = QuestionAsker(title="Add Process?", message=f"Unable to find {self.processes[0]} in the database.\nWould you like to add it?") @@ -383,7 +385,7 @@ class PydSubmission(BaseModel, extra='allow'): @field_validator('comment', mode='before') @classmethod def create_comment(cls, value): - if value == None: + if value is None: return "" return value @@ -391,7 +393,7 @@ class PydSubmission(BaseModel, extra='allow'): @classmethod def enforce_with_uuid(cls, value): # logger.debug(f"submitter_plate_num coming into pydantic: {value}") - if value['value'] == None or value['value'] == "None": + if value['value'] in [None, "None"]: return dict(value=uuid.uuid4().hex.upper(), missing=True) else: return value @@ -401,7 +403,7 @@ class PydSubmission(BaseModel, extra='allow'): def rescue_date(cls, value): # logger.debug(f"\n\nDate coming into pydantic: {value}\n\n") try: - check = value['value'] == None + check = value['value'] is None except TypeError: check = True if check: @@ -468,7 +470,7 @@ class PydSubmission(BaseModel, extra='allow'): @field_validator("rsl_plate_num", mode='before') @classmethod def rescue_rsl_number(cls, value): - if value == None: + if value is None: return dict(value=None, missing=True) return value @@ -491,7 +493,7 @@ class PydSubmission(BaseModel, extra='allow'): @field_validator("technician", mode="before") @classmethod def rescue_tech(cls, value): - if value == None: + if value is None: return dict(value=None, missing=True) return value @@ -507,7 +509,7 @@ class PydSubmission(BaseModel, extra='allow'): @field_validator("sample_count", mode='before') @classmethod def rescue_sample_count(cls, value): - if value == None: + if value is None: return dict(value=None, missing=True) return value @@ -521,7 +523,7 @@ class PydSubmission(BaseModel, extra='allow'): return value else: raise ValueError(f"No extraction kit found.") - if value == None: + if value is None: return dict(value=None, missing=True) return value @@ -923,14 +925,14 @@ class PydReagentRole(BaseModel): ReagentRole: ReagentType instance """ instance: ReagentRole = ReagentRole.query(name=self.name) - if instance == None: + if instance is None: instance = ReagentRole(name=self.name, eol_ext=self.eol_ext) # logger.debug(f"This is the reagent type instance: {instance.__dict__}") try: assoc = KitTypeReagentRoleAssociation.query(reagent_role=instance, kit_type=kit) except StatementError: assoc = None - if assoc == None: + if assoc is None: assoc = KitTypeReagentRoleAssociation(kit_type=kit, reagent_role=instance, uses=self.uses, required=self.required) return instance @@ -949,7 +951,7 @@ class PydKit(BaseModel): """ report = Report() instance = KitType.query(name=self.name) - if instance == None: + if instance is None: instance = KitType(name=self.name) [item.toSQL(instance) for item in self.reagent_roles] return instance, report diff --git a/src/submissions/frontend/visualizations/control_charts.py b/src/submissions/frontend/visualizations/control_charts.py index 85b0e73..1597c9d 100644 --- a/src/submissions/frontend/visualizations/control_charts.py +++ b/src/submissions/frontend/visualizations/control_charts.py @@ -6,12 +6,13 @@ import plotly.express as px import pandas as pd from plotly.graph_objects import Figure import logging -from backend.excel import get_unique_values_in_df_column -from tools import Settings +# from backend.excel import get_unique_values_in_df_column +from tools import Settings, get_unique_values_in_df_column from frontend.widgets.functions import select_save_file logger = logging.getLogger(f"submissions.{__name__}") + def create_charts(ctx:Settings, df:pd.DataFrame, ytitle:str|None=None) -> Figure: """ Constructs figures based on parsed pandas dataframe. @@ -217,7 +218,7 @@ def construct_html(figure:Figure) -> str: str: html string """ html = '' - if figure != None: + if figure is not None: html += plotly.offline.plot(figure, output_type='div', include_plotlyjs='cdn')#, image = 'png', auto_open=True, image_filename='plot_image') else: html += "

No data was retrieved for the given parameters.

" diff --git a/src/submissions/frontend/widgets/controls_chart.py b/src/submissions/frontend/widgets/controls_chart.py index babb7ff..24947bb 100644 --- a/src/submissions/frontend/widgets/controls_chart.py +++ b/src/submissions/frontend/widgets/controls_chart.py @@ -113,7 +113,7 @@ class ControlsViewer(QWidget): """ report = Report() # logger.debug(f"Control getter context: \n\tControl type: {self.con_type}\n\tMode: {self.mode}\n\tStart Date: {self.start_date}\n\tEnd Date: {self.end_date}") - # set the subtype for kraken + # NOTE: set the subtype for kraken if self.sub_typer.currentText() == "": self.subtype = None else: @@ -121,28 +121,28 @@ class ControlsViewer(QWidget): # logger.debug(f"Subtype: {self.subtype}") # query all controls using the type/start and end dates from the gui controls = Control.query(control_type=self.con_type, start_date=self.start_date, end_date=self.end_date) - # if no data found from query set fig to none for reporting in webview - if controls == None: + # NOTE: if no data found from query set fig to none for reporting in webview + if controls is None: fig = None else: - # change each control to list of dictionaries + # NOTE: change each control to list of dictionaries data = [control.convert_by_mode(mode=self.mode) for control in controls] - # flatten data to one dimensional list + # NOTE: flatten data to one dimensional list data = [item for sublist in data for item in sublist] # logger.debug(f"Control objects going into df conversion: {type(data)}") - if data == []: + if not data: self.report.add_result(Result(status="Critical", msg="No data found for controls in given date range.")) return - # send to dataframe creator + # NOTE send to dataframe creator df = convert_data_list_to_df(input=data, subtype=self.subtype) - if self.subtype == None: + if self.subtype is None: title = self.mode else: title = f"{self.mode} - {self.subtype}" # send dataframe to chart maker fig = create_charts(ctx=self.app.ctx, df=df, ytitle=title) # logger.debug(f"Updating figure...") - # construct html for webview + # NOTE: construct html for webview html = construct_html(figure=fig) # logger.debug(f"The length of html code is: {len(html)}") self.webengineview.setHtml(html) diff --git a/src/submissions/frontend/widgets/misc.py b/src/submissions/frontend/widgets/misc.py index 57382a8..344a69b 100644 --- a/src/submissions/frontend/widgets/misc.py +++ b/src/submissions/frontend/widgets/misc.py @@ -25,7 +25,7 @@ class AddReagentForm(QDialog): """ def __init__(self, reagent_lot:str|None=None, reagent_role: str | None=None, expiry: date | None=None, reagent_name: str | None=None) -> None: super().__init__() - if reagent_lot == None: + if reagent_lot is None: reagent_lot = reagent_role self.setWindowTitle("Add Reagent") @@ -47,7 +47,7 @@ class AddReagentForm(QDialog): self.exp_input = QDateEdit(calendarPopup=True) self.exp_input.setObjectName('expiry') # if expiry is not passed in from gui, use today - if expiry == None: + if expiry is None: self.exp_input.setDate(QDate.currentDate()) else: try: @@ -144,7 +144,7 @@ class FirstStrandSalvage(QDialog): def __init__(self, ctx:Settings, submitter_id:str, rsl_plate_num:str|None=None) -> None: super().__init__() - if rsl_plate_num == None: + if rsl_plate_num is None: rsl_plate_num = "" self.setWindowTitle("Add Reagent") diff --git a/src/submissions/frontend/widgets/submission_details.py b/src/submissions/frontend/widgets/submission_details.py index adb90fe..b313810 100644 --- a/src/submissions/frontend/widgets/submission_details.py +++ b/src/submissions/frontend/widgets/submission_details.py @@ -92,6 +92,7 @@ class SubmissionDetails(QDialog): # logger.debug(f"Making platemap...") self.base_dict['platemap'] = BasicSubmission.make_plate_map(sample_list=submission.hitpick_plate()) self.base_dict, self.template = submission.get_details_template(base_dict=self.base_dict) + logger.debug(f"Submission_details: {pformat(self.base_dict)}") self.html = self.template.render(sub=self.base_dict, signing_permission=is_power_user()) self.webview.setHtml(self.html) # with open("test.html", "w") as f: @@ -110,8 +111,9 @@ class SubmissionDetails(QDialog): def export(self): """ Renders submission to html, then creates and saves .pdf file to user selected file. - """ - fname = select_save_file(obj=self, default_name=self.base_dict['Plate Number'], extension="pdf") + """ + logger.debug(f"Base dict: {pformat(self.base_dict)}") + fname = select_save_file(obj=self, default_name=self.base_dict['plate_number'], extension="docx") image_io = BytesIO() temp_dir = Path(TemporaryDirectory().name) hti = Html2Image(output_path=temp_dir, size=(2400, 1500)) diff --git a/src/submissions/frontend/widgets/submission_table.py b/src/submissions/frontend/widgets/submission_table.py index 0a8d55f..dcba392 100644 --- a/src/submissions/frontend/widgets/submission_table.py +++ b/src/submissions/frontend/widgets/submission_table.py @@ -7,7 +7,7 @@ from PyQt6.QtWidgets import QTableView, QMenu from PyQt6.QtCore import Qt, QAbstractTableModel, QSortFilterProxyModel from PyQt6.QtGui import QAction, QCursor from backend.db.models import BasicSubmission -from backend.excel import make_report_html, make_report_xlsx +from backend.excel import make_report_html, make_report_xlsx, ReportMaker from tools import Report, Result, row_map, get_first_blank_df_row, html_to_pdf from .functions import select_save_file, select_open_file from .misc import ReportDatePicker @@ -168,7 +168,7 @@ class SubmissionsSheet(QTableView): # NOTE: Lookup imported submissions sub = BasicSubmission.query(rsl_plate_num=new_run['rsl_plate_num']) # NOTE: If no such submission exists, move onto the next run - if sub == None: + if sub is None: continue try: # logger.debug(f"Found submission: {sub.rsl_plate_num}") @@ -215,7 +215,7 @@ class SubmissionsSheet(QTableView): # NOTE: lookup imported submission sub = BasicSubmission.query(rsl_number=new_run['rsl_plate_num']) # NOTE: if imported submission doesn't exist move on to next run - if sub == None: + if sub is None: continue # try: # logger.debug(f"Found submission: {sub.rsl_plate_num}") @@ -255,12 +255,12 @@ class SubmissionsSheet(QTableView): subs = BasicSubmission.query(start_date=info['start_date'], end_date=info['end_date']) # NOTE: convert each object to dict records = [item.to_dict(report=True) for item in subs] - logger.debug(f"Records: {pformat(records)}") + # logger.debug(f"Records: {pformat(records)}") # NOTE: make dataframe from record dictionaries detailed_df, summary_df = make_report_xlsx(records=records) html = make_report_html(df=summary_df, start_date=info['start_date'], end_date=info['end_date']) # NOTE: get save location of report - fname = select_save_file(obj=self, default_name=f"Submissions_Report_{info['start_date']}-{info['end_date']}.pdf", extension="pdf") + fname = select_save_file(obj=self, default_name=f"Submissions_Report_{info['start_date']}-{info['end_date']}.docx", extension="docx") html_to_pdf(html=html, output_file=fname) writer = pd.ExcelWriter(fname.with_suffix(".xlsx"), engine='openpyxl') summary_df.to_excel(writer, sheet_name="Report") @@ -287,4 +287,6 @@ class SubmissionsSheet(QTableView): if cell.row > 1: cell.style = 'Currency' writer.close() + # rp = ReportMaker(start_date=info['start_date'], end_date=info['end_date']) + # rp.write_report(filename=fname, obj=self) self.report.add_result(report) diff --git a/src/submissions/frontend/widgets/submission_widget.py b/src/submissions/frontend/widgets/submission_widget.py index 6d18446..9055571 100644 --- a/src/submissions/frontend/widgets/submission_widget.py +++ b/src/submissions/frontend/widgets/submission_widget.py @@ -89,7 +89,7 @@ class SubmissionFormContainer(QWidget): self.samples = [] self.missing_info = [] # NOTE: set file dialog - if isinstance(fname, bool) or fname == None: + if isinstance(fname, bool) or fname is None: fname = select_open_file(self, file_extension="xlsx") # logger.debug(f"Attempting to parse file: {fname}") if not fname.exists(): @@ -172,7 +172,7 @@ class SubmissionFormWidget(QWidget): logger.error(f"Couldn't get attribute from pyd: {k}") value = dict(value=None, missing=True) add_widget = self.create_widget(key=k, value=value, submission_type=self.pyd.submission_type['value'], sub_obj=st) - if add_widget != None: + if add_widget is not None: self.layout.addWidget(add_widget) if k == "extraction_kit": add_widget.input.currentTextChanged.connect(self.scrape_reagents) @@ -347,7 +347,7 @@ class SubmissionFormWidget(QWidget): Args: fname (Path | None, optional): Input filename. Defaults to None. """ - if isinstance(fname, bool) or fname == None: + if isinstance(fname, bool) or fname is None: fname = select_save_file(obj=self, default_name=self.pyd.construct_filename(), extension="csv") try: @@ -546,7 +546,7 @@ class SubmissionFormWidget(QWidget): check = not value['missing'] except: check = True - if label_name != None: + if label_name is not None: self.setObjectName(label_name) else: self.setObjectName(f"{key}_label") @@ -605,7 +605,7 @@ class SubmissionFormWidget(QWidget): # logger.debug(f"Using this lot for the reagent {self.reagent}: {lot}") wanted_reagent = Reagent.query(lot_number=lot, reagent_role=self.reagent.role) # NOTE: if reagent doesn't exist in database, offer to add it (uses App.add_reagent) - if wanted_reagent == None: + if wanted_reagent is None: dlg = QuestionAsker(title=f"Add {lot}?", message=f"Couldn't find reagent type {self.reagent.role}: {lot} in the database.\n\nWould you like to add it?") if dlg.exec(): @@ -621,7 +621,7 @@ class SubmissionFormWidget(QWidget): # NOTE: Since this now gets passed in directly from the parser -> pyd -> form and the parser gets the name # from the db, it should no longer be necessary to query the db with reagent/kit, but with rt name directly. rt = ReagentRole.query(name=self.reagent.role) - if rt == None: + if rt is None: rt = ReagentRole.query(kit_type=self.extraction_kit, reagent=wanted_reagent) return PydReagent(name=wanted_reagent.name, lot=wanted_reagent.lot, role=rt.name, expiry=wanted_reagent.expiry, missing=False), None @@ -689,7 +689,7 @@ class SubmissionFormWidget(QWidget): if isinstance(looked_up_reg, list): looked_up_reg = None # logger.debug(f"Because there was no reagent listed for {reagent.lot}, we will insert the last lot used: {looked_up_reg}") - if looked_up_reg != None: + if looked_up_reg is not None: try: relevant_reagents.remove(str(looked_up_reg.lot)) relevant_reagents.insert(0, str(looked_up_reg.lot)) diff --git a/src/submissions/templates/bacterialculture_details.html b/src/submissions/templates/bacterialculture_details.html new file mode 100644 index 0000000..8d9c4b2 --- /dev/null +++ b/src/submissions/templates/bacterialculture_details.html @@ -0,0 +1,25 @@ +{% extends "basicsubmission_details.html" %} + + + {% block head %} + {{ super() }} + {% endblock %} + + + + {% block body %} + {{ super() }} + {% if sub['controls'] %} +

Attached Controls:

+ {% for item in sub['controls'] %} +

   {{ item['name'] }}: {{ item['type'] }} (Targets: {{ item['targets'] }})

+ {% if item['kraken'] %} +

   {{ item['name'] }} Top 5 Kraken Results:

+

{% for genera in item['kraken'] %} +         {{ genera['name'] }}: {{ genera['kraken_count'] }} ({{ genera['kraken_percent'] }})
+ {% endfor %}

+ {% endif %} + {% endfor %} + {% endif %} + {% endblock %} + \ No newline at end of file diff --git a/src/submissions/templates/basicsubmission_details.html b/src/submissions/templates/basicsubmission_details.html index 5e55af3..23c31e7 100644 --- a/src/submissions/templates/basicsubmission_details.html +++ b/src/submissions/templates/basicsubmission_details.html @@ -40,10 +40,9 @@ {% block body %} - -

Submission Details for {{ sub['Plate Number'] }}

   {% if sub['barcode'] %}{% endif %} +

Submission Details for {{ sub['plate_number'] }}

   {% if sub['barcode'] %}{% endif %}

{% for key, value in sub.items() if key not in sub['excluded'] %} -     {{ key | replace("_", " ") | title | replace("Pcr", "PCR") }}: {% if key=='Cost' %}{% if sub['cost'] %} {{ "${:,.2f}".format(value) }}{% endif %}{% else %}{{ value }}{% endif %}
+     {{ key | replace("_", " ") | title | replace("Pcr", "PCR") }}: {% if key=='cost' %}{% if sub['cost'] %} {{ "${:,.2f}".format(value) }}{% endif %}{% else %}{{ value }}{% endif %}
{% endfor %}

Reagents:

{% for item in sub['reagents'] %} @@ -67,18 +66,7 @@     {{ item['well'] }}: {% if item['organism'] %} {{ item['name'] }} - ({{ item['organism']|replace('\n\t', '
        ') }}){% else %} {{ item['name']|replace('\n\t', '
        ') }}{% endif %}
{% endfor %}

{% endif %} - {% if sub['controls'] %} -

Attached Controls:

- {% for item in sub['controls'] %} -

   {{ item['name'] }}: {{ item['type'] }} (Targets: {{ item['targets'] }})

- {% if item['kraken'] %} -

   {{ item['name'] }} Top 5 Kraken Results:

-

{% for genera in item['kraken'] %} -         {{ genera['name'] }}: {{ genera['kraken_count'] }} ({{ genera['kraken_percent'] }})
- {% endfor %}

- {% endif %} - {% endfor %} - {% endif %} + {% if sub['ext_info'] %} {% for entry in sub['ext_info'] %}

Extraction Status:

@@ -91,22 +79,7 @@ {% endfor %}

{% endfor %} {% endif %} - {% if sub['pcr_info'] %} - {% for entry in sub['pcr_info'] %} - {% if 'comment' not in entry.keys() %} -

qPCR Momentum Status:

- {% else %} -

qPCR Status:

- {% endif %} -

{% for key, value in entry.items() if key != 'imported_by'%} - {% if "column" in key %} -     {{ key|replace('_', ' ')|title() }}: {{ value }}uL
- {% else %} -     {{ key|replace('_', ' ')|title() }}: {{ value }}
- {% endif %} - {% endfor %}

- {% endfor %} - {% endif %} + {% if sub['comment'] %}

Comments:

{% for entry in sub['comment'] %} diff --git a/src/submissions/templates/wastewater_details.html b/src/submissions/templates/wastewater_details.html index 35e1f2e..e292c7d 100644 --- a/src/submissions/templates/wastewater_details.html +++ b/src/submissions/templates/wastewater_details.html @@ -9,6 +9,22 @@ {% block body %} {{ super() }} + {% if sub['pcr_info'] %} + {% for entry in sub['pcr_info'] %} + {% if 'comment' not in entry.keys() %} +

qPCR Momentum Status:

+ {% else %} +

qPCR Status:

+ {% endif %} +

{% for key, value in entry.items() if key != 'imported_by'%} + {% if "column" in key %} +     {{ key|replace('_', ' ')|title() }}: {{ value }}uL
+ {% else %} +     {{ key|replace('_', ' ')|title() }}: {{ value }}
+ {% endif %} + {% endfor %}

+ {% endfor %} + {% endif %} {% if sub['origin_plate'] %}

24 Well Plate:

diff --git a/src/submissions/tools.py b/src/submissions/tools.py index 0d023ac..aec3c4c 100644 --- a/src/submissions/tools.py +++ b/src/submissions/tools.py @@ -7,6 +7,7 @@ import json import numpy as np import logging, re, yaml, sys, os, stat, platform, getpass, inspect, csv import pandas as pd +from PyQt6.QtWidgets import QWidget from jinja2 import Environment, FileSystemLoader from logging import handlers from pathlib import Path @@ -18,7 +19,6 @@ from typing import Any, Tuple, Literal, List from PyQt6.QtGui import QPageSize from PyQt6.QtWebEngineWidgets import QWebEngineView from openpyxl.worksheet.worksheet import Worksheet - # from PyQt6 import QtPrintSupport, QtCore, QtWebEngineWidgets from PyQt6.QtPrintSupport import QPrinter @@ -50,6 +50,51 @@ main_form_style = ''' ''' +def get_unique_values_in_df_column(df: pd.DataFrame, column_name: str) -> list: + """ + get all unique values in a dataframe column by name + + Args: + df (DataFrame): input dataframe + column_name (str): name of column of interest + + Returns: + list: sorted list of unique values + """ + return sorted(df[column_name].unique()) + + +def check_key_or_attr(key: str, interest: dict | object, check_none: bool = False) -> bool: + match interest: + case dict(): + if key in interest.keys(): + if check_none: + match interest[key]: + case dict(): + if interest[key]['value'] is None: + return False + else: + return True + case _: + if interest[key] is None: + return False + else: + return True + else: + return True + return False + case object(): + if hasattr(interest, key): + if check_none: + if interest.__getattribute__(key) is None: + return False + else: + return True + else: + return True + return False + + def check_not_nan(cell_contents) -> bool: """ Check to ensure excel sheet cell contents are not blank. @@ -78,7 +123,7 @@ def check_not_nan(cell_contents) -> bool: cell_contents = np.nan if cell_contents == 'nan': cell_contents = np.nan - if cell_contents == None: + if cell_contents is None: cell_contents = np.nan if str(cell_contents).lower() == "none": cell_contents = np.nan @@ -212,11 +257,11 @@ class Settings(BaseSettings, extra="allow"): @field_validator('database_session', mode="before") @classmethod def create_database_session(cls, value, values): - if value != None: + if value is not None: return value else: database_path = values.data['database_path'] - if database_path == None: + if database_path is None: # check in user's .submissions directory for submissions.db if Path.home().joinpath(".submissions", "submissions.db").exists(): database_path = Path.home().joinpath(".submissions", "submissions.db") @@ -244,7 +289,7 @@ class Settings(BaseSettings, extra="allow"): @classmethod def import_package(cls, value): import __init__ as package - if value == None: + if value is None: return package def __init__(self, *args, **kwargs): @@ -299,7 +344,7 @@ def get_config(settings_path: Path | str | None = None) -> Settings: except FileExistsError: logger.warning(f"Logging directory {LOGDIR} already exists.") # NOTE: if user hasn't defined config path in cli args - if settings_path == None: + if settings_path is None: # NOTE: Check user .config/submissions directory if CONFIGDIR.joinpath("config.yml").exists(): settings_path = CONFIGDIR.joinpath("config.yml") @@ -602,6 +647,7 @@ class Report(BaseModel): def is_empty(self): return bool(self.results) + def rreplace(s, old, new): return (s[::-1].replace(old[::-1], new[::-1], 1))[::-1] @@ -609,6 +655,7 @@ def rreplace(s, old, new): def html_to_pdf(html, output_file: Path | str): if isinstance(output_file, str): output_file = Path(output_file) + logger.debug(f"Printing PDF to {output_file}") document = QWebEngineView() document.setHtml(html) printer = QPrinter(QPrinter.PrinterMode.HighResolution) @@ -616,9 +663,23 @@ def html_to_pdf(html, output_file: Path | str): printer.setOutputFileName(output_file.absolute().__str__()) printer.setPageSize(QPageSize(QPageSize.PageSizeId.A4)) document.print(printer) + # HTML(string=html).write_pdf(output_file) + # new_parser = HtmlToDocx() + # docx = new_parser.parse_html_string(html) + # docx.save(output_file) -def remove_key_from_list_of_dicts(input: list, key: str): +def remove_key_from_list_of_dicts(input: list, key: str) -> list: + """ + Removes a key from all dictionaries in a list + + Args: + input (list): Input list of dicts + key (str): Name of key to remove. + + Returns: + list: List of updated dictionaries + """ for item in input: del item[key] return input @@ -649,6 +710,7 @@ def check_authorization(func): Args: func (_type_): Function to be used. """ + def wrapper(*args, **kwargs): logger.debug(f"Checking authorization") if is_power_user(): @@ -656,4 +718,5 @@ def check_authorization(func): else: logger.error(f"User {getpass.getuser()} is not authorized for this function.") return dict(code=1, message="This user does not have permission for this function.", status="warning") + return wrapper