diff --git a/CHANGELOG.md b/CHANGELOG.md index 86127ee..e2d952e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ ## 202310.03 -- Replaced RSLNamer class with Submission object specific class methods. +- Better flexibility with parsers pulling methods from database objects. ## 202310.02 diff --git a/TODO.md b/TODO.md index b54bae4..690c5a8 100644 --- a/TODO.md +++ b/TODO.md @@ -1,6 +1,6 @@ -- [ ] Convert Pydantic models to Submission models? -- [x] Move RSLNamer into Submission database object. - - Having second thoughts about some of this. Move into parser module? +- [ ] Validate form data using pydantic. +- [x] Rebuild RSLNamer and fix circular imports + - Should be used when coming in to parser and when leaving form. NO OTHER PLACES. - [x] Change 'check_is_power_user' to decorator. - [x] Drag and drop files into submission form area? - [ ] Get info for controls into their sample hitpicks. diff --git a/src/submissions/backend/__init__.py b/src/submissions/backend/__init__.py index b2cd2ae..ab7c0c9 100644 --- a/src/submissions/backend/__init__.py +++ b/src/submissions/backend/__init__.py @@ -1,3 +1,3 @@ ''' -Contains database, pydantic and excel operations. +Contains database, validators and excel operations. ''' \ No newline at end of file diff --git a/src/submissions/backend/db/functions/constructions.py b/src/submissions/backend/db/functions/constructions.py index 1717d9d..08aca11 100644 --- a/src/submissions/backend/db/functions/constructions.py +++ b/src/submissions/backend/db/functions/constructions.py @@ -1,12 +1,9 @@ ''' Used to construct models from input dictionaries. ''' -from getpass import getuser + from tools import Settings, check_regex_match, check_authorization, massage_common_reagents from .. 
import models -# from .misc import RSLNamer -# from backend.namer import RSLNamer -# from .misc import get_polymorphic_subclass from .lookups import * import logging from datetime import date, timedelta @@ -73,10 +70,10 @@ def construct_submission_info(ctx:Settings, info_dict:dict) -> Tuple[models.Basi instance = None msg = "A proper RSL plate number is required." return instance, {'code': 2, 'message': "A proper RSL plate number is required."} - else: - # enforce conventions on the rsl plate number from the form - # info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"]).parsed_name - info_dict['rsl_plate_num'] = model.RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"], sub_type=info_dict['submission_type']).parsed_name + # else: + # # enforce conventions on the rsl plate number from the form + # # info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"]).parsed_name + # info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"], sub_type=info_dict['submission_type']).parsed_name # check database for existing object instance = lookup_submissions(ctx=ctx, rsl_number=info_dict['rsl_plate_num']) # get model based on submission type converted above diff --git a/src/submissions/backend/db/functions/lookups.py b/src/submissions/backend/db/functions/lookups.py index 93e6261..2290f27 100644 --- a/src/submissions/backend/db/functions/lookups.py +++ b/src/submissions/backend/db/functions/lookups.py @@ -155,10 +155,13 @@ def lookup_submissions(ctx:Settings, chronologic:bool=False, limit:int=0, **kwargs ) -> models.BasicSubmission | List[models.BasicSubmission]: - if rsl_number == None: + if submission_type == None: model = models.BasicSubmission.find_subclasses(ctx=ctx, attrs=kwargs) else: - model = models.BasicSubmission.find_subclasses(ctx=ctx, rsl_number=rsl_number) + if isinstance(submission_type, models.SubmissionType): + model = models.BasicSubmission.find_subclasses(ctx=ctx, 
submission_type=submission_type.name) + else: + model = models.BasicSubmission.find_subclasses(ctx=ctx, submission_type=submission_type) query = setup_lookup(ctx=ctx, locals=locals()).query(model) # by submission type match submission_type: @@ -211,14 +214,6 @@ def lookup_submissions(ctx:Settings, # by rsl number (returns only a single value) match rsl_number: case str(): - namer = model.RSLNamer(ctx=ctx, instr=rsl_number) - logger.debug(f"Looking up BasicSubmission with rsl number: {rsl_number}") - try: - rsl_number = namer.parsed_name - logger.debug(f"Got {rsl_number} from {model}.") - except AttributeError as e: - logger.error(f"No parsed name found, returning None.") - return None # query = query.filter(models.BasicSubmission.rsl_plate_num==rsl_number) query = query.filter(model.rsl_plate_num==rsl_number) logger.debug(f"At this point the query gets: {query.all()}") diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index 6e8253b..8daca00 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -298,68 +298,21 @@ class BasicSubmission(Base): """ return input_excel - class _RSLNamer(object): - - alias = None - - def __init__(self, ctx:Settings, instr:str|Path, sub_type:str|None=None, parent=None): - if parent != None: - logger.debug(f"Hello from {parent.__mapper_args__['polymorphic_identity']} Namer!") - self.ctx = ctx - self.submission_type = sub_type - self.retrieve_rsl_number(instr=instr) - try: - ncls = [item for item in self.__class__.__subclasses__() if item.alias == self.submission_type][0] - enforcer = ncls.enforce_name - enforcer(self=self, parent=parent) - except IndexError: - enforcer = self.enforce_name - enforcer(parent=parent) - - - def retrieve_rsl_number(self, instr:str|Path): - """ - Uses regex to retrieve the plate number and submission type from an input string - - Args: - in_str (str): string to be parsed - """ - if not 
isinstance(instr, Path): - instr = Path(instr) - self.out_str = instr.stem - logger.debug(f"Attempting match of {self.out_str}") - logger.debug(f"The initial plate name is: {self.out_str}") - regex = self.construct_regex() - m = regex.search(self.out_str) - if m != None: - self.parsed_name = m.group().upper().strip(".") - logger.debug(f"Got parsed submission name: {self.parsed_name}") - if self.submission_type == None: - try: - self.submission_type = m.lastgroup.replace("_", " ") - except AttributeError as e: - self.submission_type = None - - def enforce_name(self, parent): - if parent != None: - logger.debug(f"Hello from {parent.__mapper_args__['polymorphic_identity']} Enforcer!") - self.parsed_name = self.parsed_name - - @classmethod - def construct_regex(cls): - rstring = rf'{"|".join([item.get_regex() for item in cls.__subclasses__()])}' - regex = re.compile(rstring, flags = re.IGNORECASE | re.VERBOSE) - return regex + @classmethod + def enforce_name(cls, ctx:Settings, instr:str) -> str: + logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} Enforcer!") + return instr @classmethod - def RSLNamer(cls, ctx:Settings, instr:str, sub_type:str|None=None): - return cls._RSLNamer(parent=cls, ctx=ctx, instr=instr, sub_type=sub_type) - + def construct_regex(cls): + rstring = rf'{"|".join([item.get_regex() for item in cls.__subclasses__()])}' + regex = re.compile(rstring, flags = re.IGNORECASE | re.VERBOSE) + return regex + @classmethod - def find_subclasses(cls, ctx:Settings, attrs:dict|None=None, rsl_number:str|None=None): - if rsl_number != None: - namer = cls._RSLNamer(ctx=ctx, instr=rsl_number) - return cls.find_polymorphic_subclass(namer.submission_type) + def find_subclasses(cls, ctx:Settings, attrs:dict|None=None, submission_type:str|None=None): + if submission_type != None: + return cls.find_polymorphic_subclass(submission_type) if len(attrs) == 0 or attrs == None: return cls if any([not hasattr(cls, attr) for attr in attrs]): @@ -386,6 +339,11 
@@ class BasicSubmission(Base): logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}") return cls + @classmethod + def parse_pcr(cls, xl:pd.DataFrame, rsl_number:str) -> list: + logger.debug(f"Hello from {cls.__mapper_args__['polymorphic_identity']} PCR parser!") + return [] + # Below are the custom submission types class BacterialCulture(BasicSubmission): @@ -450,59 +408,50 @@ class BacterialCulture(BasicSubmission): input_excel["Sample List"].cell(row=15, column=2, value=getuser()[0:2].upper()) return input_excel - class _RSLNamer(BasicSubmission._RSLNamer): + @classmethod + def enforce_name(cls, ctx:Settings, instr:str) -> str: + outstr = super().enforce_name(ctx=ctx, instr=instr) + def construct(ctx) -> str: + """ + DEPRECIATED due to slowness. Search for the largest rsl number and increment by 1 - alias = "Bacterial Culture" + Returns: + str: new RSL number + """ + logger.debug(f"Attempting to construct RSL number from scratch...") + # directory = Path(self.ctx['directory_path']).joinpath("Bacteria") + directory = Path(ctx.directory_path).joinpath("Bacteria") + year = str(datetime.now().year)[-2:] + if directory.exists(): + logger.debug(f"Year: {year}") + relevant_rsls = [] + all_xlsx = [item.stem for item in directory.rglob("*.xlsx") if bool(re.search(r"RSL-\d{2}-\d{4}", item.stem)) and year in item.stem[4:6]] + logger.debug(f"All rsls: {all_xlsx}") + for item in all_xlsx: + try: + relevant_rsls.append(re.match(r"RSL-\d{2}-\d{4}", item).group(0)) + except Exception as e: + logger.error(f"Regex error: {e}") + continue + logger.debug(f"Initial xlsx: {relevant_rsls}") + max_number = max([int(item[-4:]) for item in relevant_rsls]) + logger.debug(f"The largest sample number is: {max_number}") + return f"RSL-{year}-{str(max_number+1).zfill(4)}" + else: + # raise FileNotFoundError(f"Unable to locate the directory: {directory.__str__()}") + return f"RSL-{year}-0000" + try: + outstr = re.sub(r"RSL(\d{2})", r"RSL-\1", outstr, 
flags=re.IGNORECASE) + except (AttributeError, TypeError) as e: + outstr = construct(ctx=ctx) + # year = datetime.now().year + # self.parsed_name = f"RSL-{str(year)[-2:]}-0000" + return re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", outstr, flags=re.IGNORECASE) - @classmethod - def construct_regex(cls): - rstring = rf'{cls.get_regex()}' - regex = re.compile(rstring, flags = re.IGNORECASE | re.VERBOSE) - return regex - - def enforce_name(self, parent): - # super().enforce_name(parent) - def construct(ctx) -> str: - """ - DEPRECIATED due to slowness. Search for the largest rsl number and increment by 1 - - Returns: - str: new RSL number - """ - logger.debug(f"Attempting to construct RSL number from scratch...") - # directory = Path(self.ctx['directory_path']).joinpath("Bacteria") - directory = Path(ctx.directory_path).joinpath("Bacteria") - year = str(datetime.now().year)[-2:] - if directory.exists(): - logger.debug(f"Year: {year}") - relevant_rsls = [] - all_xlsx = [item.stem for item in directory.rglob("*.xlsx") if bool(re.search(r"RSL-\d{2}-\d{4}", item.stem)) and year in item.stem[4:6]] - logger.debug(f"All rsls: {all_xlsx}") - for item in all_xlsx: - try: - relevant_rsls.append(re.match(r"RSL-\d{2}-\d{4}", item).group(0)) - except Exception as e: - logger.error(f"Regex error: {e}") - continue - logger.debug(f"Initial xlsx: {relevant_rsls}") - max_number = max([int(item[-4:]) for item in relevant_rsls]) - logger.debug(f"The largest sample number is: {max_number}") - return f"RSL-{year}-{str(max_number+1).zfill(4)}" - else: - # raise FileNotFoundError(f"Unable to locate the directory: {directory.__str__()}") - return f"RSL-{year}-0000" - try: - self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE) - except AttributeError as e: - self.parsed_name = construct(ctx=self.ctx) - # year = datetime.now().year - # self.parsed_name = f"RSL-{str(year)[-2:]}-0000" - self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", 
self.parsed_name, flags=re.IGNORECASE) - - @classmethod - def get_regex(cls): - return "(?P<Bacterial_Culture>RSL-?\\d{2}-?\\d{4})" - + @classmethod + def get_regex(cls): + return "(?P<Bacterial_Culture>RSL-?\\d{2}-?\\d{4})" + class Wastewater(BasicSubmission): """ derivative submission type from BasicSubmission @@ -543,49 +492,78 @@ class Wastewater(BasicSubmission): input_dict['csv'] = xl.parse("Copy to import file") return input_dict - class _RSLNamer(BasicSubmission._RSLNamer): - - alias = "Wastewater" - - @classmethod - def construct_regex(cls): - rstring = rf'{cls.get_regex()}' - regex = re.compile(rstring, flags = re.IGNORECASE | re.VERBOSE) - return regex - - def enforce_name(self, parent): - # super().enforce_name(parent) - def construct(): - today = datetime.now() - return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" + @classmethod + def parse_pcr(cls, xl: pd.ExcelFile, rsl_number:str) -> list: + """ + Parse specific to wastewater samples. + """ + samples = super().parse_pcr(xl=xl, rsl_number=rsl_number) + df = xl.parse(sheet_name="Results", dtype=object).fillna("") + column_names = ["Well", "Well Position", "Omit","Sample","Target","Task"," Reporter","Quencher","Amp Status","Amp Score","Curve Quality","Result Quality Issues","Cq","Cq Confidence","Cq Mean","Cq SD","Auto Threshold","Threshold", "Auto Baseline", "Baseline Start", "Baseline End"] + samples_df = df.iloc[23:][0:] + logger.debug(f"Dataframe of PCR results:\n\t{samples_df}") + samples_df.columns = column_names + logger.debug(f"Samples columns: {samples_df.columns}") + well_call_df = xl.parse(sheet_name="Well Call").iloc[24:][0:].iloc[:,-1:] + try: + samples_df['Assessment'] = well_call_df.values + except ValueError: + logger.error("Well call number doesn't match sample number") + logger.debug(f"Well call df: {well_call_df}") + for ii, row in samples_df.iterrows(): try: - self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name) - except AttributeError as e: - logger.error(f"Problem using regex: {e}") 
- self.parsed_name = construct() - self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW") - self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE) - self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name) - logger.debug(f"Coming out of the preliminary parsing, the plate name is {self.parsed_name}") + sample_obj = [sample for sample in samples if sample['sample'] == row[3]][0] + except IndexError: + sample_obj = dict( + sample = row['Sample'], + plate_rsl = rsl_number, + ) + logger.debug(f"Got sample obj: {sample_obj}") + if isinstance(row['Cq'], float): + sample_obj[f"ct_{row['Target'].lower()}"] = row['Cq'] + else: + sample_obj[f"ct_{row['Target'].lower()}"] = 0.0 try: - plate_number = re.search(r"(?:(-|_)\d)(?!\d)", self.parsed_name).group().strip("_").strip("-") - logger.debug(f"Plate number is: {plate_number}") - except AttributeError as e: - plate_number = "1" - # self.parsed_name = re.sub(r"(\d{8})(-|_\d)?(R\d)?", fr"\1-{plate_number}\3", self.parsed_name) - self.parsed_name = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", self.parsed_name) - logger.debug(f"After addition of plate number the plate name is: {self.parsed_name}") - try: - repeat = re.search(r"-\dR(?P<repeat>\d)?", self.parsed_name).groupdict()['repeat'] - if repeat == None: - repeat = "1" - except AttributeError as e: - repeat = "" - self.parsed_name = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", self.parsed_name).replace(" ", "") + sample_obj[f"{row['Target'].lower()}_status"] = row['Assessment'] + except KeyError: + logger.error(f"No assessment for {sample_obj['sample']}") + samples.append(sample_obj) + return samples + + @classmethod + def enforce_name(cls, ctx:Settings, instr:str) -> str: + outstr = super().enforce_name(ctx=ctx, instr=instr) + def construct(): + today = datetime.now() + return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" + try: + outstr = re.sub(r"PCR(-|_)", "", outstr) + except 
AttributeError as e: + logger.error(f"Problem using regex: {e}") + outstr = construct() + outstr = outstr.replace("RSLWW", "RSL-WW") + outstr = re.sub(r"WW(\d{4})", r"WW-\1", outstr, flags=re.IGNORECASE) + outstr = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", outstr) + logger.debug(f"Coming out of the preliminary parsing, the plate name is {outstr}") + try: + plate_number = re.search(r"(?:(-|_)\d)(?!\d)", outstr).group().strip("_").strip("-") + logger.debug(f"Plate number is: {plate_number}") + except AttributeError as e: + plate_number = "1" + # self.parsed_name = re.sub(r"(\d{8})(-|_\d)?(R\d)?", fr"\1-{plate_number}\3", self.parsed_name) + outstr = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", outstr) + logger.debug(f"After addition of plate number the plate name is: {outstr}") + try: + repeat = re.search(r"-\dR(?P<repeat>\d)?", outstr).groupdict()['repeat'] + if repeat == None: + repeat = "1" + except AttributeError as e: + repeat = "" + return re.sub(r"(-\dR)\d?", rf"\1 {repeat}", outstr).replace(" ", "") - @classmethod - def get_regex(cls): - return "(?P<Wastewater>RSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)" + @classmethod + def get_regex(cls): + return "(?P<Wastewater>RSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)" class WastewaterArtic(BasicSubmission): """ @@ -630,34 +608,25 @@ class WastewaterArtic(BasicSubmission): input_dict['submitter_id'] = re.sub(r"\s\(.+\)$", "", str(input_dict['submitter_id'])).strip() return input_dict - class _RSLNamer(BasicSubmission._RSLNamer): + @classmethod + def enforce_name(cls, ctx:Settings, instr:str) -> str: + outstr = super().enforce_name(ctx=ctx, instr=instr) + def construct(): + today = datetime.now() + return f"RSL-AR-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" + try: + outstr = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"RSL-AR-\1\2\3", outstr, flags=re.IGNORECASE) + except AttributeError: + outstr = construct() + try: + plate_number = int(re.search(r"_|-\d?_", 
outstr).group().strip("_").strip("-")) + except (AttributeError, ValueError) as e: + plate_number = 1 + return re.sub(r"(_|-\d)?_ARTIC", f"-{plate_number}", outstr) - alias = "Wastewater Artic" - - @classmethod - def construct_regex(cls): - rstring = rf'{cls.get_regex()}' - regex = re.compile(rstring, flags = re.IGNORECASE | re.VERBOSE) - return regex - - def enforce_name(self, parent): - # super().enforce_name(parent) - def construct(): - today = datetime.now() - return f"RSL-AR-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" - try: - self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"RSL-AR-\1\2\3", self.parsed_name, flags=re.IGNORECASE) - except AttributeError: - self.parsed_name = construct() - try: - plate_number = int(re.search(r"_|-\d?_", self.parsed_name).group().strip("_").strip("-")) - except (AttributeError, ValueError) as e: - plate_number = 1 - self.parsed_name = re.sub(r"(_|-\d)?_ARTIC", f"-{plate_number}", self.parsed_name) - - @classmethod - def get_regex(cls): - return "(?P<Wastewater_Artic>(\\d{4}-\\d{2}-\\d{2}(?:-|_)(?:\\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\\d{2}-?\\d{2}-?\\d{2}(?:(_|-)\\d?(\\D|$)R?\\d?)?))" + @classmethod + def get_regex(cls): + return "(?P<Wastewater_Artic>(\\d{4}-\\d{2}-\\d{2}(?:-|_)(?:\\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\\d{2}-?\\d{2}-?\\d{2}(?:(_|-)\\d?(\\D|$)R?\\d?)?))" class BasicSample(Base): """ @@ -677,7 +646,7 @@ class BasicSample(Base): ) __mapper_args__ = { - "polymorphic_identity": "basic_sample", + "polymorphic_identity": "Basic Sample", # "polymorphic_on": sample_type, "polymorphic_on": case( [ @@ -685,7 +654,7 @@ class BasicSample(Base): (sample_type == "Wastewater Artic Sample", "Wastewater Sample"), (sample_type == "Bacterial Culture Sample", "Bacterial Culture Sample"), ], - else_="basic_sample" + else_="Basic Sample" ), "with_polymorphic": "*", } @@ -862,8 +831,7 @@ class WastewaterSample(BasicSample): return results[0] except IndexError: return None - - + class BacterialCultureSample(BasicSample): """ base of bacterial 
culture sample diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index 99cf3ae..1f8702a 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -7,77 +7,19 @@ from typing import List import pandas as pd from pathlib import Path from backend.db import models, lookup_kit_types, lookup_submission_type, lookup_samples -from backend.pydant import PydSubmission, PydReagent +from backend.validators import PydSheetSubmission, PydSheetReagent, RSLNamer import logging from collections import OrderedDict import re from datetime import date from dateutil.parser import parse, ParserError from tools import check_not_nan, convert_nans_to_nones, Settings -# from backend.namer import RSLNamer -from frontend.custom_widgets.pop_ups import SubmissionTypeSelector, KitSelector +from frontend.custom_widgets.pop_ups import KitSelector logger = logging.getLogger(f"submissions.{__name__}") row_keys = dict(A=1, B=2, C=3, D=4, E=5, F=6, G=7, H=8) -class RSLNamer(object): - """ - Object that will enforce proper formatting on RSL plate names. 
- NOTE: Depreciated in favour of object based methods in 'submissions.py' - """ - def __init__(self, ctx, instr:str, sub_type:str|None=None): - self.ctx = ctx - self.submission_type = sub_type - self.retrieve_rsl_number(in_str=instr) - if self.submission_type != None: - # custom_enforcer = get_polymorphic_subclass(BasicSubmission, self.submission_type).enforce_naming_schema - parser = getattr(self, f"enforce_{self.submission_type.replace(' ', '_').lower()}") - parser() - self.parsed_name = self.parsed_name.replace("_", "-") - - def retrieve_rsl_number(self, in_str:str|Path): - """ - Uses regex to retrieve the plate number and submission type from an input string - - Args: - in_str (str): string to be parsed - """ - if not isinstance(in_str, Path): - in_str = Path(in_str) - self.out_str = in_str.stem - logger.debug(f"Attempting match of {self.out_str}") - logger.debug(f"The initial plate name is: {self.out_str}") - # regex = re.compile(r""" - # # (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d?((?!\d)|R)?\d(?!\d))?)| - # (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)| - # (?PRSL-?\d{2}-?\d{4})| - # (?P(\d{4}-\d{2}-\d{2}(?:-|_)(?:\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)) - # """, flags = re.IGNORECASE | re.VERBOSE) - regex = models.BasicSubmission.RSLNamer.construct_regex() - m = regex.search(self.out_str) - if m != None: - self.parsed_name = m.group().upper().strip(".") - logger.debug(f"Got parsed submission name: {self.parsed_name}") - if self.submission_type == None: - try: - self.submission_type = m.lastgroup - except AttributeError as e: - logger.critical("No RSL plate number found or submission type found!") - logger.debug(f"The cause of the above error was: {e}") - logger.warning(f"We're going to have to create the submission type from the excel sheet properties...") - if in_str.exists(): - my_xl = pd.ExcelFile(in_str) - if my_xl.book.properties.category != None: - categories = 
[item.strip().title() for item in my_xl.book.properties.category.split(";")] - self.submission_type = categories[0].replace(" ", "_").lower() - else: - raise AttributeError(f"File {in_str.__str__()} has no categories.") - else: - raise FileNotFoundError() - # else: - # raise ValueError(f"No parsed name could be created for {self.out_str}.") - class SheetParser(object): """ object to pull and contain data from excel file @@ -90,78 +32,34 @@ class SheetParser(object): """ self.ctx = ctx logger.debug(f"Parsing {filepath.__str__()}") - if filepath == None: - logger.error(f"No filepath given.") - self.xl = None - else: - self.filepath = filepath - # Open excel file - try: - self.xl = pd.ExcelFile(filepath) - except ValueError as e: - logger.error(f"Incorrect value: {e}") - self.xl = None + match filepath: + case Path(): + self.filepath = filepath + case str(): + self.filepath = Path(filepath) + case _: + logger.error(f"No filepath given.") + raise ValueError("No filepath given.") + try: + self.xl = pd.ExcelFile(filepath) + except ValueError as e: + logger.error(f"Incorrect value: {e}") + raise FileNotFoundError(f"Couldn't parse file {self.filepath}") self.sub = OrderedDict() # make decision about type of sample we have - self.sub['submission_type'] = self.type_decider() + self.sub['submission_type'] = dict(value=RSLNamer.retrieve_submission_type(ctx=self.ctx, instr=self.filepath), parsed=False) # # grab the info map from the submission type in database self.parse_info() self.import_kit_validation_check() self.parse_reagents() self.import_reagent_validation_check() self.parse_samples() - - - def type_decider(self) -> str: - """ - makes decisions about submission type based on structure of excel file - - Returns: - str: submission type name - """ - # Check metadata for category, return first category - if self.xl.book.properties.category != None: - logger.debug("Using file properties to find type...") - categories = [item.strip().replace("_", " ").title() for item in 
self.xl.book.properties.category.split(";")] - return dict(value=categories[0], parsed=False) - else: - # This code is going to be depreciated once there is full adoption of the client sheets - # with updated metadata... but how will it work for Artic? - - # sub = get_polymorphic_subclass() - try: - logger.debug(f"Attempting to match file name regex") - namer = models.BasicSubmission.RSLNamer(ctx=self.ctx, instr=self.filepath) - return namer.submission_type - except Exception as e: - logger.error(f"Unable to find file name regex match") - logger.debug("Using excel map to find type...") - try: - for type in self.ctx.submission_types: - # This gets the *first* submission type that matches the sheet names in the workbook - if self.xl.sheet_names == self.ctx.submission_types[type]['excel_map']: - return dict(value=type.title(), parsed=False) - return "Unknown" - except Exception as e: - logger.warning(f"We were unable to parse the submission type due to: {e}") - # return "Unknown" - dlg = SubmissionTypeSelector(ctx=self.ctx, title="Select Submission Type", message="We were unable to find the submission type from the excel metadata. 
Please select from below.") - if dlg.exec(): - return dict(value=dlg.getValues(), parsed=False) - else: - logger.warning(f"Last attempt at getting submission was rejected.") - raise ValueError("Submission Type needed.") def parse_info(self): """ Pulls basic information from the excel sheet """ info = InfoParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type']['value']).parse_info() - # parser_query = f"parse_{self.sub['submission_type']['value'].replace(' ', '_').lower()}" - # custom_parser = getattr(self, parser_query) - - # except AttributeError: - # logger.error(f"Couldn't find submission parser: {parser_query}") for k,v in info.items(): match k: case "sample": @@ -215,7 +113,7 @@ class SheetParser(object): logger.debug(f"List of reagents for comparison with allowed_reagents: {pprint.pformat(self.sub['reagents'])}") self.sub['reagents'] = [reagent for reagent in self.sub['reagents'] if reagent['value'].type in allowed_reagents] - def to_pydantic(self) -> PydSubmission: + def to_pydantic(self) -> PydSheetSubmission: """ Generates a pydantic model of scraped data for validation @@ -223,7 +121,7 @@ class SheetParser(object): PydSubmission: output pydantic model """ logger.debug(f"Submission dictionary coming into 'to_pydantic':\n{pprint.pformat(self.sub)}") - psm = PydSubmission(ctx=self.ctx, filepath=self.filepath, **self.sub) + psm = PydSheetSubmission(ctx=self.ctx, filepath=self.filepath, **self.sub) delattr(psm, "filepath") return psm @@ -249,11 +147,9 @@ class InfoParser(object): if isinstance(submission_type, str): submission_type = dict(value=submission_type, parsed=False) logger.debug(f"Looking up submission type: {submission_type['value']}") - # submission_type = lookup_submissiontype_by_name(ctx=self.ctx, type_name=submission_type['value']) submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type['value']) info_map = submission_type.info_map # Get the parse_info method from the submission type specified - # 
self.custom_parser = get_polymorphic_subclass(models.BasicSubmission, submission_type.name).parse_info self.custom_parser = models.BasicSubmission.find_polymorphic_subclass(polymorphic_identity=submission_type.name).parse_info return info_map @@ -300,8 +196,6 @@ class InfoParser(object): except KeyError: check = False return self.custom_parser(input_dict=dicto, xl=self.xl) - - class ReagentParser(object): @@ -335,7 +229,7 @@ class ReagentParser(object): lot = df.iat[relevant[item]['lot']['row']-1, relevant[item]['lot']['column']-1] expiry = df.iat[relevant[item]['expiry']['row']-1, relevant[item]['expiry']['column']-1] except (KeyError, IndexError): - listo.append(dict(value=PydReagent(type=item.strip(), lot=None, exp=None, name=None), parsed=False)) + listo.append(dict(value=PydSheetReagent(type=item.strip(), lot=None, exp=None, name=None), parsed=False)) continue if check_not_nan(lot): parsed = True @@ -343,7 +237,7 @@ class ReagentParser(object): parsed = False logger.debug(f"Got lot for {item}-{name}: {lot} as {type(lot)}") lot = str(lot) - listo.append(dict(value=PydReagent(type=item.strip(), lot=lot, exp=expiry, name=name), parsed=parsed)) + listo.append(dict(value=PydSheetReagent(type=item.strip(), lot=lot, exp=expiry, name=name), parsed=parsed)) logger.debug(f"Returning listo: {listo}") return listo @@ -516,12 +410,7 @@ class SampleParser(object): except KeyError: translated_dict[k] = convert_nans_to_nones(v) translated_dict['sample_type'] = f"{self.submission_type} Sample" - # parser_query = f"parse_{translated_dict['sample_type'].replace(' ', '_').lower()}" - # try: - # custom_parser = getattr(self, parser_query) translated_dict = self.custom_parser(translated_dict) - # except AttributeError: - # logger.error(f"Couldn't get custom parser: {parser_query}") if generate: new_samples.append(self.generate_sample_object(translated_dict)) else: @@ -557,65 +446,6 @@ class SampleParser(object): else: logger.debug(f"Sample {instance.submitter_id} already exists, 
will run update.") return dict(sample=instance, row=input_dict['row'], column=input_dict['column']) - - - # def parse_bacterial_culture_sample(self, input_dict:dict) -> dict: - # """ - # Update sample dictionary with bacterial culture specific information - - # Args: - # input_dict (dict): Input sample dictionary - - # Returns: - # dict: Updated sample dictionary - # """ - # logger.debug("Called bacterial culture sample parser") - # return input_dict - - # def parse_wastewater_sample(self, input_dict:dict) -> dict: - # """ - # Update sample dictionary with wastewater specific information - - # Args: - # input_dict (dict): Input sample dictionary - - # Returns: - # dict: Updated sample dictionary - # """ - # logger.debug(f"Called wastewater sample parser") - # return input_dict - - # def parse_wastewater_artic_sample(self, input_dict:dict) -> dict: - # """ - # Update sample dictionary with artic specific information - - # Args: - # input_dict (dict): Input sample dictionary - - # Returns: - # dict: Updated sample dictionary - # """ - # logger.debug("Called wastewater artic sample parser") - # input_dict['sample_type'] = "Wastewater Sample" - # # Because generate_sample_object needs the submitter_id and the artic has the "({origin well})" - # # at the end, this has to be done here. 
No moving to sqlalchemy object :( - # input_dict['submitter_id'] = re.sub(r"\s\(.+\)$", "", str(input_dict['submitter_id'])).strip() - # return input_dict - - # def parse_first_strand_sample(self, input_dict:dict) -> dict: - # """ - # Update sample dictionary with first strand specific information - - # Args: - # input_dict (dict): Input sample dictionary - - # Returns: - # dict: Updated sample dictionary - # """ - # logger.debug("Called first strand sample parser") - # input_dict['well'] = re.search(r"\s\((.*)\)$", input_dict['submitter_id']).groups()[0] - # input_dict['submitter_id'] = re.sub(r"\s\(.*\)$", "", str(input_dict['submitter_id'])).strip() - # return input_dict def grab_plates(self) -> List[str]: """ @@ -628,7 +458,7 @@ class SampleParser(object): for plate in self.plates: df = self.xl.parse(plate['sheet'], header=None) if isinstance(df.iat[plate['row']-1, plate['column']-1], str): - output = models.BasicSubmission.RSLNamer(ctx=self.ctx, instr=df.iat[plate['row']-1, plate['column']-1]).parsed_name + output = RSLNamer.retrieve_rsl_number(ctx=self.ctx, instr=df.iat[plate['row']-1, plate['column']-1]) else: continue plates.append(output) @@ -637,7 +467,6 @@ class SampleParser(object): class PCRParser(object): """ Object to pull data from Design and Analysis PCR export file. - TODO: Generify this object. """ def __init__(self, ctx:dict, filepath:Path|None = None) -> None: """ @@ -662,16 +491,14 @@ class PCRParser(object): logger.error(f"Couldn't get permissions for {filepath.__str__()}. 
Operation might have been cancelled.") return # self.pcr = OrderedDict() - self.pcr = {} - namer = models.BasicSubmission.RSLNamer(ctx=self.ctx, instr=filepath.__str__()) + self.parse_general(sheet_name="Results") + namer = RSLNamer(ctx=self.ctx, instr=filepath.__str__()) self.plate_num = namer.parsed_name self.submission_type = namer.submission_type logger.debug(f"Set plate number to {self.plate_num} and type to {self.submission_type}") - self.samples = [] - parser = getattr(self, f"parse_{self.submission_type}") - parser() + parser = models.BasicSubmission.find_polymorphic_subclass(self.submission_type) + self.samples = parser.parse_pcr(xl=self.xl, rsl_number=self.plate_num) - def parse_general(self, sheet_name:str): """ Parse general info rows for all types of PCR results @@ -679,6 +506,7 @@ class PCRParser(object): Args: sheet_name (str): Name of sheet in excel workbook that holds info. """ + self.pcr = {} df = self.xl.parse(sheet_name=sheet_name, dtype=object).fillna("") self.pcr['comment'] = df.iloc[0][1] self.pcr['operator'] = df.iloc[1][1] @@ -702,42 +530,5 @@ class PCRParser(object): self.pcr['plugin'] = df.iloc[19][1] self.pcr['exported_on'] = df.iloc[20][1] self.pcr['imported_by'] = getuser() - return df - def parse_Wastewater(self): - """ - Parse specific to wastewater samples. 
- """ - df = self.parse_general(sheet_name="Results") - column_names = ["Well", "Well Position", "Omit","Sample","Target","Task"," Reporter","Quencher","Amp Status","Amp Score","Curve Quality","Result Quality Issues","Cq","Cq Confidence","Cq Mean","Cq SD","Auto Threshold","Threshold", "Auto Baseline", "Baseline Start", "Baseline End"] - self.samples_df = df.iloc[23:][0:] - logger.debug(f"Dataframe of PCR results:\n\t{self.samples_df}") - self.samples_df.columns = column_names - logger.debug(f"Samples columns: {self.samples_df.columns}") - well_call_df = self.xl.parse(sheet_name="Well Call").iloc[24:][0:].iloc[:,-1:] - try: - self.samples_df['Assessment'] = well_call_df.values - except ValueError: - logger.error("Well call number doesn't match sample number") - logger.debug(f"Well call df: {well_call_df}") - for ii, row in self.samples_df.iterrows(): - try: - sample_obj = [sample for sample in self.samples if sample['sample'] == row[3]][0] - except IndexError: - sample_obj = dict( - sample = row['Sample'], - plate_rsl = self.plate_num, - ) - logger.debug(f"Got sample obj: {sample_obj}") - if isinstance(row['Cq'], float): - sample_obj[f"ct_{row['Target'].lower()}"] = row['Cq'] - else: - sample_obj[f"ct_{row['Target'].lower()}"] = 0.0 - try: - sample_obj[f"{row['Target'].lower()}_status"] = row['Assessment'] - except KeyError: - logger.error(f"No assessment for {sample_obj['sample']}") - self.samples.append(sample_obj) - - - + \ No newline at end of file diff --git a/src/submissions/backend/excel/reports.py b/src/submissions/backend/excel/reports.py index ef5d4bf..c5ed48b 100644 --- a/src/submissions/backend/excel/reports.py +++ b/src/submissions/backend/excel/reports.py @@ -218,7 +218,5 @@ def drop_reruns_from_df(ctx:dict, df: DataFrame) -> DataFrame: df = df.drop(df[df.name == first_run].index) return df - - def make_hitpicks(input:list) -> DataFrame: return DataFrame.from_records(input) \ No newline at end of file diff --git 
a/src/submissions/backend/validators/__init__.py b/src/submissions/backend/validators/__init__.py new file mode 100644 index 0000000..df96626 --- /dev/null +++ b/src/submissions/backend/validators/__init__.py @@ -0,0 +1,92 @@ +import logging, re +from pathlib import Path +from openpyxl import load_workbook +from backend.db.models import BasicSubmission +from tools import Settings + + +logger = logging.getLogger(f"submissions.{__name__}") + +class RSLNamer(object): + """ + Object that will enforce proper formatting on RSL plate names. + NOTE: Depreciated in favour of object based methods in 'submissions.py' + """ + def __init__(self, ctx, instr:str, sub_type:str|None=None): + self.ctx = ctx + self.submission_type = sub_type + + if self.submission_type == None: + self.submission_type = self.retrieve_submission_type(ctx=self.ctx, instr=instr) + print(self.submission_type) + if self.submission_type != None: + enforcer = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type) + self.parsed_name = self.retrieve_rsl_number(instr=instr, regex=enforcer.get_regex()) + self.parsed_name = enforcer.enforce_name(ctx=ctx, instr=self.parsed_name) + + @classmethod + def retrieve_submission_type(cls, ctx:Settings, instr:str|Path) -> str: + match instr: + case Path(): + logger.debug(f"Using path method.") + if instr.exists(): + wb = load_workbook(instr) + try: + submission_type = [item.strip().title() for item in wb.properties.category.split(";")][0] + except AttributeError: + try: + for type in ctx.submission_types: + # This gets the *first* submission type that matches the sheet names in the workbook + if wb.sheetnames == ctx.submission_types[type]['excel_map']: + submission_type = type.title() + except: + submission_type = cls.retrieve_submission_type(ctx=ctx, instr=instr.stem.__str__()) + case str(): + regex = BasicSubmission.construct_regex() + logger.debug(f"Using string method.") + m = regex.search(instr) + try: + submission_type = m.lastgroup + 
except AttributeError as e: + logger.critical("No RSL plate number found or submission type found!") + case _: + submission_type = None + if submission_type == None: + from frontend.custom_widgets import SubmissionTypeSelector + dlg = SubmissionTypeSelector(ctx, title="Couldn't parse submission type.", message="Please select submission type from list below.") + if dlg.exec(): + submission_type = dlg.parse_form() + submission_type = submission_type.replace("_", " ") + return submission_type + + @classmethod + def retrieve_rsl_number(cls, instr:str|Path, regex:str|None=None): + """ + Uses regex to retrieve the plate number and submission type from an input string + + Args: + in_str (str): string to be parsed + """ + if regex == None: + regex = BasicSubmission.construct_regex() + else: + regex = re.compile(rf'{regex}', re.IGNORECASE | re.VERBOSE) + match instr: + case Path(): + m = regex.search(instr.stem) + case str(): + logger.debug(f"Using string method.") + m = regex.search(instr) + case _: + pass + if m != None: + try: + parsed_name = m.group().upper().strip(".") + except: + parsed_name = None + else: + parsed_name = None + logger.debug(f"Got parsed submission name: {parsed_name}") + return parsed_name + +from .pydant import * \ No newline at end of file diff --git a/src/submissions/backend/pydant/__init__.py b/src/submissions/backend/validators/pydant.py similarity index 91% rename from src/submissions/backend/pydant/__init__.py rename to src/submissions/backend/validators/pydant.py index 9390112..18cdd65 100644 --- a/src/submissions/backend/pydant/__init__.py +++ b/src/submissions/backend/validators/pydant.py @@ -7,19 +7,16 @@ from datetime import date, datetime from dateutil.parser import parse from dateutil.parser._parser import ParserError from typing import List, Any -# from backend.namer import RSLNamer +from . 
import RSLNamer from pathlib import Path import re import logging from tools import check_not_nan, convert_nans_to_nones, Settings from backend.db.functions import lookup_submissions -from backend.db.models import BasicSubmission - - logger = logging.getLogger(f"submissions.{__name__}") -class PydReagent(BaseModel): +class PydSheetReagent(BaseModel): type: str|None lot: str|None exp: date|None @@ -73,9 +70,7 @@ class PydReagent(BaseModel): else: return values.data['type'] - - -class PydSubmission(BaseModel, extra='allow'): +class PydSheetSubmission(BaseModel, extra='allow'): ctx: Settings filepath: Path submission_type: dict|None @@ -90,7 +85,6 @@ class PydSubmission(BaseModel, extra='allow'): submission_category: dict|None = Field(default=dict(value=None, parsed=False), validate_default=True) reagents: List[dict] = [] samples: List[Any] - @field_validator("submitter_plate_num") @classmethod @@ -153,10 +147,10 @@ class PydSubmission(BaseModel, extra='allow'): else: logger.warning(f"Submission number {value} already exists in DB, attempting salvage with filepath") # output = RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__(), sub_type=sub_type).parsed_name - output = BasicSubmission.RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__(), sub_type=sub_type).parsed_name + output = RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__(), sub_type=sub_type).parsed_name return dict(value=output, parsed=False) else: - output = BasicSubmission.RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__(), sub_type=sub_type).parsed_name + output = RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__(), sub_type=sub_type).parsed_name return dict(value=output, parsed=False) @field_validator("technician", mode="before") @@ -206,8 +200,10 @@ class PydSubmission(BaseModel, extra='allow'): if check_not_nan(value['value']): value = value['value'].title() return dict(value=value, parsed=True) + # 
else: + # return dict(value="RSL Name not found.") else: - return dict(value=BasicSubmission.RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__()).submission_type.title(), parsed=False) + return dict(value=RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__()).submission_type.title(), parsed=False) @field_validator("submission_category") @classmethod @@ -215,4 +211,3 @@ class PydSubmission(BaseModel, extra='allow'): if value['value'] not in ["Research", "Diagnostic", "Surveillance"]: value['value'] = values.data['submission_type']['value'] return value - diff --git a/src/submissions/frontend/custom_widgets/misc.py b/src/submissions/frontend/custom_widgets/misc.py index 5a8adba..cae5239 100644 --- a/src/submissions/frontend/custom_widgets/misc.py +++ b/src/submissions/frontend/custom_widgets/misc.py @@ -17,11 +17,11 @@ from backend.db.functions import construct_kit_from_yaml, \ lookup_reagent_types, lookup_reagents, lookup_submission_type, lookup_reagenttype_kittype_association, \ lookup_submissions from backend.db.models import SubmissionTypeKitTypeAssociation -from sqlalchemy import FLOAT, INTEGER, String +from sqlalchemy import FLOAT, INTEGER import logging import numpy as np from .pop_ups import AlertPop -from backend.pydant import PydReagent +from backend.validators import PydSheetReagent from typing import Tuple logger = logging.getLogger(f"submissions.{__name__}") @@ -386,11 +386,11 @@ class ControlsDatePicker(QWidget): class ImportReagent(QComboBox): - def __init__(self, ctx:Settings, reagent:dict|PydReagent, extraction_kit:str): + def __init__(self, ctx:Settings, reagent:dict|PydSheetReagent, extraction_kit:str): super().__init__() self.setEditable(True) if isinstance(reagent, dict): - reagent = PydReagent(**reagent) + reagent = PydSheetReagent(**reagent) # Ensure that all reagenttypes have a name that matches the items in the excel parser query_var = reagent.type logger.debug(f"Import Reagent is looking at: 
{reagent.lot} for {query_var}") diff --git a/src/submissions/frontend/custom_widgets/pop_ups.py b/src/submissions/frontend/custom_widgets/pop_ups.py index 243d8b9..43492e1 100644 --- a/src/submissions/frontend/custom_widgets/pop_ups.py +++ b/src/submissions/frontend/custom_widgets/pop_ups.py @@ -96,5 +96,5 @@ class SubmissionTypeSelector(QDialog): self.layout.addWidget(self.buttonBox) self.setLayout(self.layout) - def getValues(self): + def parse_form(self): return self.widget.currentText() diff --git a/src/submissions/frontend/main_window_functions.py b/src/submissions/frontend/main_window_functions.py index db9cc55..7736b06 100644 --- a/src/submissions/frontend/main_window_functions.py +++ b/src/submissions/frontend/main_window_functions.py @@ -27,7 +27,6 @@ from backend.db.functions import ( construct_submission_info, lookup_reagents, construct_kit_from_yaml, construct_org_from_yaml, get_control_subtypes, update_subsampassoc_with_pcr, check_kit_integrity, update_last_used, lookup_organizations, lookup_kit_types, lookup_submissions, lookup_controls, lookup_samples, lookup_submission_sample_association, store_object, lookup_submission_type, - get_polymorphic_subclass ) from backend.excel.parser import SheetParser, PCRParser, SampleParser from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df @@ -56,9 +55,7 @@ def import_submission_function(obj:QMainWindow, fname:Path|None=None) -> Tuple[Q logger.debug(obj.ctx) # initialize samples obj.samples = [] - obj.missing_info = [] - # set file dialog if isinstance(fname, bool) or fname == None: fname = select_open_file(obj, file_extension="xlsx") diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py index 3083825..d253dcb 100644 --- a/src/submissions/tools/__init__.py +++ b/src/submissions/tools/__init__.py @@ -134,153 +134,6 @@ def massage_common_reagents(reagent_name:str): reagent_name = reagent_name.replace("ยต", "u") return reagent_name -# class 
RSLNamer(object): -# """ -# Object that will enforce proper formatting on RSL plate names. -# NOTE: Depreciated in favour of object based methods in 'submissions.py' -# """ -# def __init__(self, ctx, instr:str, sub_type:str|None=None): -# self.ctx = ctx -# self.submission_type = sub_type -# self.retrieve_rsl_number(in_str=instr) -# if self.submission_type != None: -# # custom_enforcer = get_polymorphic_subclass(BasicSubmission, self.submission_type).enforce_naming_schema -# parser = getattr(self, f"enforce_{self.submission_type.replace(' ', '_').lower()}") -# parser() -# self.parsed_name = self.parsed_name.replace("_", "-") - -# def retrieve_rsl_number(self, in_str:str|Path): -# """ -# Uses regex to retrieve the plate number and submission type from an input string - -# Args: -# in_str (str): string to be parsed -# """ -# if not isinstance(in_str, Path): -# in_str = Path(in_str) -# self.out_str = in_str.stem -# logger.debug(f"Attempting match of {self.out_str}") -# logger.debug(f"The initial plate name is: {self.out_str}") -# regex = re.compile(r""" -# # (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d?((?!\d)|R)?\d(?!\d))?)| -# (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)| -# (?PRSL-?\d{2}-?\d{4})| -# (?P(\d{4}-\d{2}-\d{2}(?:-|_)(?:\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)) -# """, flags = re.IGNORECASE | re.VERBOSE) -# m = regex.search(self.out_str) -# if m != None: -# self.parsed_name = m.group().upper().strip(".") -# logger.debug(f"Got parsed submission name: {self.parsed_name}") -# if self.submission_type == None: -# try: -# self.submission_type = m.lastgroup -# except AttributeError as e: -# logger.critical("No RSL plate number found or submission type found!") -# logger.debug(f"The cause of the above error was: {e}") -# logger.warning(f"We're going to have to create the submission type from the excel sheet properties...") -# if in_str.exists(): -# my_xl = pd.ExcelFile(in_str) -# if 
my_xl.book.properties.category != None: -# categories = [item.strip().title() for item in my_xl.book.properties.category.split(";")] -# self.submission_type = categories[0].replace(" ", "_").lower() -# else: -# raise AttributeError(f"File {in_str.__str__()} has no categories.") -# else: -# raise FileNotFoundError() -# # else: -# # raise ValueError(f"No parsed name could be created for {self.out_str}.") - -# def enforce_wastewater(self): -# """ -# Uses regex to enforce proper formatting of wastewater samples -# """ -# def construct(): -# today = datetime.now() -# return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}-1" -# try: -# self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name) -# except AttributeError as e: -# logger.error(f"Problem using regex: {e}") -# self.parsed_name = construct() -# self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW") -# self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE) -# self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name) -# logger.debug(f"Coming out of the preliminary parsing, the plate name is {self.parsed_name}") -# try: -# plate_number = re.search(r"(?:(-|_)\d)(?!\d)", self.parsed_name).group().strip("_").strip("-") -# logger.debug(f"Plate number is: {plate_number}") -# except AttributeError as e: -# plate_number = "1" -# # self.parsed_name = re.sub(r"(\d{8})(-|_\d)?(R\d)?", fr"\1-{plate_number}\3", self.parsed_name) -# self.parsed_name = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", self.parsed_name) -# logger.debug(f"After addition of plate number the plate name is: {self.parsed_name}") -# try: -# repeat = re.search(r"-\dR(?P\d)?", self.parsed_name).groupdict()['repeat'] -# if repeat == None: -# repeat = "1" -# except AttributeError as e: -# repeat = "" -# self.parsed_name = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", self.parsed_name).replace(" ", "") - -# def enforce_bacterial_culture(self): -# """ -# Uses 
regex to enforce proper formatting of bacterial culture samples -# """ -# def construct(ctx) -> str: -# """ -# DEPRECIATED due to slowness. Search for the largest rsl number and increment by 1 - -# Returns: -# str: new RSL number -# """ -# logger.debug(f"Attempting to construct RSL number from scratch...") -# # directory = Path(self.ctx['directory_path']).joinpath("Bacteria") -# directory = Path(ctx.directory_path).joinpath("Bacteria") -# year = str(datetime.now().year)[-2:] -# if directory.exists(): -# logger.debug(f"Year: {year}") -# relevant_rsls = [] -# all_xlsx = [item.stem for item in directory.rglob("*.xlsx") if bool(re.search(r"RSL-\d{2}-\d{4}", item.stem)) and year in item.stem[4:6]] -# logger.debug(f"All rsls: {all_xlsx}") -# for item in all_xlsx: -# try: -# relevant_rsls.append(re.match(r"RSL-\d{2}-\d{4}", item).group(0)) -# except Exception as e: -# logger.error(f"Regex error: {e}") -# continue -# logger.debug(f"Initial xlsx: {relevant_rsls}") -# max_number = max([int(item[-4:]) for item in relevant_rsls]) -# logger.debug(f"The largest sample number is: {max_number}") -# return f"RSL-{year}-{str(max_number+1).zfill(4)}" -# else: -# # raise FileNotFoundError(f"Unable to locate the directory: {directory.__str__()}") -# return f"RSL-{year}-0000" -# try: -# self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE) -# except AttributeError as e: -# self.parsed_name = construct(ctx=self.ctx) -# # year = datetime.now().year -# # self.parsed_name = f"RSL-{str(year)[-2:]}-0000" -# self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE) - - -# def enforce_wastewater_artic(self): -# """ -# Uses regex to enforce proper formatting of wastewater samples -# """ -# def construct(): -# today = datetime.now() -# return f"RSL-AR-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" -# try: -# self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"RSL-AR-\1\2\3", self.parsed_name, 
flags=re.IGNORECASE) -# except AttributeError: -# self.parsed_name = construct() -# try: -# plate_number = int(re.search(r"_|-\d?_", self.parsed_name).group().strip("_").strip("-")) -# except (AttributeError, ValueError) as e: -# plate_number = 1 -# self.parsed_name = re.sub(r"(_|-\d)?_ARTIC", f"-{plate_number}", self.parsed_name) - class GroupWriteRotatingFileHandler(handlers.RotatingFileHandler): def doRollover(self):