diff --git a/TODO.md b/TODO.md index 79e35b7..ae15ff2 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,7 @@ +- [ ] Convert Parsers to using openpyxl. + - The hardest part of this is going to be the sample parsing. I'm onto using the cell formulas in the plate map to suss out the location in the lookup table, but it could get a little recursive up in here. - [ ] Create a default info return function. -- [ ] Parse comment from excel sheet. +- [x] Parse comment from excel sheet. - [ ] Make reporting better. - [x] Build master query method? - Obviously there will need to be extensions, but I feel the attr method I have in Submissions could work. diff --git a/alembic.ini b/alembic.ini index dceb31e..d9d6866 100644 --- a/alembic.ini +++ b/alembic.ini @@ -55,9 +55,10 @@ version_path_separator = os # Use os.pathsep. Default configuration used for ne # are written from script.py.mako # output_encoding = utf-8 -sqlalchemy.url = sqlite:///L:\Robotics Laboratory Support\Submissions\submissions.db -; sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\Archives\Submissions_app_backups\DB_backups\submissions-demo.db -; sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\python\submissions\mytests\test_assets\submissions-test.db +;sqlalchemy.url = sqlite:///L:\Robotics Laboratory Support\Submissions\submissions.db +sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\Archives\Submissions_app_backups\DB_backups\submissions-demo.db +;sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\Archives\Submissions_app_backups\DB_backups\submissions-prototypes.db +;sqlalchemy.url = sqlite:///C:\Users\lwark\Documents\python\submissions\mytests\test_assets\submissions-test.db [post_write_hooks] diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index 00838c4..f4f9471 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -2,13 +2,16 @@ All kit and reagent related models ''' from __future__ import annotations + +from copy import copy + from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB from sqlalchemy.orm import relationship, validates, Query from sqlalchemy.ext.associationproxy import association_proxy from datetime import date import logging, re from tools import check_authorization, setup_lookup, Report, Result -from typing import List +from typing import List, Literal from pandas import ExcelFile from pathlib import Path from . import Base, BaseClass, Organization @@ -129,7 +132,8 @@ class KitType(BaseClass): return [item.reagent_type for item in relevant_associations if item.required == 1] else: return [item.reagent_type for item in relevant_associations] - + + # TODO: Move to BasicSubmission? def construct_xl_map_for_use(self, submission_type:str|SubmissionType) -> dict: """ Creates map of locations in excel workbook for a SubmissionType @@ -159,11 +163,12 @@ class KitType(BaseClass): map[assoc.reagent_type.name] = assoc.uses except TypeError: continue - # Get SubmissionType info map - try: - map['info'] = st_assoc.info_map - except IndexError as e: - map['info'] = {} + # # Get SubmissionType info map + # try: + # # map['info'] = st_assoc.info_map + # map['info'] = st_assoc.construct_info_map(mode="write") + # except IndexError as e: + # map['info'] = {} return map @classmethod @@ -551,7 +556,8 @@ class SubmissionType(BaseClass): instances = relationship("BasicSubmission", backref="submission_type") #: Concrete instances of this type. template_file = Column(BLOB) #: Blank form for this type stored as binary. processes = relationship("Process", back_populates="submission_types", secondary=submissiontypes_processes) #: Relation to equipment processes used for this type. - + sample_map = Column(JSON) #: Where sample information is found in the excel sheet corresponding to this type. + submissiontype_kit_associations = relationship( "SubmissionTypeKitTypeAssociation", back_populates="submission_type", @@ -612,24 +618,40 @@ class SubmissionType(BaseClass): self.template_file = data self.save() - def construct_equipment_map(self) -> List[dict]: + def construct_info_map(self, mode:Literal['read', 'write']) -> dict: + info = self.info_map + logger.debug(f"Info map: {info}") + output = {} + # for k,v in info.items(): + # info[k]['write'] += info[k]['read'] + match mode: + case "read": + output = {k:v[mode] for k,v in info.items() if v[mode]} + case "write": + output = {k:v[mode] + v['read'] for k,v in info.items() if v[mode] or v['read']} + return output + + def construct_sample_map(self): + return self.sample_map + + def construct_equipment_map(self) -> dict: """ Constructs map of equipment to excel cells. Returns: List[dict]: List of equipment locations in excel sheet """ - output = [] + output = {} # logger.debug("Iterating through equipment roles") for item in self.submissiontype_equipmentrole_associations: map = item.uses - if map == None: + if map is None: map = {} - try: - map['role'] = item.equipment_role.name - except TypeError: - pass - output.append(map) + # try: + output[item.equipment_role.name] = map + # except TypeError: + # pass + # output.append(map) return output def get_equipment(self, extraction_kit:str|KitType|None=None) -> List['PydEquipmentRole']: diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index b799a08..dcf7f66 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -112,6 +112,14 @@ class BasicSubmission(BaseClass): output += BasicSubmission.jsons() return output + @classmethod + def timestamps(cls) -> List[str]: + output = [item.name for item in cls.__table__.columns if isinstance(item.type, TIMESTAMP)] + if issubclass(cls, BasicSubmission) and not cls.__name__ == "BasicSubmission": + output += BasicSubmission.timestamps() + return output + + # TODO: Beef up this to include info_map from DB @classmethod def get_default_info(cls, *args): # Create defaults for all submission_types @@ -121,16 +129,18 @@ class BasicSubmission(BaseClass): details_ignore=['excluded', 'reagents', 'samples', 'extraction_info', 'comment', 'barcode', 'platemap', 'export_map', 'equipment'], - form_recover=recover, + # NOTE: Fields not placed in ui form form_ignore=['reagents', 'ctx', 'id', 'cost', 'extraction_info', 'signed_by', 'comment'] + recover, - parser_ignore=['samples', 'signed_by'] + cls.jsons(), - excel_ignore=[], + # NOTE: Fields not placed in ui form to be moved to pydantic + form_recover=recover, + # parser_ignore=['samples', 'signed_by'] + [item for item in cls.jsons() if item != "comment"], + # excel_ignore=[], ) # logger.debug(dicto['singles']) - """Singles tells the query which fields to set limit to 1""" + # NOTE: Singles tells the query which fields to set limit to 1 dicto['singles'] = parent_defs['singles'] # logger.debug(dicto['singles']) - """Grab subtype specific info.""" + # NOTE: Grab subtype specific info. output = {} for k, v in dicto.items(): if len(args) > 0 and k not in args: @@ -163,6 +173,14 @@ class BasicSubmission(BaseClass): name = cls.__mapper_args__['polymorphic_identity'] return SubmissionType.query(name=name) + @classmethod + def construct_info_map(cls, mode:Literal['read', 'write']): + return cls.get_submission_type().construct_info_map(mode=mode) + + @classmethod + def construct_sample_map(cls): + return cls.get_submission_type().construct_sample_map() + def to_dict(self, full_data: bool = False, backup: bool = False, report: bool = False) -> dict: """ Constructs dictionary used in submissions summary @@ -492,7 +510,6 @@ class BasicSubmission(BaseClass): missing = value is None or value in ['', 'None'] match key: case "reagents": - new_dict[key] = [PydReagent(**reagent) for reagent in value] case "samples": new_dict[key] = [PydSample(**{k.lower().replace(" ", "_"): v for k, v in sample.items()}) for sample @@ -506,6 +523,8 @@ class BasicSubmission(BaseClass): new_dict['rsl_plate_num'] = dict(value=value, missing=missing) case "Submitter Plate Number": new_dict['submitter_plate_num'] = dict(value=value, missing=missing) + case "id": + pass case _: logger.debug(f"Setting dict {key} to {value}") new_dict[key.lower().replace(" ", "_")] = dict(value=value, missing=missing) @@ -601,7 +620,7 @@ class BasicSubmission(BaseClass): return plate_map @classmethod - def parse_info(cls, input_dict: dict, xl: pd.ExcelFile | None = None) -> dict: + def parse_info(cls, input_dict: dict, xl: Workbook | None = None) -> dict: """ Update submission dictionary with type specific information @@ -630,8 +649,7 @@ class BasicSubmission(BaseClass): return input_dict @classmethod - def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None, - plate_map: dict | None = None) -> dict: + def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None) -> dict: """ Performs any final custom parsing of the excel file. @@ -999,7 +1017,7 @@ class BasicSubmission(BaseClass): fname = self.__backup_path__.joinpath(f"{self.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')})") msg = QuestionAsker(title="Delete?", message=f"Are you sure you want to delete {self.rsl_plate_num}?\n") if msg.exec(): - self.backup(fname=fname, full_backup=True) + # self.backup(fname=fname, full_backup=True) self.__database_session__.delete(self) try: self.__database_session__.commit() @@ -1083,6 +1101,7 @@ class BasicSubmission(BaseClass): if fname.name == "": logger.debug(f"export cancelled.") return + # pyd.filepath = fname if full_backup: backup = self.to_dict(full_data=True) try: @@ -1090,11 +1109,12 @@ class BasicSubmission(BaseClass): yaml.dump(backup, f) except KeyError as e: logger.error(f"Problem saving yml backup file: {e}") - wb = pyd.autofill_excel() - wb = pyd.autofill_samples(wb) - wb = pyd.autofill_equipment(wb) - wb.save(filename=fname.with_suffix(".xlsx")) - + # wb = pyd.autofill_excel() + # wb = pyd.autofill_samples(wb) + # wb = pyd.autofill_equipment(wb) + writer = pyd.toWriter() + # wb.save(filename=fname.with_suffix(".xlsx")) + writer.xl.save(filename=fname.with_suffix(".xlsx")) # Below are the custom submission types @@ -1186,8 +1206,7 @@ class BacterialCulture(BasicSubmission): return template @classmethod - def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None, - plate_map: dict | None = None) -> dict: + def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None) -> dict: """ Extends parent. Currently finds control sample and adds to reagents. @@ -1201,23 +1220,23 @@ class BacterialCulture(BasicSubmission): dict: _description_ """ from . import ControlType - input_dict = super().finalize_parse(input_dict, xl, info_map, plate_map) + input_dict = super().finalize_parse(input_dict, xl, info_map) # build regex for all control types that have targets regex = ControlType.build_positive_regex() # search samples for match for sample in input_dict['samples']: - matched = regex.match(sample.submitter_id) + matched = regex.match(sample['submitter_id']) if bool(matched): - logger.debug(f"Control match found: {sample.submitter_id}") + logger.debug(f"Control match found: {sample['submitter_id']}") new_lot = matched.group() try: pos_control_reg = \ - [reg for reg in input_dict['reagents'] if reg.type == "Bacterial-Positive Control"][0] + [reg for reg in input_dict['reagents'] if reg['type'] == "Bacterial-Positive Control"][0] except IndexError: logger.error(f"No positive control reagent listed") return input_dict - pos_control_reg.lot = new_lot - pos_control_reg.missing = False + pos_control_reg['lot'] = new_lot + pos_control_reg['missing'] = False return input_dict @classmethod @@ -1278,7 +1297,7 @@ class Wastewater(BasicSubmission): return output @classmethod - def parse_info(cls, input_dict: dict, xl: pd.ExcelFile | None = None) -> dict: + def parse_info(cls, input_dict: dict, xl: Workbook | None = None) -> dict: """ Update submission dictionary with type specific information. Extends parent @@ -1290,7 +1309,7 @@ class Wastewater(BasicSubmission): """ input_dict = super().parse_info(input_dict) if xl != None: - input_dict['csv'] = xl.parse("Copy to import file") + input_dict['csv'] = xl["Copy to import file"] return input_dict @classmethod @@ -1567,8 +1586,7 @@ class WastewaterArtic(BasicSubmission): return "(?P(\\d{4}-\\d{2}-\\d{2}(?:-|_)(?:\\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\\d{2}-?\\d{2}-?\\d{2}(?:(_|-)\\d?(\\D|$)R?\\d?)?))" @classmethod - def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None, - plate_map: dict | None = None) -> dict: + def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None) -> dict: """ Performs any final custom parsing of the excel file. Extends parent @@ -1581,7 +1599,7 @@ class WastewaterArtic(BasicSubmission): Returns: dict: Updated parser product. """ - input_dict = super().finalize_parse(input_dict, xl, info_map, plate_map) + input_dict = super().finalize_parse(input_dict, xl, info_map) input_dict['csv'] = xl.parse("hitpicks_csv_to_export") return input_dict @@ -1799,6 +1817,13 @@ class BasicSample(BaseClass): except AttributeError: return f" List[str]: + output = [item.name for item in cls.__table__.columns if isinstance(item.type, TIMESTAMP)] + if issubclass(cls, BasicSample) and not cls.__name__ == "BasicSample": + output += BasicSample.timestamps() + return output + def to_sub_dict(self, full_data: bool = False) -> dict: """ gui friendly dictionary, extends parent method. @@ -1878,6 +1903,7 @@ class BasicSample(BaseClass): Returns: dict: Updated parser results. """ + logger.debug(f"Hello from {cls.__name__} sample parser!") return input_dict @classmethod @@ -2053,24 +2079,30 @@ class WastewaterSample(BasicSample): dict: Updated parser results. """ output_dict = super().parse_sample(input_dict) - if output_dict['rsl_number'] == None: - output_dict['rsl_number'] = output_dict['submitter_id'] - if output_dict['ww_full_sample_id'] != None: + logger.debug(f"Initial sample dict: {pformat(output_dict)}") + try: + check = output_dict['rsl_number'] in [None, "None"] + except KeyError: + check = True + if check: + output_dict['rsl_number'] = "RSL-WW-" + output_dict['ww_processing_number'] + if output_dict['ww_full_sample_id'] is not None: output_dict["submitter_id"] = output_dict['ww_full_sample_id'] # Ad hoc repair method for WW (or possibly upstream) not formatting some dates properly. - match output_dict['collection_date']: - case str(): - try: - output_dict['collection_date'] = parse(output_dict['collection_date']).date() - except ParserError: - logger.error(f"Problem parsing collection_date: {output_dict['collection_date']}") - output_dict['collection_date'] = date(1970, 1, 1) - case datetime(): - output_dict['collection_date'] = output_dict['collection_date'].date() - case date(): - pass - case _: - del output_dict['collection_date'] + # NOTE: Should be handled by validator. + # match output_dict['collection_date']: + # case str(): + # try: + # output_dict['collection_date'] = parse(output_dict['collection_date']).date() + # except ParserError: + # logger.error(f"Problem parsing collection_date: {output_dict['collection_date']}") + # output_dict['collection_date'] = date(1970, 1, 1) + # case datetime(): + # output_dict['collection_date'] = output_dict['collection_date'].date() + # case date(): + # pass + # case _: + # del output_dict['collection_date'] return output_dict def get_previous_ww_submission(self, current_artic_submission: WastewaterArtic): @@ -2134,6 +2166,7 @@ class SubmissionSampleAssociation(BaseClass): submission_id = Column(INTEGER, ForeignKey("_basicsubmission.id"), primary_key=True) #: id of associated submission row = Column(INTEGER, primary_key=True) #: row on the 96 well plate column = Column(INTEGER, primary_key=True) #: column on the 96 well plate + submission_rank = Column(INTEGER, nullable=False, default=1) #: Location in sample list # reference to the Submission object submission = relationship(BasicSubmission, @@ -2193,6 +2226,7 @@ class SubmissionSampleAssociation(BaseClass): sample['Plate Name'] = self.submission.rsl_plate_num sample['positive'] = False sample['submitted_date'] = self.submission.submitted_date + sample['submission_rank'] = self.submission_rank return sample def to_hitpick(self) -> dict | None: diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index e043c83..a215e88 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -1,10 +1,14 @@ ''' contains parser object for pulling values from client generated submission sheets. ''' +import sys +from copy import copy from getpass import getuser from pprint import pformat from typing import List import pandas as pd +from openpyxl import load_workbook, Workbook +from openpyxl.worksheet.protection import SheetProtection import numpy as np from pathlib import Path from backend.db.models import * @@ -13,21 +17,23 @@ import logging, re from collections import OrderedDict from datetime import date from dateutil.parser import parse, ParserError -from tools import check_not_nan, convert_nans_to_nones, row_map, is_missing +from tools import check_not_nan, convert_nans_to_nones, row_map, row_keys, is_missing, remove_key_from_list_of_dicts logger = logging.getLogger(f"submissions.{__name__}") -row_keys = {v:k for k,v in row_map.items()} + +# row_keys = {v:k for k,v in row_map.items()} class SheetParser(object): """ object to pull and contain data from excel file """ - def __init__(self, filepath:Path|None = None): + + def __init__(self, filepath: Path | None = None): """ Args: filepath (Path | None, optional): file path to excel sheet. Defaults to None. - """ + """ logger.debug(f"\n\nParsing {filepath.__str__()}\n\n") match filepath: case Path(): @@ -38,58 +44,61 @@ class SheetParser(object): logger.error(f"No filepath given.") raise ValueError("No filepath given.") try: - self.xl = pd.ExcelFile(filepath) + # self.xl = pd.ExcelFile(filepath) + self.xl = load_workbook(filepath, read_only=True, data_only=True) except ValueError as e: logger.error(f"Incorrect value: {e}") raise FileNotFoundError(f"Couldn't parse file {self.filepath}") self.sub = OrderedDict() # make decision about type of sample we have - self.sub['submission_type'] = dict(value=RSLNamer.retrieve_submission_type(filename=self.filepath), missing=True) - # # grab the info map from the submission type in database + self.sub['submission_type'] = dict(value=RSLNamer.retrieve_submission_type(filename=self.filepath), + missing=True) + # grab the info map from the submission type in database self.parse_info() self.import_kit_validation_check() self.parse_reagents() - self.import_reagent_validation_check() + # self.import_reagent_validation_check() self.parse_samples() self.parse_equipment() self.finalize_parse() logger.debug(f"Parser.sub after info scrape: {pformat(self.sub)}") - + def parse_info(self): """ Pulls basic information from the excel sheet - """ + """ parser = InfoParser(xl=self.xl, submission_type=self.sub['submission_type']['value']) - info = parser.parse_info() + info = parser.parse_info() self.info_map = parser.map # exclude_from_info = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.sub['submission_type']).exclude_from_info_parser() - for k,v in info.items(): + for k, v in info.items(): match k: case "sample": - # case item if + # case item if pass case _: self.sub[k] = v - - def parse_reagents(self, extraction_kit:str|None=None): + + def parse_reagents(self, extraction_kit: str | None = None): """ Pulls reagent info from the excel sheet Args: extraction_kit (str | None, optional): Relevant extraction kit for reagent map. Defaults to None. - """ + """ if extraction_kit == None: - extraction_kit = extraction_kit=self.sub['extraction_kit'] + extraction_kit = extraction_kit = self.sub['extraction_kit'] # logger.debug(f"Parsing reagents for {extraction_kit}") - self.sub['reagents'] = ReagentParser(xl=self.xl, submission_type=self.sub['submission_type'], extraction_kit=extraction_kit).parse_reagents() + self.sub['reagents'] = ReagentParser(xl=self.xl, submission_type=self.sub['submission_type'], + extraction_kit=extraction_kit).parse_reagents() def parse_samples(self): """ Pulls sample info from the excel sheet - """ + """ parser = SampleParser(xl=self.xl, submission_type=self.sub['submission_type']['value']) - self.sample_result, self.sub['samples'] = parser.parse_samples() - self.plate_map = parser.plate_map + self.sub['samples'] = parser.reconcile_samples() + # self.plate_map = parser.plate_map def parse_equipment(self): parser = EquipmentParser(xl=self.xl, submission_type=self.sub['submission_type']['value']) @@ -98,10 +107,11 @@ class SheetParser(object): def import_kit_validation_check(self): """ Enforce that the parser has an extraction kit - """ + """ from frontend.widgets.pop_ups import ObjectSelector if 'extraction_kit' not in self.sub.keys() or not check_not_nan(self.sub['extraction_kit']['value']): - dlg = ObjectSelector(title="Kit Needed", message="At minimum a kit is needed. Please select one.", obj_type=KitType) + dlg = ObjectSelector(title="Kit Needed", message="At minimum a kit is needed. Please select one.", + obj_type=KitType) if dlg.exec(): self.sub['extraction_kit'] = dict(value=dlg.parse_form(), missing=True) else: @@ -113,7 +123,7 @@ class SheetParser(object): def import_reagent_validation_check(self): """ Enforce that only allowed reagents get into the Pydantic Model - """ + """ kit = KitType.query(name=self.sub['extraction_kit']['value']) allowed_reagents = [item.name for item in kit.get_reagents()] # logger.debug(f"List of reagents for comparison with allowed_reagents: {pformat(self.sub['reagents'])}") @@ -122,9 +132,10 @@ class SheetParser(object): def finalize_parse(self): """ Run custom final validations of data for submission subclasses. - """ - finisher = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.sub['submission_type']).finalize_parse - self.sub = finisher(input_dict=self.sub, xl=self.xl, info_map=self.info_map, plate_map=self.plate_map) + """ + finisher = BasicSubmission.find_polymorphic_subclass( + polymorphic_identity=self.sub['submission_type']).finalize_parse + self.sub = finisher(input_dict=self.sub, xl=self.xl, info_map=self.info_map) def to_pydantic(self) -> PydSubmission: """ @@ -132,27 +143,33 @@ class SheetParser(object): Returns: PydSubmission: output pydantic model - """ + """ # logger.debug(f"Submission dictionary coming into 'to_pydantic':\n{pformat(self.sub)}") + pyd_dict = copy(self.sub) + pyd_dict['samples'] = [PydSample(**sample) for sample in self.sub['samples']] + pyd_dict['reagents'] = [PydReagent(**reagent) for reagent in self.sub['reagents']] logger.debug(f"Equipment: {self.sub['equipment']}") try: check = len(self.sub['equipment']) == 0 except TypeError: check = True if check: - self.sub['equipment'] = None - psm = PydSubmission(filepath=self.filepath, **self.sub) + pyd_dict['equipment'] = None + else: + pyd_dict['equipment'] = self.sub['equipment'] + psm = PydSubmission(filepath=self.filepath, **pyd_dict) return psm - + + class InfoParser(object): - def __init__(self, xl:pd.ExcelFile, submission_type:str): + def __init__(self, xl: Workbook, submission_type: str): logger.info(f"\n\Hello from InfoParser!\n\n") self.submission_type = submission_type self.map = self.fetch_submission_info_map() self.xl = xl logger.debug(f"Info map for InfoParser: {pformat(self.map)}") - + def fetch_submission_info_map(self) -> dict: """ Gets location of basic info from the submission_type object in the database. @@ -162,14 +179,17 @@ class InfoParser(object): Returns: dict: Location map of all info for this submission type - """ + """ if isinstance(self.submission_type, str): self.submission_type = dict(value=self.submission_type, missing=True) logger.debug(f"Looking up submission type: {self.submission_type['value']}") - submission_type = SubmissionType.query(name=self.submission_type['value']) - info_map = submission_type.info_map + # submission_type = SubmissionType.query(name=self.submission_type['value']) + # info_map = submission_type.info_map + self.sub_object: BasicSubmission = \ + BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type['value']) + info_map = self.sub_object.construct_info_map("read") # Get the parse_info method from the submission type specified - self.custom_parser = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=submission_type.name).parse_info + return info_map def parse_info(self) -> dict: @@ -178,60 +198,77 @@ class InfoParser(object): Returns: dict: key:value of basic info - """ + """ if isinstance(self.submission_type, str): self.submission_type = dict(value=self.submission_type, missing=True) dicto = {} - exclude_from_generic = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type['value']).get_default_info("parser_ignore") + # exclude_from_generic = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type['value']).get_default_info("parser_ignore") # This loop parses generic info logger.debug(f"Map: {self.map}") - # time.sleep(5) - for sheet in self.xl.sheet_names: - df = self.xl.parse(sheet, header=None) - relevant = {} + # for sheet in self.xl.sheet_names: + for sheet in self.xl.sheetnames: + # df = self.xl.parse(sheet, header=None) + ws = self.xl[sheet] + relevant = [] for k, v in self.map.items(): - # exclude from generic parsing - if k in exclude_from_generic: - logger.warning(f"Key {k} is excluded due to parser_ignore") - continue # If the value is hardcoded put it in the dictionary directly. if isinstance(v, str): dicto[k] = dict(value=v, missing=False) continue logger.debug(f"Looking for {k} in self.map") - try: - check = sheet in self.map[k]['sheets'] - except TypeError: - continue - if check: - relevant[k] = v + logger.debug(f"Locations: {v}") + # try: + # check = sheet in self.map[k]['sheets'] + # except TypeError: + # continue + # if check: + # relevant[k] = v + for location in v: + if location['sheet'] == sheet: + new = location + new['name'] = k + relevant.append(new) logger.debug(f"relevant map for {sheet}: {pformat(relevant)}") - if relevant == {}: + if not relevant: continue for item in relevant: - value = df.iat[relevant[item]['row']-1, relevant[item]['column']-1] - match item: + # NOTE: Get cell contents at this location + # value = df.iat[item['row']-1, item['column']-1] + value = ws.cell(row=item['row'], column=item['column']).value + logger.debug(f"Value for {item['name']} = {value}") + match item['name']: case "submission_type": value, missing = is_missing(value) value = value.title() + case thing if thing in self.sub_object.jsons(): + value, missing = is_missing(value) + if missing: continue + value = dict(name=f"Parser_{sheet}", text=value, time=datetime.now()) + try: + dicto[item['name']]['value'] += value + continue + except KeyError: + logger.debug(f"New value for {item['name']}") case _: value, missing = is_missing(value) logger.debug(f"Setting {item} on {sheet} to {value}") - try: - dicto[item] = dict(value=value, missing=missing) - except (KeyError, IndexError): - continue - return self.custom_parser(input_dict=dicto, xl=self.xl) - + if item['name'] not in dicto.keys(): + try: + dicto[item['name']] = dict(value=value, missing=missing) + except (KeyError, IndexError): + continue + return self.sub_object.parse_info(input_dict=dicto, xl=self.xl) + + class ReagentParser(object): - def __init__(self, xl:pd.ExcelFile, submission_type:str, extraction_kit:str): + def __init__(self, xl: Workbook, submission_type: str, extraction_kit: str): logger.debug("\n\nHello from ReagentParser!\n\n") self.map = self.fetch_kit_info_map(extraction_kit=extraction_kit, submission_type=submission_type) logger.debug(f"Reagent Parser map: {self.map}") self.xl = xl - def fetch_kit_info_map(self, extraction_kit:dict, submission_type:str) -> dict: + def fetch_kit_info_map(self, extraction_kit: dict, submission_type: str) -> dict: """ Gets location of kit reagents from database @@ -241,7 +278,7 @@ class ReagentParser(object): Returns: dict: locations of reagent info for the kit. - """ + """ if isinstance(extraction_kit, dict): extraction_kit = extraction_kit['value'] kit = KitType.query(name=extraction_kit) @@ -250,35 +287,42 @@ class ReagentParser(object): reagent_map = kit.construct_xl_map_for_use(submission_type.title()) del reagent_map['info'] return reagent_map - + def parse_reagents(self) -> List[PydReagent]: """ Extracts reagent information from the excel form. Returns: List[PydReagent]: List of parsed reagents. - """ + """ listo = [] - for sheet in self.xl.sheet_names: - df = self.xl.parse(sheet, header=None, dtype=object) - df.replace({np.nan: None}, inplace = True) - relevant = {k.strip():v for k,v in self.map.items() if sheet in self.map[k]['sheet']} + for sheet in self.xl.sheetnames: + # df = self.xl.parse(sheet, header=None, dtype=object) + ws = self.xl[sheet] + # df.replace({np.nan: None}, inplace = True) + relevant = {k.strip(): v for k, v in self.map.items() if sheet in self.map[k]['sheet']} logger.debug(f"relevant map for {sheet}: {pformat(relevant)}") if relevant == {}: continue for item in relevant: logger.debug(f"Attempting to scrape: {item}") try: - name = df.iat[relevant[item]['name']['row']-1, relevant[item]['name']['column']-1] - lot = df.iat[relevant[item]['lot']['row']-1, relevant[item]['lot']['column']-1] - expiry = df.iat[relevant[item]['expiry']['row']-1, relevant[item]['expiry']['column']-1] + reagent = relevant[item] + # name = df.iat[relevant[item]['name']['row']-1, relevant[item]['name']['column']-1] + # lot = df.iat[relevant[item]['lot']['row']-1, relevant[item]['lot']['column']-1] + # expiry = df.iat[relevant[item]['expiry']['row']-1, relevant[item]['expiry']['column']-1] + name = ws.cell(row=reagent['name']['row'], column=reagent['name']['column']).value + lot = ws.cell(row=reagent['lot']['row'], column=reagent['lot']['column']).value + expiry = ws.cell(row=reagent['expiry']['row'], column=reagent['expiry']['column']).value if 'comment' in relevant[item].keys(): logger.debug(f"looking for {relevant[item]} comment.") - comment = df.iat[relevant[item]['comment']['row']-1, relevant[item]['comment']['column']-1] + # comment = df.iat[relevant[item]['comment']['row']-1, relevant[item]['comment']['column']-1] + expiry = ws.cell(row=reagent['comment']['row'], column=reagent['comment']['column']).value else: comment = "" except (KeyError, IndexError): - listo.append(PydReagent(type=item.strip(), lot=None, expiry=None, name=None, comment="", missing=True)) + listo.append( + PydReagent(type=item.strip(), lot=None, expiry=None, name=None, comment="", missing=True)) continue # If the cell is blank tell the PydReagent if check_not_nan(lot): @@ -287,45 +331,49 @@ class ReagentParser(object): missing = True # logger.debug(f"Got lot for {item}-{name}: {lot} as {type(lot)}") lot = str(lot) - logger.debug(f"Going into pydantic: name: {name}, lot: {lot}, expiry: {expiry}, type: {item.strip()}, comment: {comment}") + logger.debug( + f"Going into pydantic: name: {name}, lot: {lot}, expiry: {expiry}, type: {item.strip()}, comment: {comment}") try: check = name.lower() != "not applicable" except AttributeError: check = True if check: - listo.append(PydReagent(type=item.strip(), lot=lot, expiry=expiry, name=name, comment=comment, missing=missing)) + listo.append(dict(type=item.strip(), lot=lot, expiry=expiry, name=name, comment=comment, + missing=missing)) return listo + class SampleParser(object): """ object to pull data for samples in excel sheet and construct individual sample objects """ - def __init__(self, xl:pd.ExcelFile, submission_type:str, sample_map:dict|None=None) -> None: + def __init__(self, xl: Workbook, submission_type: str, sample_map: dict | None = None) -> None: """ convert sample sub-dataframe to dictionary of records Args: df (pd.DataFrame): input sample dataframe elution_map (pd.DataFrame | None, optional): optional map of elution plate. Defaults to None. - """ + """ logger.debug("\n\nHello from SampleParser!\n\n") self.samples = [] self.xl = xl self.submission_type = submission_type - sample_info_map = self.fetch_sample_info_map(submission_type=submission_type, sample_map=sample_map) - logger.debug(f"sample_info_map: {sample_info_map}") - self.plate_map = self.construct_plate_map(plate_map_location=sample_info_map['plate_map']) + self.sample_info_map = self.fetch_sample_info_map(submission_type=submission_type, sample_map=sample_map) + logger.debug(f"sample_info_map: {self.sample_info_map}") + # self.plate_map = self.construct_plate_map(plate_map_location=sample_info_map['plate_map']) # logger.debug(f"plate_map: {self.plate_map}") - self.lookup_table = self.construct_lookup_table(lookup_table_location=sample_info_map['lookup_table']) - if "plates" in sample_info_map: - self.plates = sample_info_map['plates'] - self.excel_to_db_map = sample_info_map['xl_db_translation'] - self.create_basic_dictionaries_from_plate_map() - if isinstance(self.lookup_table, pd.DataFrame): - self.parse_lookup_table() - - def fetch_sample_info_map(self, submission_type:str, sample_map:dict|None=None) -> dict: + # self.lookup_table = self.construct_lookup_table(lookup_table_location=sample_info_map['lookup_table']) + # if "plates" in sample_info_map: + # self.plates = sample_info_map['plates'] + # self.excel_to_db_map = sample_info_map['xl_db_translation'] + self.plate_map_samples = self.parse_plate_map() + self.lookup_samples = self.parse_lookup_table() + # if isinstance(self.lookup_table, pd.DataFrame): + # self.parse_lookup_table() + + def fetch_sample_info_map(self, submission_type: str, sample_map: dict | None = None) -> dict: """ Gets info locations in excel book for submission type. @@ -334,19 +382,23 @@ class SampleParser(object): Returns: dict: Info locations. - """ + """ logger.debug(f"Looking up submission type: {submission_type}") - submission_type = SubmissionType.query(name=submission_type) - logger.debug(f"info_map: {pformat(submission_type.info_map)}") + # submission_type = SubmissionType.query(name=submission_type) + self.sub_object = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=submission_type) + # self.custom_sub_parser = .parse_samples + self.samp_object = BasicSample.find_polymorphic_subclass(polymorphic_identity=f"{submission_type} Sample") + logger.debug(f"Got sample class: {self.samp_object.__name__}") + # self.custom_sample_parser = .parse_sample + # logger.debug(f"info_map: {pformat(se)}") if sample_map is None: - sample_info_map = submission_type.info_map['samples'] + # sample_info_map = submission_type.info_map['samples'] + sample_info_map = self.sub_object.construct_sample_map() else: sample_info_map = sample_map - self.custom_sub_parser = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=submission_type.name).parse_samples - self.custom_sample_parser = BasicSample.find_polymorphic_subclass(polymorphic_identity=f"{submission_type.name} Sample").parse_sample return sample_info_map - def construct_plate_map(self, plate_map_location:dict) -> pd.DataFrame: + def construct_plate_map(self, plate_map_location: dict) -> pd.DataFrame: """ Gets location of samples from plate map grid in excel sheet. @@ -355,19 +407,20 @@ class SampleParser(object): Returns: pd.DataFrame: Plate map grid - """ + """ logger.debug(f"Plate map location: {plate_map_location}") df = self.xl.parse(plate_map_location['sheet'], header=None, dtype=object) - df = df.iloc[plate_map_location['start_row']-1:plate_map_location['end_row'], plate_map_location['start_column']-1:plate_map_location['end_column']] + df = df.iloc[plate_map_location['start_row'] - 1:plate_map_location['end_row'], + plate_map_location['start_column'] - 1:plate_map_location['end_column']] df = pd.DataFrame(df.values[1:], columns=df.iloc[0]) df = df.set_index(df.columns[0]) logger.debug(f"Vanilla platemap: {df}") - custom_mapper = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type) - df = custom_mapper.custom_platemap(self.xl, df) - logger.debug(f"Custom platemap:\n{df}") + # custom_mapper = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type) + df = self.sub_object.custom_platemap(self.xl, df) + # logger.debug(f"Custom platemap:\n{df}") return df - - def construct_lookup_table(self, lookup_table_location:dict) -> pd.DataFrame: + + def construct_lookup_table(self, lookup_table_location: dict) -> pd.DataFrame: """ Gets table of misc information from excel book @@ -376,89 +429,118 @@ class SampleParser(object): Returns: pd.DataFrame: _description_ - """ + """ try: df = self.xl.parse(lookup_table_location['sheet'], header=None, dtype=object) except KeyError: return None - df = df.iloc[lookup_table_location['start_row']-1:lookup_table_location['end_row']] + df = df.iloc[lookup_table_location['start_row'] - 1:lookup_table_location['end_row']] df = pd.DataFrame(df.values[1:], columns=df.iloc[0]) df = df.reset_index(drop=True) return df - - def create_basic_dictionaries_from_plate_map(self): + + def parse_plate_map(self): """ Parse sample location/name from plate map - """ + """ invalids = [0, "0", "EMPTY"] - new_df = self.plate_map.dropna(axis=1, how='all') - columns = new_df.columns.tolist() - for _, iii in new_df.iterrows(): - for c in columns: - if check_not_nan(iii[c]): - if iii[c] in invalids: - logger.debug(f"Invalid sample name: {iii[c]}, skipping.") - continue - id = iii[c] - logger.debug(f"Adding sample {iii[c]}") - try: - c = self.plate_map.columns.get_loc(c) + 1 - except Exception as e: - logger.error(f"Unable to get column index of {c} due to {e}") - self.samples.append(dict(submitter_id=id, row=row_keys[iii._name], column=c)) - - def parse_lookup_table(self): + smap = self.sample_info_map['plate_map'] + ws = self.xl[smap['sheet']] + # ws.protection = SheetProtection() + # new_df = self.plate_map.dropna(axis=1, how='all') + # columns = new_df.columns.tolist() + # for _, iii in new_df.iterrows(): + # for c in columns: + # if check_not_nan(iii[c]): + # if iii[c] in invalids: + # logger.debug(f"Invalid sample name: {iii[c]}, skipping.") + # continue + # id = iii[c] + # logger.debug(f"Adding sample {iii[c]}") + # try: + # c = self.plate_map.columns.get_loc(c) + 1 + # except Exception as e: + # logger.error(f"Unable to get column index of {c} due to {e}") + # self.samples.append(dict(submitter_id=id, row=row_keys[iii._name], column=c)) + plate_map_samples = [] + for ii, row in enumerate(range(smap['start_row'], smap['end_row'] + 1), start=1): + # logger.debug(f"Parsing row: {row}") + for jj, column in enumerate(range(smap['start_column'], smap['end_column'] + 1), start=1): + # logger.debug(f"Parsing column: {column}") + id = str(ws.cell(row=row, column=column).value) + if check_not_nan(id): + if id not in invalids: + sample_dict = dict(id=id, row=ii, column=jj) + sample_dict['sample_type'] = f"{self.submission_type} Sample" + plate_map_samples.append(sample_dict) + else: + # logger.error(f"Sample cell ({row}, {column}) has invalid value: {id}.") + pass + else: + # logger.error(f"Sample cell ({row}, {column}) has no info: {id}.") + pass + return plate_map_samples + + def parse_lookup_table(self) -> dict: """ Parse misc info from lookup table. - """ - def determine_if_date(input_str) -> str|date: - regex = re.compile(r"^\d{4}-?\d{2}-?\d{2}") - if bool(regex.search(input_str)): - logger.warning(f"{input_str} is a date!") - try: - return parse(input_str) - except ParserError: - return None - else: - return input_str - for sample in self.samples: - addition = self.lookup_table[self.lookup_table.isin([sample['submitter_id']]).any(axis=1)].squeeze() - # logger.debug(addition) - if isinstance(addition, pd.DataFrame) and not addition.empty: - addition = addition.iloc[0] - # logger.debug(f"Lookuptable info: {addition.to_dict()}") - for k,v in addition.to_dict().items(): - # logger.debug(f"Checking {k} in lookup table.") - if check_not_nan(k) and isinstance(k, str): - if k.lower() not in sample: - k = k.replace(" ", "_").replace("#","num").lower() - # logger.debug(f"Adding {type(v)} - {k}, {v} to the lookuptable output dict") - match v: - case pd.Timestamp(): - sample[k] = v.date() - case str(): - sample[k] = determine_if_date(v) - case _: - sample[k] = v - # Set row in lookup table to blank values to prevent multipe lookups. + """ + lmap = self.sample_info_map['lookup_table'] + ws = self.xl[lmap['sheet']] + # for sample in self.samples: + # addition = self.lookup_table[self.lookup_table.isin([sample['submitter_id']]).any(axis=1)].squeeze() + # # logger.debug(addition) + # if isinstance(addition, pd.DataFrame) and not addition.empty: + # addition = addition.iloc[0] + # # logger.debug(f"Lookuptable info: {addition.to_dict()}") + # for k,v in addition.to_dict().items(): + # # logger.debug(f"Checking {k} in lookup table.") + # if check_not_nan(k) and isinstance(k, str): + # if k.lower() not in sample: + # k = k.replace(" ", "_").replace("#","num").lower() + # # logger.debug(f"Adding {type(v)} - {k}, {v} to the lookuptable output dict") + # match v: + # case pd.Timestamp(): + # sample[k] = v.date() + # case str(): + # sample[k] = determine_if_date(v) + # case _: + # sample[k] = v + # # Set row in lookup table to blank values to prevent multipe lookups. + # try: + # self.lookup_table.loc[self.lookup_table['Sample #']==addition['Sample #']] = np.nan + # except (ValueError, KeyError): + # pass + # try: + # self.lookup_table.loc[self.lookup_table['Well']==addition['Well']] = np.nan + # except (ValueError, KeyError): + # pass + # # logger.debug(f"Output sample dict: {sample}") + # logger.debug(f"Final lookup_table: \n\n {self.lookup_table}") + lookup_samples = [] + for ii, row in enumerate(range(lmap['start_row'], lmap['end_row']+1), start=1): + row_dict = {k:ws.cell(row=row, column=v).value for k, v in lmap['sample_columns'].items()} try: - self.lookup_table.loc[self.lookup_table['Sample #']==addition['Sample #']] = np.nan - except (ValueError, KeyError): + row_dict[lmap['merge_on_id']] = str(row_dict[lmap['merge_on_id']]) + except KeyError: pass + row_dict['sample_type'] = f"{self.submission_type} Sample" + row_dict['submission_rank'] = ii try: - self.lookup_table.loc[self.lookup_table['Well']==addition['Well']] = np.nan - except (ValueError, KeyError): - pass - # logger.debug(f"Output sample dict: {sample}") - logger.debug(f"Final lookup_table: \n\n {self.lookup_table}") + check = check_not_nan(row_dict[lmap['merge_on_id']]) + except KeyError: + check = False + if check: + lookup_samples.append(self.samp_object.parse_sample(row_dict)) + return lookup_samples - def parse_samples(self) -> List[dict]|List[BasicSample]: + def parse_samples(self) -> Tuple[Report | None, List[dict] | List[PydSample]]: """ Parse merged platemap\lookup info into dicts/samples Returns: List[dict]|List[models.BasicSample]: List of samples - """ + """ result = None new_samples = [] # logger.debug(f"Starting samples: {pformat(self.samples)}") @@ -472,13 +554,15 @@ class SampleParser(object): v = convert_nans_to_nones(v) case _: v = v - try: - translated_dict[self.excel_to_db_map[k]] = convert_nans_to_nones(v) - except KeyError: - translated_dict[k] = convert_nans_to_nones(v) + # try: + # translated_dict[self.excel_to_db_map[k]] = convert_nans_to_nones(v) + # except KeyError: + translated_dict[k] = convert_nans_to_nones(v) translated_dict['sample_type'] = f"{self.submission_type} Sample" - translated_dict = self.custom_sub_parser(translated_dict) - translated_dict = self.custom_sample_parser(translated_dict) + # translated_dict = self.custom_sub_parser(translated_dict) + translated_dict = self.sub_object.parse_samples(translated_dict) + # translated_dict = self.custom_sample_parser(translated_dict) + translated_dict = self.samp_object.parse_sample(translated_dict) # logger.debug(f"Here is the output of the custom parser:\n{translated_dict}") new_samples.append(PydSample(**translated_dict)) return result, new_samples @@ -489,20 +573,77 @@ class SampleParser(object): Returns: List[str]: list of plate names. - """ + """ plates = [] for plate in self.plates: df = self.xl.parse(plate['sheet'], header=None) - if isinstance(df.iat[plate['row']-1, plate['column']-1], str): - output = RSLNamer.retrieve_rsl_number(filename=df.iat[plate['row']-1, plate['column']-1]) + if isinstance(df.iat[plate['row'] - 1, plate['column'] - 1], str): + output = RSLNamer.retrieve_rsl_number(filename=df.iat[plate['row'] - 1, plate['column'] - 1]) else: continue plates.append(output) return plates - + + def reconcile_samples(self): + # TODO: Move to pydantic validator? + if self.plate_map_samples is None or self.lookup_samples is None: + self.samples = self.lookup_samples or self.plate_map_samples + return + samples = [] + merge_on_id = self.sample_info_map['lookup_table']['merge_on_id'] + plate_map_samples = sorted(copy(self.plate_map_samples), key=lambda d: d['id']) + lookup_samples = sorted(copy(self.lookup_samples), key=lambda d: d[merge_on_id]) + # try: + # assert len(plate_map_samples) == len(lookup_samples) + # except AssertionError: + # if len(plate_map_samples) > len(lookup_samples): + # logger.error( + # f"Plate samples ({len(plate_map_samples)}) is longer than Lookup samples: ({len(lookup_samples)})") + # return plate_map_samples + # else: + # logger.error( + # f"Lookup samples ({len(lookup_samples)}) is longer than Plate samples: ({len(plate_map_samples)})") + # return lookup_samples + for ii, psample in enumerate(plate_map_samples): + try: + check = psample['id'] == lookup_samples[ii][merge_on_id] + except (KeyError, IndexError): + check = False + if check: + logger.debug(f"Direct match found for {psample['id']}") + new = lookup_samples[ii] | psample + lookup_samples[ii] = {} + # samples.append(new) + else: + logger.warning(f"Match for {psample['id']} not direct, running search.") + for jj, lsample in enumerate(lookup_samples): + try: + check = lsample[merge_on_id] == psample['id'] + except KeyError: + check = False + if check: + new = lsample | psample + lookup_samples[jj] = {} + # self.samples.append(new) + # samples.append(new) + break + else: + new = psample + # samples.append(psample) + new['sample_type'] = f"{self.submission_type} Sample" + try: + check = new['submitter_id'] is None + except KeyError: + check = True + if check: + new['submitter_id'] = psample['id'] + samples.append(new) + samples = remove_key_from_list_of_dicts(samples, "id") + return sorted(samples, key=lambda k: (k['row'], k['column'])) + class EquipmentParser(object): - def __init__(self, xl:pd.ExcelFile, submission_type:str) -> None: + def __init__(self, xl: Workbook, submission_type: str) -> None: self.submission_type = submission_type self.xl = xl self.map = self.fetch_equipment_map() @@ -513,11 +654,11 @@ class EquipmentParser(object): Returns: List[dict]: List of locations - """ + """ submission_type = SubmissionType.query(name=self.submission_type) return submission_type.construct_equipment_map() - - def get_asset_number(self, input:str) -> str: + + def get_asset_number(self, input: str) -> str: """ Pulls asset number from string. @@ -526,60 +667,67 @@ class EquipmentParser(object): Returns: str: asset number - """ + """ regex = Equipment.get_regex() logger.debug(f"Using equipment regex: {regex} on {input}") try: return regex.search(input).group().strip("-") except AttributeError: return input - + def parse_equipment(self) -> List[PydEquipment]: """ Scrapes equipment from xl sheet Returns: List[PydEquipment]: list of equipment - """ + """ logger.debug(f"Equipment parser going into parsing: {pformat(self.__dict__)}") output = [] # logger.debug(f"Sheets: {sheets}") - for sheet in self.xl.sheet_names: - df = self.xl.parse(sheet, header=None, dtype=object) + for sheet in self.xl.sheetnames: + # df = self.xl.parse(sheet, header=None, dtype=object) + ws = self.xl[sheet] try: - relevant = [item for item in self.map if item['sheet']==sheet] + relevant = [item for item in self.map if item['sheet'] == sheet] except (TypeError, KeyError): continue # logger.debug(f"Relevant equipment: {pformat(relevant)}") previous_asset = "" for equipment in relevant: - asset = df.iat[equipment['name']['row']-1, equipment['name']['column']-1] + # asset = df.iat[equipment['name']['row']-1, equipment['name']['column']-1] + asset = ws.cell(equipment['name']['row'], equipment['name']['column']) if not check_not_nan(asset): asset = previous_asset else: previous_asset = asset asset = self.get_asset_number(input=asset) eq = Equipment.query(asset_number=asset) - process = df.iat[equipment['process']['row']-1, equipment['process']['column']-1] + # process = df.iat[equipment['process']['row']-1, equipment['process']['column']-1] + process = ws.cell(row=equipment['process']['row'], column=equipment['process']['column']) try: - output.append(PydEquipment(name=eq.name, processes=[process], role=equipment['role'], asset_number=asset, nickname=eq.nickname)) + output.append( + dict(name=eq.name, processes=[process], role=equipment['role'], asset_number=asset, + nickname=eq.nickname)) except AttributeError: logger.error(f"Unable to add {eq} to PydEquipment list.") # logger.debug(f"Here is the output so far: {pformat(output)}") return output + class PCRParser(object): """ Object to pull data from Design and Analysis PCR export file. - """ - def __init__(self, filepath:Path|None = None) -> None: + """ + + def __init__(self, filepath: Path | None = None) -> None: """ Initializes object. Args: filepath (Path | None, optional): file to parse. Defaults to None. - """ - logger.debug(f"Parsing {filepath.__str__()}") + """ + logger.debug(f"Parsing {filepath.__str__()}") if filepath == None: logger.error(f"No filepath given.") self.xl = None @@ -599,14 +747,14 @@ class PCRParser(object): logger.debug(f"Set plate number to {self.plate_num} and type to {self.submission_type}") parser = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type) self.samples = parser.parse_pcr(xl=self.xl, rsl_number=self.plate_num) - - def parse_general(self, sheet_name:str): + + def parse_general(self, sheet_name: str): """ Parse general info rows for all types of PCR results Args: sheet_name (str): Name of sheet in excel workbook that holds info. - """ + """ self.pcr = {} df = self.xl.parse(sheet_name=sheet_name, dtype=object).fillna("") self.pcr['comment'] = df.iloc[0][1] diff --git a/src/submissions/backend/excel/writer.py b/src/submissions/backend/excel/writer.py new file mode 100644 index 0000000..4a08bc1 --- /dev/null +++ b/src/submissions/backend/excel/writer.py @@ -0,0 +1,248 @@ +import logging +from copy import copy +from pathlib import Path +from typing import List + +from openpyxl import load_workbook, Workbook +from tools import row_keys +from backend.db.models import SubmissionType, KitType +from backend.validators.pydant import PydSubmission +from io import BytesIO +from collections import OrderedDict + +logger = logging.getLogger(f"submissions.{__name__}") + + +class SheetWriter(object): + """ + object to pull and contain data from excel file + """ + + def __init__(self, submission: PydSubmission, missing_only: bool = False): + """ + Args: + filepath (Path | None, optional): file path to excel sheet. Defaults to None. + """ + self.sub = OrderedDict(submission.improved_dict()) + for k, v in self.sub.items(): + match k: + case 'filepath': + self.__setattr__(k, v) + case 'submission_type': + # self.__setattr__('submission_type', submission.submission_type['value']) + self.sub[k] = v['value'] + self.submission_type = SubmissionType.query(name=v['value']) + case _: + if isinstance(v, dict): + self.sub[k] = v['value'] + else: + self.sub[k] = v + logger.debug(f"\n\nWriting to {submission.filepath.__str__()}\n\n") + + if self.filepath.stem.startswith("tmp"): + template = self.submission_type.template_file + workbook = load_workbook(BytesIO(template)) + missing_only = False + else: + try: + workbook = load_workbook(self.filepath) + except Exception as e: + logger.error(f"Couldn't open workbook due to {e}") + template = self.submission_type.template_file + workbook = load_workbook(BytesIO(template)) + missing_only = False + self.workbook = workbook + self.write_info() + self.write_reagents() + self.write_samples() + self.write_equipment() + + def write_info(self): + disallowed = ['filepath', 'reagents', 'samples', 'equipment', 'controls'] + info_dict = {k: v for k, v in self.sub.items() if k not in disallowed} + writer = InfoWriter(xl=self.workbook, submission_type=self.submission_type, info_dict=info_dict) + self.xl = writer.write_info() + + def write_reagents(self): + reagent_list = self.sub['reagents'] + writer = ReagentWriter(xl=self.workbook, submission_type=self.submission_type, + extraction_kit=self.sub['extraction_kit'], reagent_list=reagent_list) + self.xl = writer.write_reagents() + + def write_samples(self): + sample_list = self.sub['samples'] + writer = SampleWriter(xl=self.workbook, submission_type=self.submission_type, sample_list=sample_list) + self.xl = writer.write_samples() + + def write_equipment(self): + equipment_list = self.sub['equipment'] + writer = EquipmentWriter(xl=self.workbook, submission_type=self.submission_type, equipment_list=equipment_list) + self.xl = writer.write_equipment() + + +class InfoWriter(object): + + def __init__(self, xl: Workbook, submission_type: SubmissionType | str, info_dict: dict): + if isinstance(submission_type, str): + submission_type = SubmissionType.query(name=submission_type) + self.submission_type = submission_type + self.xl = xl + map = submission_type.construct_info_map(mode='write') + self.info = self.reconcile_map(info_dict, map) + + def reconcile_map(self, info_dict: dict, map: dict) -> dict: + output = {} + for k, v in info_dict.items(): + if v is None: + continue + dicto = {} + try: + dicto['locations'] = map[k] + except KeyError: + continue + dicto['value'] = v + if len(dicto) > 0: + output[k] = dicto + return output + + def write_info(self): + for k, v in self.info.items(): + try: + locations = v['locations'] + except KeyError: + logger.error(f"No locations for {k}, skipping") + continue + for loc in locations: + logger.debug(f"Writing {k} to {loc['sheet']}, row: {loc['row']}, column: {loc['column']}") + sheet = self.xl[loc['sheet']] + sheet.cell(row=loc['row'], column=loc['column'], value=v['value']) + return self.xl + + +class ReagentWriter(object): + + def __init__(self, xl: Workbook, submission_type: SubmissionType | str, extraction_kit: KitType | str, + reagent_list: list): + self.xl = xl + if isinstance(submission_type, str): + submission_type = SubmissionType.query(name=submission_type) + if isinstance(extraction_kit, str): + kit_type = KitType.query(name=extraction_kit) + map = kit_type.construct_xl_map_for_use(submission_type) + self.reagents = self.reconcile_map(reagent_list=reagent_list, map=map) + + def reconcile_map(self, reagent_list, map) -> List[dict]: + output = [] + for reagent in reagent_list: + try: + mp_info = map[reagent['type']] + except KeyError: + continue + placeholder = copy(reagent) + for k, v in reagent.items(): + try: + dicto = dict(value=v, row=mp_info[k]['row'], column=mp_info[k]['column']) + except KeyError as e: + logger.error(f"Keyerror: {e}") + dicto = v + placeholder[k] = dicto + placeholder['sheet'] = mp_info['sheet'] + output.append(placeholder) + return output + + def write_reagents(self): + for reagent in self.reagents: + sheet = self.xl[reagent['sheet']] + for k, v in reagent.items(): + if not isinstance(v, dict): + continue + logger.debug( + f"Writing {reagent['type']} {k} to {reagent['sheet']}, row: {v['row']}, column: {v['column']}") + sheet.cell(row=v['row'], column=v['column'], value=v['value']) + return self.xl + + +class SampleWriter(object): + + def __init__(self, xl: Workbook, submission_type: SubmissionType | str, sample_list: list): + if isinstance(submission_type, str): + submission_type = SubmissionType.query(name=submission_type) + self.submission_type = submission_type + self.xl = xl + self.map = submission_type.construct_sample_map()['lookup_table'] + self.samples = self.reconcile_map(sample_list) + + def reconcile_map(self, sample_list: list): + output = [] + multiples = ['row', 'column', 'assoc_id', 'submission_rank'] + for sample in sample_list: + for assoc in zip(sample['row'], sample['column'], sample['submission_rank']): + new = dict(row=assoc[0], column=assoc[1], submission_rank=assoc[2]) + for k, v in sample.items(): + if k in multiples: + continue + new[k] = v + output.append(new) + return sorted(output, key=lambda k: k['submission_rank']) + + def write_samples(self): + sheet = self.xl[self.map['sheet']] + columns = self.map['sample_columns'] + # rows = range(self.map['start_row'], self.map['end_row']+1) + for ii, sample in enumerate(self.samples): + row = self.map['start_row'] + (sample['submission_rank'] - 1) + for k, v in sample.items(): + try: + column = columns[k] + except KeyError: + continue + sheet.cell(row=row, column=column, value=v) + return self.xl + + +class EquipmentWriter(object): + + def __init__(self, xl: Workbook, submission_type: SubmissionType | str, equipment_list: list): + if isinstance(submission_type, str): + submission_type = SubmissionType.query(name=submission_type) + self.submission_type = submission_type + self.xl = xl + map = self.submission_type.construct_equipment_map() + self.equipment = self.reconcile_map(equipment_list=equipment_list, map=map) + + def reconcile_map(self, equipment_list: list, map: list): + output = [] + for ii, equipment in enumerate(equipment_list, start=1): + mp_info = map[equipment['role']] + placeholder = copy(equipment) + for jj, (k, v) in enumerate(equipment.items(), start=1): + try: + dicto = dict(value=v, row=mp_info[k]['row'], column=mp_info[k]['column']) + except KeyError as e: + logger.error(f"Keyerror: {e}") + dicto = dict(value=v, row=ii, column=jj) + placeholder[k] = dicto + try: + placeholder['sheet'] = mp_info['sheet'] + except KeyError: + placeholder['sheet'] = "Equipment" + output.append(placeholder) + return output + + def write_equipment(self): + for equipment in self.equipment: + try: + sheet = self.xl[equipment['sheet']] + except KeyError: + self.xl.create_sheet("Equipment") + finally: + sheet = self.xl[equipment['sheet']] + for k, v in equipment.items(): + if not isinstance(v, dict): + continue + logger.debug( + f"Writing {equipment['role']} {k} to {equipment['sheet']}, row: {v['row']}, column: {v['column']}") + if isinstance(v['value'], list): + v['value'] = v['value'][0] + sheet.cell(row=v['row'], column=v['column'], value=v['value']) + return self.xl diff --git a/src/submissions/backend/validators/__init__.py b/src/submissions/backend/validators/__init__.py index 22c0102..8a63f03 100644 --- a/src/submissions/backend/validators/__init__.py +++ b/src/submissions/backend/validators/__init__.py @@ -7,13 +7,17 @@ from jinja2 import Template logger = logging.getLogger(f"submissions.{__name__}") + class RSLNamer(object): """ Object that will enforce proper formatting on RSL plate names. """ - def __init__(self, filename:str, sub_type:str|None=None, data:dict|None=None): + + def __init__(self, filename: str, sub_type: str | None = None, data: dict | None = None): + # NOTE: Preferred method is path retrieval, but might also need validation for just string. + filename = Path(filename) if Path(filename).exists() else filename self.submission_type = sub_type - if self.submission_type == None: + if self.submission_type is None: # logger.debug("Creating submission type because none exists") self.submission_type = self.retrieve_submission_type(filename=filename) logger.debug(f"got submission type: {self.submission_type}") @@ -21,14 +25,14 @@ class RSLNamer(object): # logger.debug("Retrieving BasicSubmission subclass") enforcer = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type) self.parsed_name = self.retrieve_rsl_number(filename=filename, regex=enforcer.get_regex()) - if data == None: + if data is None: data = dict(submission_type=self.submission_type) if "submission_type" not in data.keys(): data['submission_type'] = self.submission_type self.parsed_name = enforcer.enforce_name(instr=self.parsed_name, data=data) @classmethod - def retrieve_submission_type(cls, filename:str|Path) -> str: + def retrieve_submission_type(cls, filename: str | Path) -> str: """ Gets submission type from excel file properties or sheet names or regex pattern match or user input @@ -37,7 +41,7 @@ class RSLNamer(object): Returns: str: parsed submission type - """ + """ match filename: case Path(): logger.debug(f"Using path method for {filename}.") @@ -47,8 +51,8 @@ class RSLNamer(object): submission_type = [item.strip().title() for item in wb.properties.category.split(";")][0] except AttributeError: try: - sts = {item.name:item.get_template_file_sheets() for item in SubmissionType.query()} - for k,v in sts.items(): + sts = {item.name: item.get_template_file_sheets() for item in SubmissionType.query()} + for k, v in sts.items(): # This gets the *first* submission type that matches the sheet names in the workbook if wb.sheetnames == v: submission_type = k.title() @@ -69,28 +73,30 @@ class RSLNamer(object): case _: submission_type = None try: - check = submission_type == None + check = submission_type is None except UnboundLocalError: check = True if check: # logger.debug("Final option, ask the user for submission type") from frontend.widgets import ObjectSelector - dlg = ObjectSelector(title="Couldn't parse submission type.", message="Please select submission type from list below.", obj_type=SubmissionType) + dlg = ObjectSelector(title="Couldn't parse submission type.", + message="Please select submission type from list below.", obj_type=SubmissionType) if dlg.exec(): submission_type = dlg.parse_form() submission_type = submission_type.replace("_", " ") return submission_type @classmethod - def retrieve_rsl_number(cls, filename:str|Path, regex:str|None=None): + def retrieve_rsl_number(cls, filename: str | Path, regex: str | None = None): """ Uses regex to retrieve the plate number and submission type from an input string Args: - in_str (str): string to be parsed - """ + regex (str): string to construct pattern + filename (str): string to be parsed + """ logger.debug(f"Input string to be parsed: {filename}") - if regex == None: + if regex is None: regex = BasicSubmission.construct_regex() else: regex = re.compile(rf'{regex}', re.IGNORECASE | re.VERBOSE) @@ -102,19 +108,19 @@ class RSLNamer(object): logger.debug(f"Using string method.") m = regex.search(filename) case _: - pass - if m != None: + m = None + if m is not None: try: parsed_name = m.group().upper().strip(".") except: parsed_name = None - else: + else: parsed_name = None logger.debug(f"Got parsed submission name: {parsed_name}") return parsed_name - + @classmethod - def construct_new_plate_name(cls, data:dict) -> str: + def construct_new_plate_name(cls, data: dict) -> str: """ Make a brand new plate name from submission data. @@ -123,7 +129,7 @@ class RSLNamer(object): Returns: str: Output filename - """ + """ if "submitted_date" in data.keys(): if isinstance(data['submitted_date'], dict): if data['submitted_date']['value'] != None: @@ -144,9 +150,9 @@ class RSLNamer(object): previous = BasicSubmission.query(start_date=today, end_date=today, submission_type=data['submission_type']) plate_number = len(previous) + 1 return f"RSL-{data['abbreviation']}-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}-{plate_number}" - + @classmethod - def construct_export_name(cls, template:Template, **kwargs) -> str: + def construct_export_name(cls, template: Template, **kwargs) -> str: """ Make export file name from jinja template. (currently unused) @@ -155,11 +161,13 @@ class RSLNamer(object): Returns: str: output file name. - """ + """ logger.debug(f"Kwargs: {kwargs}") logger.debug(f"Template: {template}") environment = jinja_template_loading() template = environment.from_string(template) return template.render(**kwargs) - -from .pydant import * + + +from .pydant import PydSubmission, PydKit, PydContact, PydOrganization, PydSample, PydReagent, PydReagentType, \ + PydEquipment, PydEquipmentRole diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py index e0112fa..58d3b12 100644 --- a/src/submissions/backend/validators/pydant.py +++ b/src/submissions/backend/validators/pydant.py @@ -4,10 +4,11 @@ Contains pydantic models and accompanying validators from __future__ import annotations from operator import attrgetter import uuid, re, logging -from pydantic import BaseModel, field_validator, Field +from pydantic import BaseModel, field_validator, Field, model_validator from datetime import date, datetime, timedelta from dateutil.parser import parse -from dateutil.parser._parser import ParserError +# from dateutil.parser._parser import ParserError +from dateutil.parser import ParserError from typing import List, Tuple, Literal from . import RSLNamer from pathlib import Path @@ -20,14 +21,17 @@ from io import BytesIO logger = logging.getLogger(f"submissions.{__name__}") + +# class PydMixin(object): + + class PydReagent(BaseModel): - - lot: str|None - type: str|None - expiry: date|Literal['NA']|None - name: str|None + lot: str | None + type: str | None + expiry: date | Literal['NA'] | None + name: str | None missing: bool = Field(default=True) - comment: str|None = Field(default="", validate_default=True) + comment: str | None = Field(default="", validate_default=True) @field_validator('comment', mode='before') @classmethod @@ -44,7 +48,7 @@ class PydReagent(BaseModel): return None case _: return value - + @field_validator("type") @classmethod def rescue_type_with_lookup(cls, value, values): @@ -62,14 +66,14 @@ class PydReagent(BaseModel): if value != None: return convert_nans_to_nones(str(value)) return value - + @field_validator("lot") @classmethod def enforce_lot_string(cls, value): if value != None: return value.upper() return value - + @field_validator("expiry", mode="before") @classmethod def enforce_date(cls, value): @@ -88,14 +92,14 @@ class PydReagent(BaseModel): if value == None: value = date.today() return value - + @field_validator("expiry") @classmethod def date_na(cls, value): if isinstance(value, date) and value.year == 1970: value = "NA" return value - + @field_validator("name", mode="before") @classmethod def enforce_name(cls, value, values): @@ -104,13 +108,30 @@ class PydReagent(BaseModel): else: return values.data['type'] - def toSQL(self, submission:BasicSubmission|str=None) -> Tuple[Reagent, Report]: + def improved_dict(self) -> dict: + try: + extras = list(self.model_extra.keys()) + except AttributeError: + extras = [] + fields = list(self.model_fields.keys()) + extras + # output = {} + # for k in fields: + # value = getattr(self, k) + # match value: + # case date(): + # value = value.strftime("%Y-%m-%d") + # case _: + # pass + # output[k] = value + return {k: getattr(self, k) for k in fields} + + def toSQL(self, submission: BasicSubmission | str = None) -> Tuple[Reagent, Report]: """ Converts this instance into a backend.db.models.kit.Reagent instance Returns: Tuple[Reagent, Report]: Reagent instance and result of function - """ + """ report = Report() # logger.debug("Adding extra fields.") if self.model_extra != None: @@ -150,28 +171,51 @@ class PydReagent(BaseModel): # add end-of-life extension from reagent type to expiry date # NOTE: this will now be done only in the reporting phase to account for potential changes in end-of-life extensions return reagent, report - -class PydSample(BaseModel, extra='allow'): + # def improved_dict(self) -> dict: + # fields = list(self.model_fields.keys()) + list(self.model_extra.keys()) + # return {k:getattr(self,k) for k in fields} + + +class PydSample(BaseModel, extra='allow'): submitter_id: str sample_type: str - row: int|List[int]|None - column: int|List[int]|None - assoc_id: int|List[int]|None = Field(default=None) + row: int | List[int] | None + column: int | List[int] | None + assoc_id: int | List[int] | None = Field(default=None) + submission_rank: int | List[int] | None - @field_validator("row", "column", "assoc_id") + @model_validator(mode='after') + @classmethod + def validate_model(cls, data): + logger.debug(f"Data for pydsample: {data}") + model = BasicSample.find_polymorphic_subclass(polymorphic_identity=data.sample_type) + for k, v in data.model_extra.items(): + # print(k, v) + if k in model.timestamps(): + if isinstance(v, str): + v = datetime.strptime(v, "%Y-%m-%d") + data.__setattr__(k, v) + # print(dir(data)) + return data + + @field_validator("row", "column", "assoc_id", "submission_rank") @classmethod def row_int_to_list(cls, value): if isinstance(value, int): return [value] return value - + @field_validator("submitter_id", mode="before") @classmethod def int_to_str(cls, value): return str(value) - - def toSQL(self, submission:BasicSubmission|str=None) -> Tuple[BasicSample, Result]: + + def improved_dict(self) -> dict: + fields = list(self.model_fields.keys()) + list(self.model_extra.keys()) + return {k: getattr(self, k) for k in fields} + + def toSQL(self, submission: BasicSubmission | str = None) -> Tuple[BasicSample, Result]: """ Converts this instance into a backend.db.models.submissions.Sample object @@ -180,7 +224,7 @@ class PydSample(BaseModel, extra='allow'): Returns: Tuple[BasicSample, Result]: Sample object and result object. - """ + """ report = None self.__dict__.update(self.model_extra) logger.debug(f"Here is the incoming sample dict: \n{self.__dict__}") @@ -199,10 +243,10 @@ class PydSample(BaseModel, extra='allow'): for row, column, id in zip(self.row, self.column, self.assoc_id): logger.debug(f"Looking up association with identity: ({submission.submission_type_name} Association)") logger.debug(f"Looking up association with identity: ({assoc_type} Association)") - association = SubmissionSampleAssociation.query_or_create(association_type=f"{assoc_type} Association", - submission=submission, - sample=instance, - row=row, column=column, id=id) + association = SubmissionSampleAssociation.query_or_create(association_type=f"{assoc_type} Association", + submission=submission, + sample=instance, + row=row, column=column, id=id) # logger.debug(f"Using submission_sample_association: {association}") try: # instance.sample_submission_associations.append(association) @@ -212,13 +256,21 @@ class PydSample(BaseModel, extra='allow'): instance.metadata.session.rollback() return instance, out_associations, report -class PydEquipment(BaseModel, extra='ignore'): + def improved_dict(self) -> dict: + try: + extras = list(self.model_extra.keys()) + except AttributeError: + extras = [] + fields = list(self.model_fields.keys()) + extras + return {k: getattr(self, k) for k in fields} + +class PydEquipment(BaseModel, extra='ignore'): asset_number: str name: str - nickname: str|None - processes: List[str]|None - role: str|None + nickname: str | None + processes: List[str] | None + role: str | None @field_validator('processes', mode='before') @classmethod @@ -227,11 +279,11 @@ class PydEquipment(BaseModel, extra='ignore'): value = convert_nans_to_nones(value) if value == None: value = [''] - if len(value)==0: - value=[''] + if len(value) == 0: + value = [''] return value - def toSQL(self, submission:BasicSubmission|str=None) -> Tuple[Equipment, SubmissionEquipmentAssociation]: + def toSQL(self, submission: BasicSubmission | str = None) -> Tuple[Equipment, SubmissionEquipmentAssociation]: """ Creates Equipment and SubmssionEquipmentAssociations for this PydEquipment @@ -240,7 +292,7 @@ class PydEquipment(BaseModel, extra='ignore'): Returns: Tuple[Equipment, SubmissionEquipmentAssociation]: SQL objects - """ + """ if isinstance(submission, str): submission = BasicSubmission.query(rsl_number=submission) equipment = Equipment.query(asset_number=self.asset_number) @@ -252,7 +304,8 @@ class PydEquipment(BaseModel, extra='ignore'): if process == None: # logger.debug("Adding in unknown process.") from frontend.widgets.pop_ups import QuestionAsker - dlg = QuestionAsker(title="Add Process?", message=f"Unable to find {self.processes[0]} in the database.\nWould you like to add it?") + dlg = QuestionAsker(title="Add Process?", + message=f"Unable to find {self.processes[0]} in the database.\nWould you like to add it?") if dlg.exec(): kit = submission.extraction_kit submission_type = submission.submission_type @@ -267,24 +320,33 @@ class PydEquipment(BaseModel, extra='ignore'): assoc = None return equipment, assoc + def improved_dict(self) -> dict: + try: + extras = list(self.model_extra.keys()) + except AttributeError: + extras = [] + fields = list(self.model_fields.keys()) + extras + return {k: getattr(self, k) for k in fields} + + class PydSubmission(BaseModel, extra='allow'): filepath: Path - submission_type: dict|None + submission_type: dict | None # For defaults - submitter_plate_num: dict|None = Field(default=dict(value=None, missing=True), validate_default=True) - submitted_date: dict|None - rsl_plate_num: dict|None = Field(default=dict(value=None, missing=True), validate_default=True) - submitted_date: dict|None - submitting_lab: dict|None - sample_count: dict|None - extraction_kit: dict|None - technician: dict|None - submission_category: dict|None = Field(default=dict(value=None, missing=True), validate_default=True) - comment: dict|None = Field(default=dict(value="", missing=True), validate_default=True) - reagents: List[dict]|List[PydReagent] = [] + submitter_plate_num: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) + submitted_date: dict | None + rsl_plate_num: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) + submitted_date: dict | None + submitting_lab: dict | None + sample_count: dict | None + extraction_kit: dict | None + technician: dict | None + submission_category: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) + comment: dict | None = Field(default=dict(value="", missing=True), validate_default=True) + reagents: List[dict] | List[PydReagent] = [] samples: List[PydSample] - equipment: List[PydEquipment]|None =[] - cost_centre: dict|None = Field(default=dict(value=None, missing=True), validate_default=True) + equipment: List[PydEquipment] | None = [] + cost_centre: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) @field_validator('equipment', mode='before') @classmethod @@ -309,7 +371,7 @@ class PydSubmission(BaseModel, extra='allow'): return dict(value=uuid.uuid4().hex.upper(), missing=True) else: return value - + @field_validator("submitted_date", mode="before") @classmethod def rescue_date(cls, value): @@ -325,31 +387,32 @@ class PydSubmission(BaseModel, extra='allow'): @field_validator("submitted_date") @classmethod def strip_datetime_string(cls, value): - + if isinstance(value['value'], datetime): return value if isinstance(value['value'], date): return value if isinstance(value['value'], int): - return dict(value=datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value['value'] - 2).date(), missing=True) + return dict(value=datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value['value'] - 2).date(), + missing=True) string = re.sub(r"(_|-)\d$", "", value['value']) try: output = dict(value=parse(string).date(), missing=True) except ParserError as e: logger.error(f"Problem parsing date: {e}") try: - output = dict(value=parse(string.replace("-","")).date(), missing=True) + output = dict(value=parse(string.replace("-", "")).date(), missing=True) except Exception as e: logger.error(f"Problem with parse fallback: {e}") return output - + @field_validator("submitting_lab", mode="before") @classmethod def rescue_submitting_lab(cls, value): if value is None: return dict(value=None, missing=True) return value - + @field_validator("submitting_lab") @classmethod def lookup_submitting_lab(cls, value): @@ -361,13 +424,15 @@ class PydSubmission(BaseModel, extra='allow'): if value['value'] is None: value['missing'] = True from frontend.widgets.pop_ups import ObjectSelector - dlg = ObjectSelector(title="Missing Submitting Lab", message="We need a submitting lab. Please select from the list.", obj_type=Organization) + dlg = ObjectSelector(title="Missing Submitting Lab", + message="We need a submitting lab. Please select from the list.", + obj_type=Organization) if dlg.exec(): value['value'] = dlg.parse_form() else: value['value'] = None return value - + @field_validator("rsl_plate_num", mode='before') @classmethod def rescue_rsl_number(cls, value): @@ -383,7 +448,8 @@ class PydSubmission(BaseModel, extra='allow'): if check_not_nan(value['value']): return value else: - output = RSLNamer(filename=values.data['filepath'].__str__(), sub_type=sub_type, data=values.data).parsed_name + output = RSLNamer(filename=values.data['filepath'].__str__(), sub_type=sub_type, + data=values.data).parsed_name return dict(value=output, missing=True) @field_validator("technician", mode="before") @@ -401,14 +467,14 @@ class PydSubmission(BaseModel, extra='allow'): return value else: return dict(value=convert_nans_to_nones(value['value']), missing=True) - + @field_validator("sample_count", mode='before') @classmethod def rescue_sample_count(cls, value): if value == None: return dict(value=None, missing=True) return value - + @field_validator("extraction_kit", mode='before') @classmethod def rescue_kit(cls, value): @@ -422,7 +488,7 @@ class PydSubmission(BaseModel, extra='allow'): if value == None: return dict(value=None, missing=True) return value - + @field_validator("submission_type", mode='before') @classmethod def make_submission_type(cls, value, values): @@ -434,7 +500,7 @@ class PydSubmission(BaseModel, extra='allow'): else: # return dict(value=RSLNamer(instr=values.data['filepath'].__str__()).submission_type.title(), missing=True) return dict(value=RSLNamer.retrieve_submission_type(filename=values.data['filepath']).title(), missing=True) - + @field_validator("submission_category", mode="before") @classmethod def create_category(cls, value): @@ -490,25 +556,28 @@ class PydSubmission(BaseModel, extra='allow'): """ Collapses multiple samples with same submitter id into one with lists for rows, columns. Necessary to prevent trying to create duplicate samples in SQL creation. - """ + """ submitter_ids = list(set([sample.submitter_id for sample in self.samples])) output = [] for id in submitter_ids: - relevants = [item for item in self.samples if item.submitter_id==id] + relevants = [item for item in self.samples if item.submitter_id == id] if len(relevants) <= 1: output += relevants else: rows = [item.row[0] for item in relevants] columns = [item.column[0] for item in relevants] ids = [item.assoc_id[0] for item in relevants] + ranks = [item.submission_rank[0] for item in relevants] dummy = relevants[0] dummy.assoc_id = ids dummy.row = rows dummy.column = columns + dummy.submission_rank = ranks output.append(dummy) self.samples = output - def improved_dict(self, dictionaries:bool=True) -> dict: + # TODO: Return samples, reagents, etc to dictionaries as well. + def improved_dict(self, dictionaries: bool = True) -> dict: """ Adds model_extra to fields. @@ -517,13 +586,18 @@ class PydSubmission(BaseModel, extra='allow'): Returns: dict: This instance as a dictionary - """ + """ fields = list(self.model_fields.keys()) + list(self.model_extra.keys()) if dictionaries: - output = {k:getattr(self, k) for k in fields} + output = {k: getattr(self, k) for k in fields} + output['reagents'] = [item.improved_dict() for item in self.reagents] + output['samples'] = [item.improved_dict() for item in self.samples] + output['equipment'] = [item.improved_dict() for item in self.equipment] else: # logger.debug("Extracting 'value' from attributes") - output = {k:(getattr(self, k) if not isinstance(getattr(self, k), dict) else getattr(self, k)['value']) for k in fields} + output = {k: (getattr(self, k) if not isinstance(getattr(self, k), dict) else getattr(self, k)['value']) for + k in fields} + return output def find_missing(self) -> Tuple[dict, dict]: @@ -532,9 +606,9 @@ class PydSubmission(BaseModel, extra='allow'): Returns: Tuple[dict, dict]: Dict for missing info, dict for missing reagents. - """ - info = {k:v for k,v in self.improved_dict().items() if isinstance(v, dict)} - missing_info = {k:v for k,v in info.items() if v['missing']} + """ + info = {k: v for k, v in self.improved_dict().items() if isinstance(v, dict)} + missing_info = {k: v for k, v in info.items() if v['missing']} missing_reagents = [reagent for reagent in self.reagents if reagent.missing] return missing_info, missing_reagents @@ -544,10 +618,11 @@ class PydSubmission(BaseModel, extra='allow'): Returns: Tuple[BasicSubmission, Result]: BasicSubmission instance, result object - """ + """ # self.__dict__.update(self.model_extra) dicto = self.improved_dict() - instance, code, msg = BasicSubmission.query_or_create(submission_type=self.submission_type['value'], rsl_plate_num=self.rsl_plate_num['value']) + instance, code, msg = BasicSubmission.query_or_create(submission_type=self.submission_type['value'], + rsl_plate_num=self.rsl_plate_num['value']) result = Result(msg=msg, code=code) self.handle_duplicate_samples() logger.debug(f"Here's our list of duplicate removed samples: {self.samples}") @@ -574,9 +649,19 @@ class PydSubmission(BaseModel, extra='allow'): equip, association = equip.toSQL(submission=instance) if association != None: association.save() - logger.debug(f"Equipment association SQL object to be added to submission: {association.__dict__}") + logger.debug( + f"Equipment association SQL object to be added to submission: {association.__dict__}") instance.submission_equipment_associations.append(association) - # TODO: case item if item in instance.jsons() + case item if item in instance.jsons(): + logger.debug(f"{item} is a json.") + try: + ii = value.items() + except AttributeError: + ii = {} + for k, v in ii: + if isinstance(v, datetime): + value[k] = v.strftime("%Y-%m-%d %H:%M:%S") + instance.set_attribute(key=key, value=value) case _: try: instance.set_attribute(key=key, value=value) @@ -598,7 +683,8 @@ class PydSubmission(BaseModel, extra='allow'): # Apply any discounts that are applicable for client and kit. try: logger.debug("Checking and applying discounts...") - discounts = [item.amount for item in Discount.query(kit_type=instance.extraction_kit, organization=instance.submitting_lab)] + discounts = [item.amount for item in + Discount.query(kit_type=instance.extraction_kit, organization=instance.submitting_lab)] logger.debug(f"We got discounts: {discounts}") if len(discounts) > 0: discounts = sum(discounts) @@ -613,8 +699,8 @@ class PydSubmission(BaseModel, extra='allow'): logger.debug(f"Something went wrong constructing instance {self.rsl_plate_num}: {e}") logger.debug(f"Constructed submissions message: {msg}") return instance, result - - def toForm(self, parent:QWidget): + + def toForm(self, parent: QWidget): """ Converts this instance into a frontend.widgets.submission_widget.SubmissionFormWidget @@ -623,11 +709,11 @@ class PydSubmission(BaseModel, extra='allow'): Returns: SubmissionFormWidget: Submission form widget - """ + """ from frontend.widgets.submission_widget import SubmissionFormWidget return SubmissionFormWidget(parent=parent, submission=self) - def autofill_excel(self, missing_only:bool=True, backup:bool=False) -> Workbook: + def autofill_excel(self, missing_only: bool = True, backup: bool = False) -> Workbook: """ Fills in relevant information/reagent cells in an excel workbook. @@ -637,13 +723,13 @@ class PydSubmission(BaseModel, extra='allow'): Returns: Workbook: Filled in workbook - """ + """ # open a new workbook using openpyxl if self.filepath.stem.startswith("tmp"): template = SubmissionType.query(name=self.submission_type['value']).template_file workbook = load_workbook(BytesIO(template)) missing_only = False - else: + else: try: workbook = load_workbook(self.filepath) except Exception as e: @@ -654,12 +740,12 @@ class PydSubmission(BaseModel, extra='allow'): if missing_only: info, reagents = self.find_missing() else: - info = {k:v for k,v in self.improved_dict().items() if isinstance(v, dict)} + info = {k: v for k, v in self.improved_dict().items() if isinstance(v, dict)} reagents = self.reagents - if len(reagents + list(info.keys())) == 0: # logger.warning("No info to fill in, returning") return None + logger.debug(f"Info: {pformat(info)}") logger.debug(f"We have blank info and/or reagents in the excel sheet.\n\tLet's try to fill them in.") # extraction_kit = lookup_kit_types(ctx=self.ctx, name=self.extraction_kit['value']) extraction_kit = KitType.query(name=self.extraction_kit['value']) @@ -687,7 +773,7 @@ class PydSubmission(BaseModel, extra='allow'): new_reagents.append(new_reagent) new_info = [] # logger.debug("Constructing info map and values") - for k,v in info.items(): + for k, v in info.items(): try: new_item = {} new_item['type'] = k @@ -708,33 +794,38 @@ class PydSubmission(BaseModel, extra='allow'): # get list of sheet names for sheet in workbook.sheetnames: # open sheet - worksheet=workbook[sheet] + worksheet = workbook[sheet] # Get relevant reagents for that sheet sheet_reagents = [item for item in new_reagents if sheet in item['sheet']] for reagent in sheet_reagents: # logger.debug(f"Attempting to write lot {reagent['lot']['value']} in: row {reagent['lot']['row']}, column {reagent['lot']['column']}") - worksheet.cell(row=reagent['lot']['row'], column=reagent['lot']['column'], value=reagent['lot']['value']) + worksheet.cell(row=reagent['lot']['row'], column=reagent['lot']['column'], + value=reagent['lot']['value']) # logger.debug(f"Attempting to write expiry {reagent['expiry']['value']} in: row {reagent['expiry']['row']}, column {reagent['expiry']['column']}") if isinstance(reagent['expiry']['value'], date) and reagent['expiry']['value'].year == 1970: reagent['expiry']['value'] = "NA" - worksheet.cell(row=reagent['expiry']['row'], column=reagent['expiry']['column'], value=reagent['expiry']['value']) + worksheet.cell(row=reagent['expiry']['row'], column=reagent['expiry']['column'], + value=reagent['expiry']['value']) try: # logger.debug(f"Attempting to write name {reagent['name']['value']} in: row {reagent['name']['row']}, column {reagent['name']['column']}") - worksheet.cell(row=reagent['name']['row'], column=reagent['name']['column'], value=reagent['name']['value']) + worksheet.cell(row=reagent['name']['row'], column=reagent['name']['column'], + value=reagent['name']['value']) except Exception as e: logger.error(f"Could not write name {reagent['name']['value']} due to {e}") # Get relevant info for that sheet new_info = [item for item in new_info if isinstance(item['location'], dict)] - sheet_info = [item for item in new_info if sheet in item['location']['sheets']] + logger.debug(f"New info: {pformat(new_info)}") + sheet_info = [item for item in new_info if item['location']['sheet'] == sheet] for item in sheet_info: - logger.debug(f"Attempting: {item['type']} in row {item['location']['row']}, column {item['location']['column']}") + logger.debug( + f"Attempting: {item['type']} in row {item['location']['row']}, column {item['location']['column']}") worksheet.cell(row=item['location']['row'], column=item['location']['column'], value=item['value']) # Hacky way to pop in 'signed by' custom_parser = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type['value']) workbook = custom_parser.custom_autofill(workbook, info=self.improved_dict(), backup=backup) return workbook - - def autofill_samples(self, workbook:Workbook) -> Workbook: + + def autofill_samples(self, workbook: Workbook) -> Workbook: """ Fill in sample rows on the excel sheet @@ -743,8 +834,9 @@ class PydSubmission(BaseModel, extra='allow'): Returns: Workbook: Updated excel workbook - """ - sample_info = SubmissionType.query(name=self.submission_type['value']).info_map['samples'] + """ + # sample_info = SubmissionType.query(name=self.submission_type['value']).info_map['samples'] + sample_info = SubmissionType.query(name=self.submission_type['value']).construct_sample_map() logger.debug(f"Sample info: {pformat(sample_info)}") logger.debug(f"Workbook sheets: {workbook.sheetnames}") worksheet = workbook[sample_info["lookup_table"]['sheet']] @@ -762,10 +854,11 @@ class PydSubmission(BaseModel, extra='allow'): logger.debug(f"Writing to {row}") if row == None: row = sample_info['lookup_table']['start_row'] + iii - fields = [field for field in list(sample.model_fields.keys()) + list(sample.model_extra.keys()) if field in sample_info['sample_columns'].keys()] + fields = [field for field in list(sample.model_fields.keys()) + + list(sample.model_extra.keys()) if field in sample_info['lookup_table']['sample_columns'].keys()] logger.debug(f"Here are the fields we are going to fill:\n\t{fields}") for field in fields: - column = sample_info['sample_columns'][field] + column = sample_info['lookup_table']['sample_columns'][field] value = getattr(sample, field) match value: case list(): @@ -776,8 +869,8 @@ class PydSubmission(BaseModel, extra='allow'): value = row_map[value] worksheet.cell(row=row, column=column, value=value) return workbook - - def autofill_equipment(self, workbook:Workbook) -> Workbook: + + def autofill_equipment(self, workbook: Workbook) -> Workbook: """ Fill in equipment on the excel sheet @@ -786,7 +879,7 @@ class PydSubmission(BaseModel, extra='allow'): Returns: Workbook: Updated excel workbook - """ + """ equipment_map = SubmissionType.query(name=self.submission_type['value']).construct_equipment_map() logger.debug(f"Equipment map: {equipment_map}") # See if all equipment has a location map @@ -820,23 +913,31 @@ class PydSubmission(BaseModel, extra='allow'): case _: pass worksheet.cell(row=rel['name']['row'], column=rel['name']['column'], value=rel['name']['value']) - worksheet.cell(row=rel['process']['row'], column=rel['process']['column'], value=rel['process']['value']) + worksheet.cell(row=rel['process']['row'], column=rel['process']['column'], + value=rel['process']['value']) return workbook + def toWriter(self): + from backend.excel.writer import SheetWriter + return SheetWriter(self) + def construct_filename(self) -> str: """ Creates filename for this instance Returns: str: Output filename - """ - template = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type).filename_template() + """ + template = BasicSubmission.find_polymorphic_subclass( + polymorphic_identity=self.submission_type).filename_template() # logger.debug(f"Using template string: {template}") - render = RSLNamer.construct_export_name(template=template, **self.improved_dict(dictionaries=False)).replace("/", "") + render = RSLNamer.construct_export_name(template=template, **self.improved_dict(dictionaries=False)).replace( + "/", "") # logger.debug(f"Template rendered as: {render}") return render - def check_kit_integrity(self, reagenttypes:list=[], extraction_kit:str|dict|None=None) -> Tuple[List[PydReagent], Report]: + def check_kit_integrity(self, reagenttypes: list = [], extraction_kit: str | dict | None = None) -> Tuple[ + List[PydReagent], Report]: """ Ensures all reagents expected in kit are listed in Submission @@ -845,13 +946,13 @@ class PydSubmission(BaseModel, extra='allow'): Returns: Report: Result object containing a message and any missing components. - """ + """ report = Report() logger.debug(f"Extraction kit: {extraction_kit}. Is it a string? {isinstance(extraction_kit, str)}") if isinstance(extraction_kit, str): extraction_kit = dict(value=extraction_kit) if extraction_kit is not None and extraction_kit != self.extraction_kit['value']: - self.extraction_kit['value'] = extraction_kit['value'] + self.extraction_kit['value'] = extraction_kit['value'] # reagenttypes = [] # else: # reagenttypes = [item.type for item in self.reagents] @@ -859,7 +960,8 @@ class PydSubmission(BaseModel, extra='allow'): # reagenttypes = [item.type for item in self.reagents] logger.debug(f"Looking up {self.extraction_kit['value']}") ext_kit = KitType.query(name=self.extraction_kit['value']) - ext_kit_rtypes = [item.to_pydantic() for item in ext_kit.get_reagents(required=True, submission_type=self.submission_type['value'])] + ext_kit_rtypes = [item.to_pydantic() for item in + ext_kit.get_reagents(required=True, submission_type=self.submission_type['value'])] logger.debug(f"Kit reagents: {ext_kit_rtypes}") logger.debug(f"Submission reagents: {self.reagents}") # check if lists are equal @@ -883,18 +985,20 @@ class PydSubmission(BaseModel, extra='allow'): output_reagents += [rt for rt in missing_reagents if rt not in output_reagents] logger.debug(f"Missing reagents types: {missing_reagents}") # if lists are equal return no problem - if len(missing_reagents)==0: + if len(missing_reagents) == 0: result = None else: - result = Result(msg=f"The excel sheet you are importing is missing some reagents expected by the kit.\n\nIt looks like you are missing: {[item.type.upper() for item in missing_reagents]}\n\nAlternatively, you may have set the wrong extraction kit.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", status="Warning") + result = Result( + msg=f"The excel sheet you are importing is missing some reagents expected by the kit.\n\nIt looks like you are missing: {[item.type.upper() for item in missing_reagents]}\n\nAlternatively, you may have set the wrong extraction kit.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", + status="Warning") report.add_result(result) return output_reagents, report + class PydContact(BaseModel): - name: str - phone: str|None - email: str|None + phone: str | None + email: str | None def toSQL(self) -> Contact: """ @@ -902,14 +1006,14 @@ class PydContact(BaseModel): Returns: Contact: Contact instance - """ + """ return Contact(name=self.name, phone=self.phone, email=self.email) -class PydOrganization(BaseModel): +class PydOrganization(BaseModel): name: str cost_centre: str - contacts: List[PydContact]|None + contacts: List[PydContact] | None def toSQL(self) -> Organization: """ @@ -917,7 +1021,7 @@ class PydOrganization(BaseModel): Returns: Organization: Organization instance - """ + """ instance = Organization() for field in self.model_fields: match field: @@ -929,12 +1033,12 @@ class PydOrganization(BaseModel): instance.__setattr__(name=field, value=value) return instance -class PydReagentType(BaseModel): +class PydReagentType(BaseModel): name: str - eol_ext: timedelta|int|None - uses: dict|None - required: int|None = Field(default=1) + eol_ext: timedelta | int | None + uses: dict | None + required: int | None = Field(default=1) @field_validator("eol_ext") @classmethod @@ -942,8 +1046,8 @@ class PydReagentType(BaseModel): if isinstance(value, int): return timedelta(days=value) return value - - def toSQL(self, kit:KitType) -> ReagentType: + + def toSQL(self, kit: KitType) -> ReagentType: """ Converts this instance into a backend.db.models.ReagentType instance @@ -952,7 +1056,7 @@ class PydReagentType(BaseModel): Returns: ReagentType: ReagentType instance - """ + """ instance: ReagentType = ReagentType.query(name=self.name) if instance == None: instance = ReagentType(name=self.name, eol_ext=self.eol_ext) @@ -962,11 +1066,12 @@ class PydReagentType(BaseModel): except StatementError: assoc = None if assoc == None: - assoc = KitTypeReagentTypeAssociation(kit_type=kit, reagent_type=instance, uses=self.uses, required=self.required) + assoc = KitTypeReagentTypeAssociation(kit_type=kit, reagent_type=instance, uses=self.uses, + required=self.required) return instance - -class PydKit(BaseModel): + +class PydKit(BaseModel): name: str reagent_types: List[PydReagentType] = [] @@ -976,7 +1081,7 @@ class PydKit(BaseModel): Returns: Tuple[KitType, Report]: KitType instance and report of results. - """ + """ report = Report() instance = KitType.query(name=self.name) if instance == None: @@ -984,13 +1089,13 @@ class PydKit(BaseModel): [item.toSQL(instance) for item in self.reagent_types] return instance, report -class PydEquipmentRole(BaseModel): +class PydEquipmentRole(BaseModel): name: str equipment: List[PydEquipment] - processes: List[str]|None - - def toForm(self, parent, used:list) -> "RoleComboBox": + processes: List[str] | None + + def toForm(self, parent, used: list) -> "RoleComboBox": """ Creates a widget for user input into this class. @@ -1000,7 +1105,6 @@ class PydEquipmentRole(BaseModel): Returns: RoleComboBox: widget - """ + """ from frontend.widgets.equipment_usage import RoleComboBox return RoleComboBox(parent=parent, role=self, used=used) - \ No newline at end of file diff --git a/src/submissions/frontend/widgets/submission_widget.py b/src/submissions/frontend/widgets/submission_widget.py index 50ee364..21417cc 100644 --- a/src/submissions/frontend/widgets/submission_widget.py +++ b/src/submissions/frontend/widgets/submission_widget.py @@ -106,8 +106,8 @@ class SubmissionFormContainer(QWidget): logger.debug(f"Pydantic result: \n\n{pformat(self.pyd)}\n\n") self.form = self.pyd.toForm(parent=self) self.layout().addWidget(self.form) - if self.prsr.sample_result != None: - report.add_result(msg=self.prsr.sample_result, status="Warning") + # if self.prsr.sample_result != None: + # report.add_result(msg=self.prsr.sample_result, status="Warning") self.report.add_result(report) logger.debug(f"Outgoing report: {self.report.results}") logger.debug(f"All attributes of submission container:\n{pformat(self.__dict__)}") diff --git a/src/submissions/tools.py b/src/submissions/tools.py index 9cea7c7..e453189 100644 --- a/src/submissions/tools.py +++ b/src/submissions/tools.py @@ -38,6 +38,7 @@ CONFIGDIR = main_aux_dir.joinpath("config") LOGDIR = main_aux_dir.joinpath("logs") row_map = {1:"A", 2:"B", 3:"C", 4:"D", 5:"E", 6:"F", 7:"G", 8:"H"} +row_keys = {v:k for k,v in row_map.items()} def check_not_nan(cell_contents) -> bool: """ @@ -369,10 +370,10 @@ def setup_logger(verbosity:int=3): Path(LOGDIR).mkdir(parents=True) except FileExistsError: pass - fh = GroupWriteRotatingFileHandler(LOGDIR.joinpath('submissions.log'), mode='a', maxBytes=100000, backupCount=3, encoding=None, delay=False) + # fh = GroupWriteRotatingFileHandler(LOGDIR.joinpath('submissions.log'), mode='a', maxBytes=100000, backupCount=3, encoding=None, delay=False) # file logging will always be debug - fh.setLevel(logging.DEBUG) - fh.name = "File" + # fh.setLevel(logging.DEBUG) + # fh.name = "File" # create console handler with a higher log level # create custom logger with STERR -> log ch = logging.StreamHandler(stream=sys.stdout) @@ -388,10 +389,10 @@ def setup_logger(verbosity:int=3): # create formatter and add it to the handlers # formatter = logging.Formatter('%(asctime)s - %(levelname)s - {%(pathname)s:%(lineno)d} - %(message)s') formatter = CustomFormatter() - fh.setFormatter(formatter) + # fh.setFormatter(formatter) ch.setFormatter(formatter) # add the handlers to the logger - logger.addHandler(fh) + # logger.addHandler(fh) logger.addHandler(ch) # Output exception and traceback to logger def handle_exception(exc_type, exc_value, exc_traceback): @@ -556,6 +557,11 @@ def html_to_pdf(html, output_file:Path|str): printer.setPageSize(QPageSize(QPageSize.PageSizeId.A4)) document.print(printer) +def remove_key_from_list_of_dicts(input:list, key:str): + for item in input: + del item[key] + return input + ctx = get_config(None) def is_power_user() -> bool: