diff --git a/CHANGELOG.md b/CHANGELOG.md index 154782e..4c373d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 202307.03 + +- Auto-filling of some empty cells in excel file. +- Better pydantic validations of missing data. + ## 202307.02 - Better column counting for cost recovery purposes. diff --git a/TODO.md b/TODO.md index 215f79f..22398ad 100644 --- a/TODO.md +++ b/TODO.md @@ -1,3 +1,4 @@ +- [x] Put in SN controls I guess. - [x] Code clean-up and refactor (2023-07). - [ ] Migrate context settings to pydantic-settings model. - [x] Insert column into reagent type to indicate if reagent is required for kit. diff --git a/src/submissions/__init__.py b/src/submissions/__init__.py index 2c63dfe..b188763 100644 --- a/src/submissions/__init__.py +++ b/src/submissions/__init__.py @@ -4,7 +4,7 @@ from pathlib import Path # Version of the realpython-reader package __project__ = "submissions" -__version__ = "202307.2b" +__version__ = "202307.3b" __author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"} __copyright__ = "2022-2023, Government of Canada" diff --git a/src/submissions/backend/db/functions.py b/src/submissions/backend/db/functions.py index d3ee7ea..5976b38 100644 --- a/src/submissions/backend/db/functions.py +++ b/src/submissions/backend/db/functions.py @@ -45,7 +45,7 @@ def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|d from tools import RSLNamer logger.debug(f"Hello from store_submission") # Add all samples to sample table - typer = RSLNamer(base_submission.rsl_plate_num) + typer = RSLNamer(ctx=ctx, instr=base_submission.rsl_plate_num) base_submission.rsl_plate_num = typer.parsed_name for sample in base_submission.samples: logger.debug(f"Typer: {typer.submission_type}") @@ -116,7 +116,7 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio return instance, {'code': 2, 'message': "A proper RSL plate number is required."} else: # enforce conventions on the rsl plate 
number from the form - info_dict['rsl_plate_num'] = RSLNamer(info_dict["rsl_plate_num"]).parsed_name + info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"]).parsed_name # check database for existing object instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first() # get model based on submission type converted above diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index a751154..9591322 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -38,7 +38,7 @@ class SheetParser(object): self.filepath = filepath # Open excel file try: - self.xl = pd.ExcelFile(filepath.__str__()) + self.xl = pd.ExcelFile(filepath) except ValueError as e: logger.error(f"Incorrect value: {e}") self.xl = None @@ -95,7 +95,7 @@ class SheetParser(object): submission_info = self.xl.parse(sheet_name=sheet_name, dtype=object) self.sub['submitter_plate_num'] = submission_info.iloc[0][1] if check_not_nan(submission_info.iloc[10][1]): - self.sub['rsl_plate_num'] = RSLNamer(submission_info.iloc[10][1]).parsed_name + self.sub['rsl_plate_num'] = RSLNamer(ctx=self.ctx, instr=submission_info.iloc[10][1]).parsed_name else: # self.sub['rsl_plate_num'] = RSLNamer(self.filepath).parsed_name self.sub['rsl_plate_num'] = None @@ -103,6 +103,10 @@ class SheetParser(object): self.sub['submitting_lab'] = submission_info.iloc[0][3] self.sub['sample_count'] = submission_info.iloc[2][3] self.sub['extraction_kit'] = submission_info.iloc[3][3] + if check_not_nan(submission_info.iloc[1][3]): + self.sub['submission_type'] = dict(value=submission_info.iloc[1][3], parsed=True) + else: + self.sub['submission_type'] = dict(value=self.sub['submission_type'], parsed=False) return submission_info def parse_bacterial_culture(self) -> None: @@ -170,7 +174,7 @@ class SheetParser(object): parse_reagents(reagent_range) # get 
individual sample info sample_parser = SampleParser(self.ctx, submission_info.iloc[16:112]) - sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type'].lower()}_samples") + sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type']['value'].replace(' ', '_').lower()}_samples") logger.debug(f"Parser result: {self.sub}") self.sample_result, self.sub['samples'] = sample_parse() @@ -234,14 +238,18 @@ class SheetParser(object): # set qpcr reagent range pcr_reagent_range = qprc_info.iloc[0:5, 9:20] # compile technician info from all sheets - self.sub['technician'] = f"Enr: {enrichment_info.columns[2]}, Ext: {extraction_info.columns[2]}, PCR: {qprc_info.columns[2]}" + if all(map(check_not_nan, [enrichment_info.columns[2], extraction_info.columns[2], qprc_info.columns[2]])): + parsed = True + else: + parsed = False + self.sub['technician'] = dict(value=f"Enr: {enrichment_info.columns[2]}, Ext: {extraction_info.columns[2]}, PCR: {qprc_info.columns[2]}", parsed=parsed) self.sub['reagents'] = [] parse_reagents(enr_reagent_range) parse_reagents(ext_reagent_range) parse_reagents(pcr_reagent_range) # parse samples sample_parser = SampleParser(self.ctx, submission_info.iloc[16:], elution_map=retrieve_elution_map()) - sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type'].lower()}_samples") + sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type']['value'].lower()}_samples") self.sample_result, self.sub['samples'] = sample_parse() self.sub['csv'] = self.xl.parse("Copy to import file", dtype=object) @@ -249,6 +257,7 @@ class SheetParser(object): """ pulls info specific to wastewater_arctic submission type """ + self.sub['submission_type'] = dict(value=self.sub['submission_type'], parsed=True) def parse_reagents(df:pd.DataFrame): logger.debug(df) for ii, row in df.iterrows(): @@ -306,7 +315,7 @@ class SheetParser(object): sub_reagent_range = submission_info.iloc[56:, 1:4].dropna(how='all') biomek_reagent_range = 
biomek_info.iloc[60:, 0:3].dropna(how='all') self.sub['submitter_plate_num'] = "" - self.sub['rsl_plate_num'] = RSLNamer(self.filepath.__str__()).parsed_name + self.sub['rsl_plate_num'] = RSLNamer(ctx=self.ctx, instr=self.filepath.__str__()).parsed_name self.sub['submitted_date'] = biomek_info.iloc[1][1] self.sub['submitting_lab'] = "Enterics Wastewater Genomics" self.sub['sample_count'] = submission_info.iloc[4][6] @@ -317,7 +326,7 @@ class SheetParser(object): parse_reagents(biomek_reagent_range) samples = massage_samples(biomek_info.iloc[22:31, 0:]) sample_parser = SampleParser(self.ctx, pd.DataFrame.from_records(samples)) - sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type'].lower()}_samples") + sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type']['value'].lower()}_samples") self.sample_result, self.sub['samples'] = sample_parse() def to_pydantic(self) -> PydSubmission: @@ -497,7 +506,7 @@ class PCRParser(object): return # self.pcr = OrderedDict() self.pcr = {} - namer = RSLNamer(filepath.__str__()) + namer = RSLNamer(ctx=self.ctx, instr=filepath.__str__()) self.plate_num = namer.parsed_name self.submission_type = namer.submission_type logger.debug(f"Set plate number to {self.plate_num} and type to {self.submission_type}") diff --git a/src/submissions/backend/pydant/__init__.py b/src/submissions/backend/pydant/__init__.py index 4adfa57..b91f0fe 100644 --- a/src/submissions/backend/pydant/__init__.py +++ b/src/submissions/backend/pydant/__init__.py @@ -47,14 +47,14 @@ class PydReagent(BaseModel): class PydSubmission(BaseModel, extra=Extra.allow): ctx: dict filepath: Path - submission_type: str + submission_type: str|dict|None submitter_plate_num: str|None rsl_plate_num: str|dict|None submitted_date: date submitting_lab: str|None sample_count: int extraction_kit: str|dict|None - technician: str|None + technician: str|dict|None reagents: List[PydReagent] = [] samples: List[Any] # missing_fields: List[str] = [] @@ -91,14 
+91,20 @@ class PydSubmission(BaseModel, extra=Extra.allow): else: return value else: - logger.debug(f"Pydant values:{type(values)}\n{values}") - return dict(value=RSLNamer(values.data['filepath'].__str__()).parsed_name, parsed=False) + # logger.debug(f"Pydant values:{type(values)}\n{values}") + return dict(value=RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__()).parsed_name, parsed=False) - @field_validator("technician") + @field_validator("technician", mode="before") @classmethod def enforce_tech(cls, value): - if value == "nan" or value == "None": - value = "Unknown" + if check_not_nan(value): + if isinstance(value, dict): + value['value'] = re.sub(r"\: \d", "", value['value']) + return value + else: + return dict(value=re.sub(r"\: \d", "", value), parsed=True) + else: + return dict(value="Unnamed", parsed=False) return value @field_validator("reagents") @@ -140,6 +146,19 @@ class PydSubmission(BaseModel, extra=Extra.allow): return dict(value=dlg.getValues(), parsed=False) else: raise ValueError("Extraction kit needed.") + + + @field_validator("submission_type", mode='before') + @classmethod + def make_submission_type(cls, value, values): + if check_not_nan(value): + if isinstance(value, dict): + value['value'] = value['value'].title() + return value + elif isinstance(value, str): + return dict(value=value.title(), parsed=False) + else: + return dict(value=RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__()).submission_type.title(), parsed=False) # @model_validator(mode="after") # def ensure_kit(cls, values): diff --git a/src/submissions/frontend/all_window_functions.py b/src/submissions/frontend/all_window_functions.py index 0408902..59a9149 100644 --- a/src/submissions/frontend/all_window_functions.py +++ b/src/submissions/frontend/all_window_functions.py @@ -35,7 +35,10 @@ def select_save_file(obj:QMainWindow, default_name:str, extension:str) -> Path: Returns: Path: Path of file to be opened """ - home_dir = 
Path(obj.ctx["directory_path"]).joinpath(default_name).resolve().__str__() + try: + home_dir = Path(obj.ctx["directory_path"]).joinpath(default_name).resolve().__str__() + except FileNotFoundError: + home_dir = Path.home().resolve().__str__() fname = Path(QFileDialog.getSaveFileName(obj, "Save File", home_dir, filter = f"{extension}(*.{extension})")[0]) return fname diff --git a/src/submissions/frontend/custom_widgets/sub_details.py b/src/submissions/frontend/custom_widgets/sub_details.py index 9173753..f1162b2 100644 --- a/src/submissions/frontend/custom_widgets/sub_details.py +++ b/src/submissions/frontend/custom_widgets/sub_details.py @@ -314,7 +314,12 @@ class SubmissionDetails(QDialog): self.base_dict['platemap'] = base64.b64encode(image_io.getvalue()).decode('utf-8') logger.debug(self.base_dict) html = template.render(sub=self.base_dict) - home_dir = Path(self.ctx["directory_path"]).joinpath(f"Submission_Details_{self.base_dict['Plate Number']}.pdf").resolve().__str__() + with open("test.html", "w") as f: + f.write(html) + try: + home_dir = Path(self.ctx["directory_path"]).joinpath(f"Submission_Details_{self.base_dict['Plate Number']}.pdf").resolve().__str__() + except FileNotFoundError: + home_dir = Path.home().resolve().__str__() fname = Path(QFileDialog.getSaveFileName(self, "Save File", home_dir, filter=".pdf")[0]) if fname.__str__() == ".": logger.debug("Saving pdf was cancelled.") diff --git a/src/submissions/frontend/main_window_functions.py b/src/submissions/frontend/main_window_functions.py index fc3c7ee..e1b379a 100644 --- a/src/submissions/frontend/main_window_functions.py +++ b/src/submissions/frontend/main_window_functions.py @@ -87,7 +87,7 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None] # logger.debug(f"Sample from import: {sample.elution_well}") # I don't remember why this is here. 
missing_info = [k for k,v in pyd if v == None] - obj.current_submission_type = pyd.submission_type + obj.current_submission_type = pyd.submission_type['value'] # destroy any widgets from previous imports for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget): item.setParent(None) @@ -98,9 +98,12 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None] for field in fields: value = getattr(pyd, field) logger.debug(f"Checking: {field}: {value}") - # No longer necessary with addition of pydantic validations - # if not check_not_nan(value): - # continue + # Get from pydantic model whether field was completed in the form + if isinstance(value, dict) and field != 'ctx': + logger.debug(f"The field {field} is a dictionary: {value}") + if not value['parsed']: + missing_info.append(field) + value = value['value'] match field: case 'submitting_lab': # create label @@ -120,19 +123,18 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None] # create label label = QLabel(field.replace("_", " ").title()) # if extraction kit not available, all other values fail - if not check_not_nan(value['value']): + if not check_not_nan(value): msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning") msg.exec() - if not value['parsed']: - missing_info.append(field) # create combobox to hold looked up kits add_widget = QComboBox() # lookup existing kits by 'submission_type' decided on by sheetparser - uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=pyd.submission_type)] + uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=pyd.submission_type['value'].lower())] + logger.debug(f"Kits received for {pyd.submission_type}: {uses}") if check_not_nan(value): - logger.debug(f"The extraction kit in parser was: {value['value']}") - uses.insert(0, uses.pop(uses.index(value['value']))) - obj.ext_kit = value['value'] + 
logger.debug(f"The extraction kit in parser was: {value}") + uses.insert(0, uses.pop(uses.index(value))) + obj.ext_kit = value else: logger.error(f"Couldn't find {prsr.sub['extraction_kit']}") obj.ext_kit = uses[0] @@ -173,13 +175,11 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None] obj.table_widget.formlayout.addWidget(add_widget) obj.reagents[reagent.type] = reagent continue - case "rsl_plate_num": - if not value['parsed']: - missing_info.append(field) - label = QLabel(field.replace("_", " ").title()) - add_widget = QLineEdit() - logger.debug(f"Setting widget text to {str(value['value']).replace('_', ' ')}") - add_widget.setText(str(value['value']).replace("_", " ")) + # case "rsl_plate_num": + # label = QLabel(field.replace("_", " ").title()) + # add_widget = QLineEdit() + # logger.debug(f"Setting widget text to {str(value['value']).replace('_', ' ')}") + # add_widget.setText(str(value['value']).replace("_", " ")) case _: # anything else gets added in as a line edit label = QLabel(field.replace("_", " ").title()) @@ -352,8 +352,8 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: logger.debug(f"We have blank reagents in the excel sheet.\n\tLet's try to fill them in.") extraction_kit = lookup_kittype_by_name(obj.ctx, name=obj.ext_kit) logger.debug(f"We have the extraction kit: {extraction_kit.name}") - logger.debug(f"Extraction kit map:\n\n{extraction_kit.used_for[obj.current_submission_type]}") - excel_map = extraction_kit.used_for[obj.current_submission_type] + logger.debug(f"Extraction kit map:\n\n{extraction_kit.used_for[obj.current_submission_type.replace('_', ' ')]}") + excel_map = extraction_kit.used_for[obj.current_submission_type.replace('_', ' ')] input_reagents = [item.to_reagent_dict() for item in parsed_reagents] autofill_excel(obj=obj, xl_map=excel_map, reagents=input_reagents, missing_reagents=obj.missing_reagents, info=info) if hasattr(obj, 'csv'): @@ -823,6 +823,7 @@ def 
autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re xl_map (dict): Map of where each reagent goes in the excel workbook. reagents (List[dict]): All reagents used in the kit. missing_reagents (List[str]): Reagents that are required for the kit that were not present. + info (dict): Dictionary of misc info from submission """ # logger.debug(reagents) @@ -831,6 +832,7 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re relevant_map = {k:v for k,v in xl_map.items() if k in missing_reagents} # logger.debug(relevant_map) relevant_reagents = [item for item in reagents if item['type'] in missing_reagents] + info['submission_type'] = info['submission_type'].replace("_", " ").title() relevant_info = {k:v for k,v in info.items() if k in missing_reagents} logger.debug(f"Here is the relevant info: {pprint.pformat(relevant_info)}") # logger.debug(f"Relevant reagents:\n{relevant_reagents}") @@ -844,6 +846,11 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re new_reagent['expiry'] = relevant_map[new_reagent['type']]['expiry'] new_reagent['expiry']['value'] = reagent['expiry'] new_reagent['sheet'] = relevant_map[new_reagent['type']]['sheet'] + try: + new_reagent['name'] = relevant_map[new_reagent['type']]['name'] + new_reagent['name']['value'] = reagent['type'] + except: + pass new_reagents.append(new_reagent) new_info = [] for item in relevant_info: @@ -858,14 +865,20 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re logger.debug(workbook.sheetnames) for sheet in sheets: worksheet=workbook[sheet] - sheet_reagents = [item for item in new_reagents if item['sheet'] == sheet] + sheet_reagents = [item for item in new_reagents if sheet in item['sheet']] for reagent in sheet_reagents: logger.debug(f"Attempting: {reagent['type']}:") worksheet.cell(row=reagent['lot']['row'], column=reagent['lot']['column'], value=reagent['lot']['value']) 
worksheet.cell(row=reagent['expiry']['row'], column=reagent['expiry']['column'], value=reagent['expiry']['value']) - sheet_info = [item for item in new_info if item['location']['sheet'] == sheet] + try: + worksheet.cell(row=reagent['name']['row'], column=reagent['name']['column'], value=reagent['name']['value'].replace("_", " ").upper()) + except: + pass + sheet_info = [item for item in new_info if sheet in item['location']['sheets']] for item in sheet_info: logger.debug(f"Attempting: {item['type']}") worksheet.cell(row=item['location']['row'], column=item['location']['column'], value=item['value']) - fname = select_save_file(obj=obj, default_name=Path(obj.xl).stem, extension="xlsx") + if info['submission_type'] == "Bacterial Culture": + workbook["Sample List"].cell(row=14, column=2, value=getuser()) + fname = select_save_file(obj=obj, default_name=info['rsl_plate_num'], extension="xlsx") workbook.save(filename=fname.__str__()) diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py index 27cedd8..6ee8b4b 100644 --- a/src/submissions/tools/__init__.py +++ b/src/submissions/tools/__init__.py @@ -10,6 +10,7 @@ import getpass from backend.db.models import BasicSubmission, KitType import pandas as pd from typing import Tuple +from datetime import datetime logger = logging.getLogger(f"submissions.{__name__}") @@ -25,13 +26,16 @@ def check_not_nan(cell_contents) -> bool: """ # check for nan as a string first try: + if "Unnamed:" in cell_contents: + cell_contents = np.nan cell_contents = cell_contents.lower() - except AttributeError: + except (TypeError, AttributeError): pass if cell_contents == 'nan': cell_contents = np.nan if cell_contents == None: cell_contents = np.nan + try: if pd.isnull(cell_contents): cell_contents = np.nan @@ -197,7 +201,8 @@ class RSLNamer(object): """ Object that will enforce proper formatting on RSL plate names. 
""" - def __init__(self, instr:str): + def __init__(self, ctx:dict, instr:str): + self.ctx = ctx self.retrieve_rsl_number(in_str=instr) if self.submission_type != None: parser = getattr(self, f"enforce_{self.submission_type}") @@ -205,7 +210,7 @@ class RSLNamer(object): self.parsed_name = self.parsed_name.replace("_", "-") - def retrieve_rsl_number(self, in_str:str|Path) -> Tuple[str, str]: + def retrieve_rsl_number(self, in_str:str|Path): """ Uses regex to retrieve the plate number and submission type from an input string @@ -215,37 +220,57 @@ class RSLNamer(object): Returns: Tuple[str, str]: tuple of (output rsl number, submission_type) """ - if isinstance(in_str, Path): - in_str = in_str.stem - logger.debug(f"Attempting split of {in_str}") - try: - in_str = in_str.split("\\")[-1] - except AttributeError: - self.parsed_name = None - self.submission_type = None - return - logger.debug(f"Attempting match of {in_str}") - logger.debug(f"The initial plate name is: {in_str}") + if not isinstance(in_str, Path): + in_str = Path(in_str) + out_str = in_str.stem + # else: + # in_str = Path(in_str) + # logger.debug(f"Attempting split of {in_str}") + # try: + # out_str = in_str.split("\\")[-1] + # except AttributeError: + # self.parsed_name = None + # self.submission_type = None + # return + logger.debug(f"Attempting match of {out_str}") + logger.debug(f"The initial plate name is: {out_str}") regex = re.compile(r""" # (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d?((?!\d)|R)?\d(?!\d))?)| (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)| (?PRSL-?\d{2}-?\d{4})| (?P(\d{4}-\d{2}-\d{2}_(?:\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)) """, flags = re.IGNORECASE | re.VERBOSE) - m = regex.search(in_str) - try: - self.parsed_name = m.group().upper().strip(".") - logger.debug(f"Got parsed submission name: {self.parsed_name}") - self.submission_type = m.lastgroup - except AttributeError as e: - logger.critical("No RSL 
plate number found or submission type found!") - logger.debug(f"The cause of the above error was: {e}") + m = regex.search(out_str) + if m != None: + try: + self.parsed_name = m.group().upper().strip(".") + logger.debug(f"Got parsed submission name: {self.parsed_name}") + self.submission_type = m.lastgroup + except AttributeError as e: + logger.critical("No RSL plate number found or submission type found!") + logger.debug(f"The cause of the above error was: {e}") + else: + logger.warning(f"We're going to have to create the submission type from the excel sheet properties...") + if in_str.exists(): + my_xl = pd.ExcelFile(in_str) + if my_xl.book.properties.category != None: + categories = [item.strip().title() for item in my_xl.book.properties.category.split(";")] + self.submission_type = categories[0].replace(" ", "_").lower() + else: + raise AttributeError(f"File {in_str.__str__()} has no categories.") + else: + raise FileNotFoundError() + + def enforce_wastewater(self): """ Uses regex to enforce proper formatting of wastewater samples """ - self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name) + try: + self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name) + except AttributeError as e: + self.parsed_name = self.construct_wastewater() self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW") self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE) self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name) @@ -267,13 +292,55 @@ class RSLNamer(object): self.parsed_name = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", self.parsed_name).replace(" ", "") + def construct_wastewater(self): + today = datetime.now() + return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" + + def enforce_bacterial_culture(self): """ Uses regex to enforce proper formatting of bacterial culture samples """ - self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE) + 
try: + self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE) + except AttributeError as e: + self.parsed_name = self.construct_bacterial_culture_rsl() + # year = datetime.now().year + # self.parsed_name = f"RSL-{str(year)[-2:]}-0000" self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE) + + def construct_bacterial_culture_rsl(self) -> str: + """ + DEPRECATED due to slowness. Search for the largest rsl number and increment by 1 + + Returns: + str: new RSL number + """ + logger.debug(f"Attempting to construct RSL number from scratch...") + directory = Path(self.ctx['directory_path']).joinpath("Bacteria") + year = str(datetime.now().year)[-2:] + if directory.exists(): + logger.debug(f"Year: {year}") + relevant_rsls = [] + all_xlsx = [item.stem for item in directory.rglob("*.xlsx") if bool(re.search(r"RSL-\d{2}-\d{4}", item.stem)) and year in item.stem[4:6]] + logger.debug(f"All rsls: {all_xlsx}") + for item in all_xlsx: + try: + relevant_rsls.append(re.match(r"RSL-\d{2}-\d{4}", item).group(0)) + except Exception as e: + logger.error(f"Regex error: {e}") + continue + logger.debug(f"Initial xlsx: {relevant_rsls}") + max_number = max([int(item[-4:]) for item in relevant_rsls]) + logger.debug(f"The largest sample number is: {max_number}") + return f"RSL-{year}-{str(max_number+1).zfill(4)}" + else: + # raise FileNotFoundError(f"Unable to locate the directory: {directory.__str__()}") + return f"RSL-{year}-0000" + + + def enforce_wastewater_artic(self): """ Uses regex to enforce proper formatting of wastewater samples