diff --git a/CHANGELOG.md b/CHANGELOG.md index 8426150..86127ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 202310.03 + +- Replaced RSLNamer class with Submission object specific class methods. + ## 202310.02 - Improvements to First strand constructor. diff --git a/TODO.md b/TODO.md index 2f22789..b54bae4 100644 --- a/TODO.md +++ b/TODO.md @@ -1,3 +1,6 @@ +- [ ] Convert Pydantic models to Submission models? +- [x] Move RSLNamer into Submission database object. + - Having second thoughts about some of this. Move into parser module? - [x] Change 'check_is_power_user' to decorator. - [x] Drag and drop files into submission form area? - [ ] Get info for controls into their sample hitpicks. diff --git a/src/submissions/backend/db/functions/constructions.py b/src/submissions/backend/db/functions/constructions.py index 01410a7..1717d9d 100644 --- a/src/submissions/backend/db/functions/constructions.py +++ b/src/submissions/backend/db/functions/constructions.py @@ -2,8 +2,11 @@ Used to construct models from input dictionaries. ''' from getpass import getuser -from tools import Settings, RSLNamer, check_regex_match, check_authorization, massage_common_reagents +from tools import Settings, check_regex_match, check_authorization, massage_common_reagents from .. import models +# from .misc import RSLNamer +# from backend.namer import RSLNamer +# from .misc import get_polymorphic_subclass from .lookups import * import logging from datetime import date, timedelta @@ -62,7 +65,9 @@ def construct_submission_info(ctx:Settings, info_dict:dict) -> Tuple[models.Basi models.BasicSubmission: Constructed submission object """ # convert submission type into model name - query = info_dict['submission_type'].replace(" ", "") + # model = get_polymorphic_subclass(polymorphic_identity=info_dict['submission_type']) + model = models.BasicSubmission.find_polymorphic_subclass(polymorphic_identity=info_dict['submission_type']) + logger.debug(f"We've got the model: {type(model)}") # Ensure an rsl plate number exists for the plate if not check_regex_match("^RSL", info_dict["rsl_plate_num"]): instance = None @@ -70,13 +75,13 @@ def construct_submission_info(ctx:Settings, info_dict:dict) -> Tuple[models.Basi return instance, {'code': 2, 'message': "A proper RSL plate number is required."} else: # enforce conventions on the rsl plate number from the form - info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"]).parsed_name + # info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"]).parsed_name + info_dict['rsl_plate_num'] = model.RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"], sub_type=info_dict['submission_type']).parsed_name # check database for existing object instance = lookup_submissions(ctx=ctx, rsl_number=info_dict['rsl_plate_num']) # get model based on submission type converted above - logger.debug(f"Looking at models for submission type: {query}") - model = getattr(models, query) - logger.debug(f"We've got the model: {type(model)}") + # logger.debug(f"Looking at models for submission type: {query}") + # if query return nothing, ie doesn't already exist in db if instance == None: instance = model() @@ -218,10 +223,8 @@ def construct_kit_from_yaml(ctx:Settings, kit_dict:dict) -> dict: kit.kit_submissiontype_associations.append(kt_st_assoc) # A kit contains multiple reagent types. for r in kit_dict['reagent_types']: - # check if reagent type already exists. logger.debug(f"Constructing reagent type: {r}") rtname = massage_common_reagents(r['rtname']) - # look_up = ctx.database_session.query(models.ReagentType).filter(models.ReagentType.name==rtname).first() look_up = lookup_reagent_types(name=rtname) if look_up == None: rt = models.ReagentType(name=rtname.strip(), eol_ext=timedelta(30*r['eol'])) @@ -237,6 +240,7 @@ def construct_kit_from_yaml(ctx:Settings, kit_dict:dict) -> dict: store_object(ctx=ctx, object=kit) return {'code':0, 'message':'Kit has been added', 'status': 'information'} +@check_authorization def construct_org_from_yaml(ctx:Settings, org:dict) -> dict: """ Create and store a new organization based on a .yml file @@ -248,11 +252,11 @@ def construct_org_from_yaml(ctx:Settings, org:dict) -> dict: Returns: dict: dictionary containing results of db addition """ - from tools import check_is_power_user - # Don't want just anyone adding in clients - if not check_is_power_user(ctx=ctx): - logger.debug(f"{getuser()} does not have permission to add kits.") - return {'code':1, 'message':"This user does not have permission to add organizations."} + # from tools import check_is_power_user + # # Don't want just anyone adding in clients + # if not check_is_power_user(ctx=ctx): + # logger.debug(f"{getuser()} does not have permission to add kits.") + # return {'code':1, 'message':"This user does not have permission to add organizations."} # the yml can contain multiple clients for client in org: cli_org = models.Organization(name=client.replace(" ", "_").lower(), cost_centre=org[client]['cost centre']) diff --git a/src/submissions/backend/db/functions/lookups.py b/src/submissions/backend/db/functions/lookups.py index 6ac0e1c..93e6261 100644 --- a/src/submissions/backend/db/functions/lookups.py +++ b/src/submissions/backend/db/functions/lookups.py @@ -1,5 +1,6 @@ from .. import models -from tools import Settings, RSLNamer +from tools import Settings +# from backend.namer import RSLNamer from typing import List import logging from datetime import date, datetime @@ -8,7 +9,6 @@ from sqlalchemy.orm.query import Query from sqlalchemy import and_, JSON from sqlalchemy.orm import Session - logger = logging.getLogger(f"submissions.{__name__}") def query_return(query:Query, limit:int=0): @@ -155,7 +155,10 @@ def lookup_submissions(ctx:Settings, chronologic:bool=False, limit:int=0, **kwargs ) -> models.BasicSubmission | List[models.BasicSubmission]: - model = models.find_subclasses(parent=models.BasicSubmission, attrs=kwargs) + if rsl_number == None: + model = models.BasicSubmission.find_subclasses(ctx=ctx, attrs=kwargs) + else: + model = models.BasicSubmission.find_subclasses(ctx=ctx, rsl_number=rsl_number) query = setup_lookup(ctx=ctx, locals=locals()).query(model) # by submission type match submission_type: @@ -208,14 +211,17 @@ def lookup_submissions(ctx:Settings, # by rsl number (returns only a single value) match rsl_number: case str(): + namer = model.RSLNamer(ctx=ctx, instr=rsl_number) logger.debug(f"Looking up BasicSubmission with rsl number: {rsl_number}") try: - rsl_number = RSLNamer(ctx=ctx, instr=rsl_number).parsed_name + rsl_number = namer.parsed_name + logger.debug(f"Got {rsl_number} from {model}.") except AttributeError as e: logger.error(f"No parsed name found, returning None.") return None # query = query.filter(models.BasicSubmission.rsl_plate_num==rsl_number) query = query.filter(model.rsl_plate_num==rsl_number) + logger.debug(f"At this point the query gets: {query.all()}") limit = 1 case _: pass @@ -242,6 +248,7 @@ def lookup_submissions(ctx:Settings, if chronologic: # query.order_by(models.BasicSubmission.submitted_date) query.order_by(model.submitted_date) + logger.debug(f"At the end of the search, the query gets: {query.all()}") return query_return(query=query, limit=limit) def lookup_submission_type(ctx:Settings, @@ -367,7 +374,8 @@ def lookup_samples(ctx:Settings, **kwargs ) -> models.BasicSample|models.WastewaterSample|List[models.BasicSample]: logger.debug(f"Length of kwargs: {len(kwargs)}") - model = models.find_subclasses(parent=models.BasicSample, attrs=kwargs) + # model = models.find_subclasses(parent=models.BasicSample, attrs=kwargs) + model = models.BasicSample.find_subclasses(ctx=ctx, attrs=kwargs) query = setup_lookup(ctx=ctx, locals=locals()).query(model) match submitter_id: case str(): diff --git a/src/submissions/backend/db/functions/misc.py b/src/submissions/backend/db/functions/misc.py index 8880f30..ac0082f 100644 --- a/src/submissions/backend/db/functions/misc.py +++ b/src/submissions/backend/db/functions/misc.py @@ -12,6 +12,9 @@ from . import store_object from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError from pprint import pformat +import logging + +logger = logging.getLogger(f"submissions.{__name__}") def submissions_to_df(ctx:Settings, submission_type:str|None=None, limit:int=0) -> pd.DataFrame: """ @@ -236,25 +239,26 @@ def update_subsampassoc_with_pcr(ctx:Settings, submission:models.BasicSubmission result = store_object(ctx=ctx, object=assoc) return result -def get_polymorphic_subclass(base:object, polymorphic_identity:str|None=None): - """ - Retrieves any subclasses of given base class whose polymorphic identity matches the string input. +# def get_polymorphic_subclass(base:object|models.BasicSubmission=models.BasicSubmission, polymorphic_identity:str|None=None): +# """ +# Retrieves any subclasses of given base class whose polymorphic identity matches the string input. +# NOTE: Depreciated in favour of class based finders in 'submissions.py' - Args: - base (object): Base (parent) class - polymorphic_identity (str | None): Name of subclass of interest. (Defaults to None) +# Args: +# base (object): Base (parent) class +# polymorphic_identity (str | None): Name of subclass of interest. (Defaults to None) - Returns: - _type_: Subclass, or parent class on - """ - if isinstance(polymorphic_identity, dict): - polymorphic_identity = polymorphic_identity['value'] - if polymorphic_identity == None: - return base - else: - try: - return [item for item in base.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0] - except Exception as e: - logger.error(f"Could not get polymorph {polymorphic_identity} of {base} due to {e}") - return base +# Returns: +# _type_: Subclass, or parent class on +# """ +# if isinstance(polymorphic_identity, dict): +# polymorphic_identity = polymorphic_identity['value'] +# if polymorphic_identity == None: +# return base +# else: +# try: +# return [item for item in base.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0] +# except Exception as e: +# logger.error(f"Could not get polymorph {polymorphic_identity} of {base} due to {e}") +# return base diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py index 089294b..3dd7936 100644 --- a/src/submissions/backend/db/models/__init__.py +++ b/src/submissions/backend/db/models/__init__.py @@ -11,7 +11,7 @@ metadata = Base.metadata logger = logging.getLogger(f"submissions.{__name__}") -def find_subclasses(parent:Any, attrs:dict) -> Any: +def find_subclasses(parent:Any, attrs:dict|None=None, rsl_number:str|None=None) -> Any: """ Finds subclasses of a parent that does contain all attributes if the parent does not. @@ -26,7 +26,7 @@ def find_subclasses(parent:Any, attrs:dict) -> Any: Returns: _type_: Parent or subclass. """ - if len(attrs) == 0: + if len(attrs) == 0 or attrs == None: return parent if any([not hasattr(parent, attr) for attr in attrs]): # looks for first model that has all included kwargs diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index efbda37..6e8253b 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -3,6 +3,7 @@ Models for the main submission types. ''' from getpass import getuser import math +from pprint import pformat from . import Base from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, Table, JSON, FLOAT, case from sqlalchemy.orm import relationship, validates @@ -17,7 +18,9 @@ from dateutil.parser import parse import re import pandas as pd from openpyxl import Workbook -from tools import check_not_nan, row_map +from tools import check_not_nan, row_map, Settings +from pathlib import Path +from datetime import datetime logger = logging.getLogger(f"submissions.{__name__}") @@ -61,7 +64,7 @@ class BasicSubmission(Base): # Allows for subclassing into ex. BacterialCulture, Wastewater, etc. __mapper_args__ = { - "polymorphic_identity": "basic_submission", + "polymorphic_identity": "Basic Submission", "polymorphic_on": submission_type_name, "with_polymorphic": "*", } @@ -295,18 +298,93 @@ class BasicSubmission(Base): """ return input_excel + class _RSLNamer(object): + + alias = None + + def __init__(self, ctx:Settings, instr:str|Path, sub_type:str|None=None, parent=None): + if parent != None: + logger.debug(f"Hello from {parent.__mapper_args__['polymorphic_identity']} Namer!") + self.ctx = ctx + self.submission_type = sub_type + self.retrieve_rsl_number(instr=instr) + try: + ncls = [item for item in self.__class__.__subclasses__() if item.alias == self.submission_type][0] + enforcer = ncls.enforce_name + enforcer(self=self, parent=parent) + except IndexError: + enforcer = self.enforce_name + enforcer(parent=parent) + + + def retrieve_rsl_number(self, instr:str|Path): + """ + Uses regex to retrieve the plate number and submission type from an input string + + Args: + in_str (str): string to be parsed + """ + if not isinstance(instr, Path): + instr = Path(instr) + self.out_str = instr.stem + logger.debug(f"Attempting match of {self.out_str}") + logger.debug(f"The initial plate name is: {self.out_str}") + regex = self.construct_regex() + m = regex.search(self.out_str) + if m != None: + self.parsed_name = m.group().upper().strip(".") + logger.debug(f"Got parsed submission name: {self.parsed_name}") + if self.submission_type == None: + try: + self.submission_type = m.lastgroup.replace("_", " ") + except AttributeError as e: + self.submission_type = None + + def enforce_name(self, parent): + if parent != None: + logger.debug(f"Hello from {parent.__mapper_args__['polymorphic_identity']} Enforcer!") + self.parsed_name = self.parsed_name + + @classmethod + def construct_regex(cls): + rstring = rf'{"|".join([item.get_regex() for item in cls.__subclasses__()])}' + regex = re.compile(rstring, flags = re.IGNORECASE | re.VERBOSE) + return regex + @classmethod - def enforce_naming_schema(cls, input_str:str) -> str: - """ - Used to ensure proper custom naming of submission. - - Args: - input_str (str): name parsed by default parser - - Returns: - str: custom parser output. - """ - return input_str + def RSLNamer(cls, ctx:Settings, instr:str, sub_type:str|None=None): + return cls._RSLNamer(parent=cls, ctx=ctx, instr=instr, sub_type=sub_type) + + @classmethod + def find_subclasses(cls, ctx:Settings, attrs:dict|None=None, rsl_number:str|None=None): + if rsl_number != None: + namer = cls._RSLNamer(ctx=ctx, instr=rsl_number) + return cls.find_polymorphic_subclass(namer.submission_type) + if len(attrs) == 0 or attrs == None: + return cls + if any([not hasattr(cls, attr) for attr in attrs]): + # looks for first model that has all included kwargs + try: + model = [subclass for subclass in cls.__subclasses__() if all([hasattr(subclass, attr) for attr in attrs])][0] + except IndexError as e: + raise AttributeError(f"Couldn't find existing class/subclass of {cls} with all attributes:\n{pformat(attrs)}") + else: + model = cls + logger.debug(f"Using model: {model}") + return model + + @classmethod + def find_polymorphic_subclass(cls, polymorphic_identity:str|None=None): + if isinstance(polymorphic_identity, dict): + polymorphic_identity = polymorphic_identity['value'] + if polymorphic_identity == None: + return cls + else: + try: + return [item for item in cls.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0] + except Exception as e: + logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}") + return cls # Below are the custom submission types @@ -372,6 +450,59 @@ class BacterialCulture(BasicSubmission): input_excel["Sample List"].cell(row=15, column=2, value=getuser()[0:2].upper()) return input_excel + class _RSLNamer(BasicSubmission._RSLNamer): + + alias = "Bacterial Culture" + + @classmethod + def construct_regex(cls): + rstring = rf'{cls.get_regex()}' + regex = re.compile(rstring, flags = re.IGNORECASE | re.VERBOSE) + return regex + + def enforce_name(self, parent): + # super().enforce_name(parent) + def construct(ctx) -> str: + """ + DEPRECIATED due to slowness. Search for the largest rsl number and increment by 1 + + Returns: + str: new RSL number + """ + logger.debug(f"Attempting to construct RSL number from scratch...") + # directory = Path(self.ctx['directory_path']).joinpath("Bacteria") + directory = Path(ctx.directory_path).joinpath("Bacteria") + year = str(datetime.now().year)[-2:] + if directory.exists(): + logger.debug(f"Year: {year}") + relevant_rsls = [] + all_xlsx = [item.stem for item in directory.rglob("*.xlsx") if bool(re.search(r"RSL-\d{2}-\d{4}", item.stem)) and year in item.stem[4:6]] + logger.debug(f"All rsls: {all_xlsx}") + for item in all_xlsx: + try: + relevant_rsls.append(re.match(r"RSL-\d{2}-\d{4}", item).group(0)) + except Exception as e: + logger.error(f"Regex error: {e}") + continue + logger.debug(f"Initial xlsx: {relevant_rsls}") + max_number = max([int(item[-4:]) for item in relevant_rsls]) + logger.debug(f"The largest sample number is: {max_number}") + return f"RSL-{year}-{str(max_number+1).zfill(4)}" + else: + # raise FileNotFoundError(f"Unable to locate the directory: {directory.__str__()}") + return f"RSL-{year}-0000" + try: + self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE) + except AttributeError as e: + self.parsed_name = construct(ctx=self.ctx) + # year = datetime.now().year + # self.parsed_name = f"RSL-{str(year)[-2:]}-0000" + self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE) + + @classmethod + def get_regex(cls): + return "(?PRSL-?\\d{2}-?\\d{4})" + class Wastewater(BasicSubmission): """ derivative submission type from BasicSubmission @@ -411,6 +542,50 @@ class Wastewater(BasicSubmission): if xl != None: input_dict['csv'] = xl.parse("Copy to import file") return input_dict + + class _RSLNamer(BasicSubmission._RSLNamer): + + alias = "Wastewater" + + @classmethod + def construct_regex(cls): + rstring = rf'{cls.get_regex()}' + regex = re.compile(rstring, flags = re.IGNORECASE | re.VERBOSE) + return regex + + def enforce_name(self, parent): + # super().enforce_name(parent) + def construct(): + today = datetime.now() + return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" + try: + self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name) + except AttributeError as e: + logger.error(f"Problem using regex: {e}") + self.parsed_name = construct() + self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW") + self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE) + self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name) + logger.debug(f"Coming out of the preliminary parsing, the plate name is {self.parsed_name}") + try: + plate_number = re.search(r"(?:(-|_)\d)(?!\d)", self.parsed_name).group().strip("_").strip("-") + logger.debug(f"Plate number is: {plate_number}") + except AttributeError as e: + plate_number = "1" + # self.parsed_name = re.sub(r"(\d{8})(-|_\d)?(R\d)?", fr"\1-{plate_number}\3", self.parsed_name) + self.parsed_name = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", self.parsed_name) + logger.debug(f"After addition of plate number the plate name is: {self.parsed_name}") + try: + repeat = re.search(r"-\dR(?P\d)?", self.parsed_name).groupdict()['repeat'] + if repeat == None: + repeat = "1" + except AttributeError as e: + repeat = "" + self.parsed_name = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", self.parsed_name).replace(" ", "") + + @classmethod + def get_regex(cls): + return "(?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)" class WastewaterArtic(BasicSubmission): """ @@ -454,6 +629,35 @@ class WastewaterArtic(BasicSubmission): # at the end, this has to be done here. No moving to sqlalchemy object :( input_dict['submitter_id'] = re.sub(r"\s\(.+\)$", "", str(input_dict['submitter_id'])).strip() return input_dict + + class _RSLNamer(BasicSubmission._RSLNamer): + + alias = "Wastewater Artic" + + @classmethod + def construct_regex(cls): + rstring = rf'{cls.get_regex()}' + regex = re.compile(rstring, flags = re.IGNORECASE | re.VERBOSE) + return regex + + def enforce_name(self, parent): + # super().enforce_name(parent) + def construct(): + today = datetime.now() + return f"RSL-AR-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" + try: + self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"RSL-AR-\1\2\3", self.parsed_name, flags=re.IGNORECASE) + except AttributeError: + self.parsed_name = construct() + try: + plate_number = int(re.search(r"_|-\d?_", self.parsed_name).group().strip("_").strip("-")) + except (AttributeError, ValueError) as e: + plate_number = 1 + self.parsed_name = re.sub(r"(_|-\d)?_ARTIC", f"-{plate_number}", self.parsed_name) + + @classmethod + def get_regex(cls): + return "(?P(\\d{4}-\\d{2}-\\d{2}(?:-|_)(?:\\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\\d{2}-?\\d{2}-?\\d{2}(?:(_|-)\\d?(\\D|$)R?\\d?)?))" class BasicSample(Base): """ @@ -542,6 +746,34 @@ class BasicSample(Base): """ return dict(name=self.submitter_id[:10], positive=False, tooltip=tooltip_text) + @classmethod + def find_subclasses(cls, ctx:Settings, attrs:dict|None=None, rsl_number:str|None=None): + if len(attrs) == 0 or attrs == None: + return cls + if any([not hasattr(cls, attr) for attr in attrs]): + # looks for first model that has all included kwargs + try: + model = [subclass for subclass in cls.__subclasses__() if all([hasattr(subclass, attr) for attr in attrs])][0] + except IndexError as e: + raise AttributeError(f"Couldn't find existing class/subclass of {cls} with all attributes:\n{pformat(attrs)}") + else: + model = cls + logger.debug(f"Using model: {model}") + return model + + @classmethod + def find_polymorphic_subclass(cls, polymorphic_identity:str|None=None): + if isinstance(polymorphic_identity, dict): + polymorphic_identity = polymorphic_identity['value'] + if polymorphic_identity == None: + return cls + else: + try: + return [item for item in cls.__subclasses__() if item.__mapper_args__['polymorphic_identity']==polymorphic_identity][0] + except Exception as e: + logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}") + return cls + class WastewaterSample(BasicSample): """ Derivative wastewater sample diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index 39cfa4d..99cf3ae 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -6,20 +6,78 @@ import pprint from typing import List import pandas as pd from pathlib import Path -from backend.db import models, lookup_kit_types, lookup_submission_type, lookup_samples, get_polymorphic_subclass +from backend.db import models, lookup_kit_types, lookup_submission_type, lookup_samples from backend.pydant import PydSubmission, PydReagent import logging from collections import OrderedDict import re from datetime import date from dateutil.parser import parse, ParserError -from tools import check_not_nan, RSLNamer, convert_nans_to_nones, Settings +from tools import check_not_nan, convert_nans_to_nones, Settings +# from backend.namer import RSLNamer from frontend.custom_widgets.pop_ups import SubmissionTypeSelector, KitSelector logger = logging.getLogger(f"submissions.{__name__}") row_keys = dict(A=1, B=2, C=3, D=4, E=5, F=6, G=7, H=8) +class RSLNamer(object): + """ + Object that will enforce proper formatting on RSL plate names. + NOTE: Depreciated in favour of object based methods in 'submissions.py' + """ + def __init__(self, ctx, instr:str, sub_type:str|None=None): + self.ctx = ctx + self.submission_type = sub_type + self.retrieve_rsl_number(in_str=instr) + if self.submission_type != None: + # custom_enforcer = get_polymorphic_subclass(BasicSubmission, self.submission_type).enforce_naming_schema + parser = getattr(self, f"enforce_{self.submission_type.replace(' ', '_').lower()}") + parser() + self.parsed_name = self.parsed_name.replace("_", "-") + + def retrieve_rsl_number(self, in_str:str|Path): + """ + Uses regex to retrieve the plate number and submission type from an input string + + Args: + in_str (str): string to be parsed + """ + if not isinstance(in_str, Path): + in_str = Path(in_str) + self.out_str = in_str.stem + logger.debug(f"Attempting match of {self.out_str}") + logger.debug(f"The initial plate name is: {self.out_str}") + # regex = re.compile(r""" + # # (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d?((?!\d)|R)?\d(?!\d))?)| + # (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)| + # (?PRSL-?\d{2}-?\d{4})| + # (?P(\d{4}-\d{2}-\d{2}(?:-|_)(?:\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)) + # """, flags = re.IGNORECASE | re.VERBOSE) + regex = models.BasicSubmission.RSLNamer.construct_regex() + m = regex.search(self.out_str) + if m != None: + self.parsed_name = m.group().upper().strip(".") + logger.debug(f"Got parsed submission name: {self.parsed_name}") + if self.submission_type == None: + try: + self.submission_type = m.lastgroup + except AttributeError as e: + logger.critical("No RSL plate number found or submission type found!") + logger.debug(f"The cause of the above error was: {e}") + logger.warning(f"We're going to have to create the submission type from the excel sheet properties...") + if in_str.exists(): + my_xl = pd.ExcelFile(in_str) + if my_xl.book.properties.category != None: + categories = [item.strip().title() for item in my_xl.book.properties.category.split(";")] + self.submission_type = categories[0].replace(" ", "_").lower() + else: + raise AttributeError(f"File {in_str.__str__()} has no categories.") + else: + raise FileNotFoundError() + # else: + # raise ValueError(f"No parsed name could be created for {self.out_str}.") + class SheetParser(object): """ object to pull and contain data from excel file @@ -69,12 +127,20 @@ class SheetParser(object): else: # This code is going to be depreciated once there is full adoption of the client sheets # with updated metadata... but how will it work for Artic? + + # sub = get_polymorphic_subclass() + try: + logger.debug(f"Attempting to match file name regex") + namer = models.BasicSubmission.RSLNamer(ctx=self.ctx, instr=self.filepath) + return namer.submission_type + except Exception as e: + logger.error(f"Unable to find file name regex match") logger.debug("Using excel map to find type...") try: for type in self.ctx.submission_types: # This gets the *first* submission type that matches the sheet names in the workbook if self.xl.sheet_names == self.ctx.submission_types[type]['excel_map']: - return dict(value=type.title(), parsed=True) + return dict(value=type.title(), parsed=False) return "Unknown" except Exception as e: logger.warning(f"We were unable to parse the submission type due to: {e}") @@ -119,43 +185,6 @@ class SheetParser(object): """ self.sample_result, self.sub['samples'] = SampleParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type']['value']).parse_samples() - # def parse_bacterial_culture(self, input_dict) -> dict: - # """ - # Update submission dictionary with type specific information - - # Args: - # input_dict (dict): Input sample dictionary - - # Returns: - # dict: Updated sample dictionary - # """ - # return input_dict - - # def parse_wastewater(self, input_dict) -> dict: - # """ - # Update submission dictionary with type specific information - - # Args: - # input_dict (dict): Input sample dictionary - - # Returns: - # dict: Updated sample dictionary - # """ - # return input_dict - - # def parse_wastewater_artic(self, input_dict:dict) -> dict: - # """ - # Update submission dictionary with type specific information - - # Args: - # input_dict (dict): Input sample dictionary - - # Returns: - # dict: Updated sample dictionary - # """ - # return input_dict - - def import_kit_validation_check(self): """ Enforce that the parser has an extraction kit @@ -224,7 +253,8 @@ class InfoParser(object): submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type['value']) info_map = submission_type.info_map # Get the parse_info method from the submission type specified - self.custom_parser = get_polymorphic_subclass(models.BasicSubmission, submission_type.name).parse_info + # self.custom_parser = get_polymorphic_subclass(models.BasicSubmission, submission_type.name).parse_info + self.custom_parser = models.BasicSubmission.find_polymorphic_subclass(polymorphic_identity=submission_type.name).parse_info return info_map def parse_info(self) -> dict: @@ -359,7 +389,8 @@ class SampleParser(object): submission_type = lookup_submission_type(ctx=self.ctx, name=submission_type) logger.debug(f"info_map: {pprint.pformat(submission_type.info_map)}") sample_info_map = submission_type.info_map['samples'] - self.custom_parser = get_polymorphic_subclass(models.BasicSubmission, submission_type.name).parse_samples + # self.custom_parser = get_polymorphic_subclass(models.BasicSubmission, submission_type.name).parse_samples + self.custom_parser = models.BasicSubmission.find_polymorphic_subclass(polymorphic_identity=submission_type.name).parse_samples return sample_info_map def construct_plate_map(self, plate_map_location:dict) -> pd.DataFrame: @@ -376,7 +407,8 @@ class SampleParser(object): df = df.iloc[plate_map_location['start_row']-1:plate_map_location['end_row'], plate_map_location['start_column']-1:plate_map_location['end_column']] df = pd.DataFrame(df.values[1:], columns=df.iloc[0]) df = df.set_index(df.columns[0]) - custom_mapper = get_polymorphic_subclass(models.BasicSubmission, self.submission_type) + # custom_mapper = get_polymorphic_subclass(models.BasicSubmission, self.submission_type) + custom_mapper = models.BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type) df = custom_mapper.custom_platemap(self.xl, df) return df @@ -596,7 +628,7 @@ class SampleParser(object): for plate in self.plates: df = self.xl.parse(plate['sheet'], header=None) if isinstance(df.iat[plate['row']-1, plate['column']-1], str): - output = RSLNamer(ctx=self.ctx, instr=df.iat[plate['row']-1, plate['column']-1]).parsed_name + output = models.BasicSubmission.RSLNamer(ctx=self.ctx, instr=df.iat[plate['row']-1, plate['column']-1]).parsed_name else: continue plates.append(output) @@ -631,7 +663,7 @@ class PCRParser(object): return # self.pcr = OrderedDict() self.pcr = {} - namer = RSLNamer(ctx=self.ctx, instr=filepath.__str__()) + namer = models.BasicSubmission.RSLNamer(ctx=self.ctx, instr=filepath.__str__()) self.plate_num = namer.parsed_name self.submission_type = namer.submission_type logger.debug(f"Set plate number to {self.plate_num} and type to {self.submission_type}") @@ -672,7 +704,7 @@ class PCRParser(object): self.pcr['imported_by'] = getuser() return df - def parse_wastewater(self): + def parse_Wastewater(self): """ Parse specific to wastewater samples. """ diff --git a/src/submissions/backend/pydant/__init__.py b/src/submissions/backend/pydant/__init__.py index 588767a..9390112 100644 --- a/src/submissions/backend/pydant/__init__.py +++ b/src/submissions/backend/pydant/__init__.py @@ -7,12 +7,13 @@ from datetime import date, datetime from dateutil.parser import parse from dateutil.parser._parser import ParserError from typing import List, Any -from tools import RSLNamer +# from backend.namer import RSLNamer from pathlib import Path import re import logging from tools import check_not_nan, convert_nans_to_nones, Settings from backend.db.functions import lookup_submissions +from backend.db.models import BasicSubmission @@ -151,10 +152,11 @@ class PydSubmission(BaseModel, extra='allow'): return dict(value=value['value'], parsed=True) else: logger.warning(f"Submission number {value} already exists in DB, attempting salvage with filepath") - output = RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__(), sub_type=sub_type).parsed_name + # output = RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__(), sub_type=sub_type).parsed_name + output = BasicSubmission.RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__(), sub_type=sub_type).parsed_name return dict(value=output, parsed=False) else: - output = RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__(), sub_type=sub_type).parsed_name + output = BasicSubmission.RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__(), sub_type=sub_type).parsed_name return dict(value=output, parsed=False) @field_validator("technician", mode="before") @@ -205,7 +207,7 @@ class PydSubmission(BaseModel, extra='allow'): value = value['value'].title() return dict(value=value, parsed=True) else: - return dict(value=RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__()).submission_type.title(), parsed=False) + return dict(value=BasicSubmission.RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__()).submission_type.title(), parsed=False) @field_validator("submission_category") @classmethod diff --git a/src/submissions/frontend/main_window_functions.py b/src/submissions/frontend/main_window_functions.py index 4fefc1f..db9cc55 100644 --- a/src/submissions/frontend/main_window_functions.py +++ b/src/submissions/frontend/main_window_functions.py @@ -899,7 +899,8 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re logger.debug(f"Attempting: {item['type']}") worksheet.cell(row=item['location']['row'], column=item['location']['column'], value=item['value']) # Hacky way to pop in 'signed by' - custom_parser = get_polymorphic_subclass(BasicSubmission, info['submission_type']) + # custom_parser = get_polymorphic_subclass(BasicSubmission, info['submission_type']) + custom_parser = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=info['submission_type']) workbook = custom_parser.custom_autofill(workbook) fname = select_save_file(obj=obj, default_name=info['rsl_plate_num'], extension="xlsx") workbook.save(filename=fname.__str__()) diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py index b16c283..3083825 100644 --- a/src/submissions/tools/__init__.py +++ b/src/submissions/tools/__init__.py @@ -89,37 +89,37 @@ def convert_nans_to_nones(input_str) -> str|None: return input_str return None -def create_reagent_list(in_dict:dict) -> list[str]: - """ - Makes list of reagent types without "lot_" prefix for each key in a dictionary +# def create_reagent_list(in_dict:dict) -> list[str]: +# """ +# Makes list of reagent types without "lot_" prefix for each key in a dictionary - Args: - in_dict (dict): input dictionary of reagents +# Args: +# in_dict (dict): input dictionary of reagents - Returns: - list[str]: list of reagent types with "lot_" prefix removed. - """ - return [item.strip("lot_") for item in in_dict.keys()] +# Returns: +# list[str]: list of reagent types with "lot_" prefix removed. +# """ +# return [item.strip("lot_") for item in in_dict.keys()] -def retrieve_rsl_number(in_str:str) -> Tuple[str, str]: - """ - Uses regex to retrieve the plate number and submission type from an input string - DEPRECIATED. REPLACED BY RSLNamer.parsed_name +# def retrieve_rsl_number(in_str:str) -> Tuple[str, str]: +# """ +# Uses regex to retrieve the plate number and submission type from an input string +# DEPRECIATED. REPLACED BY RSLNamer.parsed_name - Args: - in_str (str): string to be parsed +# Args: +# in_str (str): string to be parsed - Returns: - Tuple[str, str]: tuple of (output rsl number, submission_type) - """ - in_str = in_str.split("\\")[-1] - logger.debug(f"Attempting match of {in_str}") - regex = re.compile(r""" - (?PRSL-?WW(?:-|_)20\d{6}(?:(?:_|-)\d(?!\d))?)|(?PRSL-\d{2}-\d{4}) - """, re.VERBOSE) - m = regex.search(in_str) - parsed = m.group().replace("_", "-") - return (parsed, m.lastgroup) +# Returns: +# Tuple[str, str]: tuple of (output rsl number, submission_type) +# """ +# in_str = in_str.split("\\")[-1] +# logger.debug(f"Attempting match of {in_str}") +# regex = re.compile(r""" +# (?PRSL-?WW(?:-|_)20\d{6}(?:(?:_|-)\d(?!\d))?)|(?PRSL-\d{2}-\d{4}) +# """, re.VERBOSE) +# m = regex.search(in_str) +# parsed = m.group().replace("_", "-") +# return (parsed, m.lastgroup) def check_regex_match(pattern:str, check:str) -> bool: try: @@ -134,153 +134,152 @@ def massage_common_reagents(reagent_name:str): reagent_name = reagent_name.replace("ยต", "u") return reagent_name -class RSLNamer(object): - """ - Object that will enforce proper formatting on RSL plate names. - """ - def __init__(self, ctx, instr:str, sub_type:str|None=None): - from backend.db.functions import get_polymorphic_subclass - from backend.db.models import BasicSubmission - self.ctx = ctx - self.submission_type = sub_type - self.retrieve_rsl_number(in_str=instr) - if self.submission_type != None: - custom_enforcer = get_polymorphic_subclass(BasicSubmission, self.submission_type).enforce_naming_schema - # parser = getattr(self, f"enforce_{self.submission_type.replace(' ', '_').lower()}") - # parser() - self.parsed_name = self.parsed_name.replace("_", "-") +# class RSLNamer(object): +# """ +# Object that will enforce proper formatting on RSL plate names. +# NOTE: Depreciated in favour of object based methods in 'submissions.py' +# """ +# def __init__(self, ctx, instr:str, sub_type:str|None=None): +# self.ctx = ctx +# self.submission_type = sub_type +# self.retrieve_rsl_number(in_str=instr) +# if self.submission_type != None: +# # custom_enforcer = get_polymorphic_subclass(BasicSubmission, self.submission_type).enforce_naming_schema +# parser = getattr(self, f"enforce_{self.submission_type.replace(' ', '_').lower()}") +# parser() +# self.parsed_name = self.parsed_name.replace("_", "-") - def retrieve_rsl_number(self, in_str:str|Path): - """ - Uses regex to retrieve the plate number and submission type from an input string +# def retrieve_rsl_number(self, in_str:str|Path): +# """ +# Uses regex to retrieve the plate number and submission type from an input string - Args: - in_str (str): string to be parsed - """ - if not isinstance(in_str, Path): - in_str = Path(in_str) - self.out_str = in_str.stem - logger.debug(f"Attempting match of {self.out_str}") - logger.debug(f"The initial plate name is: {self.out_str}") - regex = re.compile(r""" - # (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d?((?!\d)|R)?\d(?!\d))?)| - (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)| - (?PRSL-?\d{2}-?\d{4})| - (?P(\d{4}-\d{2}-\d{2}(?:-|_)(?:\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)) - """, flags = re.IGNORECASE | re.VERBOSE) - m = regex.search(self.out_str) - if m != None: - self.parsed_name = m.group().upper().strip(".") - logger.debug(f"Got parsed submission name: {self.parsed_name}") - if self.submission_type == None: - try: - self.submission_type = m.lastgroup - except AttributeError as e: - logger.critical("No RSL plate number found or submission type found!") - logger.debug(f"The cause of the above error was: {e}") - logger.warning(f"We're going to have to create the submission type from the excel sheet properties...") - if in_str.exists(): - my_xl = pd.ExcelFile(in_str) - if my_xl.book.properties.category != None: - categories = [item.strip().title() for item in my_xl.book.properties.category.split(";")] - self.submission_type = categories[0].replace(" ", "_").lower() - else: - raise AttributeError(f"File {in_str.__str__()} has no categories.") - else: - raise FileNotFoundError() - # else: - # raise ValueError(f"No parsed name could be created for {self.out_str}.") +# Args: +# in_str (str): string to be parsed +# """ +# if not isinstance(in_str, Path): +# in_str = Path(in_str) +# self.out_str = in_str.stem +# logger.debug(f"Attempting match of {self.out_str}") +# logger.debug(f"The initial plate name is: {self.out_str}") +# regex = re.compile(r""" +# # (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d?((?!\d)|R)?\d(?!\d))?)| +# (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)| +# (?PRSL-?\d{2}-?\d{4})| +# (?P(\d{4}-\d{2}-\d{2}(?:-|_)(?:\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)) +# """, flags = re.IGNORECASE | re.VERBOSE) +# m = regex.search(self.out_str) +# if m != None: +# self.parsed_name = m.group().upper().strip(".") +# logger.debug(f"Got parsed submission name: {self.parsed_name}") +# if self.submission_type == None: +# try: +# self.submission_type = m.lastgroup +# except AttributeError as e: +# logger.critical("No RSL plate number found or submission type found!") +# logger.debug(f"The cause of the above error was: {e}") +# logger.warning(f"We're going to have to create the submission type from the excel sheet properties...") +# if in_str.exists(): +# my_xl = pd.ExcelFile(in_str) +# if my_xl.book.properties.category != None: +# categories = [item.strip().title() for item in my_xl.book.properties.category.split(";")] +# self.submission_type = categories[0].replace(" ", "_").lower() +# else: +# raise AttributeError(f"File {in_str.__str__()} has no categories.") +# else: +# raise FileNotFoundError() +# # else: +# # raise ValueError(f"No parsed name could be created for {self.out_str}.") - def enforce_wastewater(self): - """ - Uses regex to enforce proper formatting of wastewater samples - """ - def construct(): - today = datetime.now() - return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" - try: - self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name) - except AttributeError as e: - logger.error(f"Problem using regex: {e}") - self.parsed_name = construct() - self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW") - self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE) - self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name) - logger.debug(f"Coming out of the preliminary parsing, the plate name is {self.parsed_name}") - try: - plate_number = re.search(r"(?:(-|_)\d)(?!\d)", self.parsed_name).group().strip("_").strip("-") - logger.debug(f"Plate number is: {plate_number}") - except AttributeError as e: - plate_number = "1" - # self.parsed_name = re.sub(r"(\d{8})(-|_\d)?(R\d)?", fr"\1-{plate_number}\3", self.parsed_name) - self.parsed_name = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", self.parsed_name) - logger.debug(f"After addition of plate number the plate name is: {self.parsed_name}") - try: - repeat = re.search(r"-\dR(?P\d)?", self.parsed_name).groupdict()['repeat'] - if repeat == None: - repeat = "1" - except AttributeError as e: - repeat = "" - self.parsed_name = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", self.parsed_name).replace(" ", "") +# def enforce_wastewater(self): +# """ +# Uses regex to enforce proper formatting of wastewater samples +# """ +# def construct(): +# today = datetime.now() +# return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}-1" +# try: +# self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name) +# except AttributeError as e: +# logger.error(f"Problem using regex: {e}") +# self.parsed_name = construct() +# self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW") +# self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE) +# self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name) +# logger.debug(f"Coming out of the preliminary parsing, the plate name is {self.parsed_name}") +# try: +# plate_number = re.search(r"(?:(-|_)\d)(?!\d)", self.parsed_name).group().strip("_").strip("-") +# logger.debug(f"Plate number is: {plate_number}") +# except AttributeError as e: +# plate_number = "1" +# # self.parsed_name = re.sub(r"(\d{8})(-|_\d)?(R\d)?", fr"\1-{plate_number}\3", self.parsed_name) +# self.parsed_name = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", self.parsed_name) +# logger.debug(f"After addition of plate number the plate name is: {self.parsed_name}") +# try: +# repeat = re.search(r"-\dR(?P\d)?", self.parsed_name).groupdict()['repeat'] +# if repeat == None: +# repeat = "1" +# except AttributeError as e: +# repeat = "" +# self.parsed_name = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", self.parsed_name).replace(" ", "") - def enforce_bacterial_culture(self): - """ - Uses regex to enforce proper formatting of bacterial culture samples - """ - def construct(ctx) -> str: - """ - DEPRECIATED due to slowness. Search for the largest rsl number and increment by 1 +# def enforce_bacterial_culture(self): +# """ +# Uses regex to enforce proper formatting of bacterial culture samples +# """ +# def construct(ctx) -> str: +# """ +# DEPRECIATED due to slowness. Search for the largest rsl number and increment by 1 - Returns: - str: new RSL number - """ - logger.debug(f"Attempting to construct RSL number from scratch...") - # directory = Path(self.ctx['directory_path']).joinpath("Bacteria") - directory = Path(ctx.directory_path).joinpath("Bacteria") - year = str(datetime.now().year)[-2:] - if directory.exists(): - logger.debug(f"Year: {year}") - relevant_rsls = [] - all_xlsx = [item.stem for item in directory.rglob("*.xlsx") if bool(re.search(r"RSL-\d{2}-\d{4}", item.stem)) and year in item.stem[4:6]] - logger.debug(f"All rsls: {all_xlsx}") - for item in all_xlsx: - try: - relevant_rsls.append(re.match(r"RSL-\d{2}-\d{4}", item).group(0)) - except Exception as e: - logger.error(f"Regex error: {e}") - continue - logger.debug(f"Initial xlsx: {relevant_rsls}") - max_number = max([int(item[-4:]) for item in relevant_rsls]) - logger.debug(f"The largest sample number is: {max_number}") - return f"RSL-{year}-{str(max_number+1).zfill(4)}" - else: - # raise FileNotFoundError(f"Unable to locate the directory: {directory.__str__()}") - return f"RSL-{year}-0000" - try: - self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE) - except AttributeError as e: - self.parsed_name = construct(ctx=self.ctx) - # year = datetime.now().year - # self.parsed_name = f"RSL-{str(year)[-2:]}-0000" - self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE) +# Returns: +# str: new RSL number +# """ +# logger.debug(f"Attempting to construct RSL number from scratch...") +# # directory = Path(self.ctx['directory_path']).joinpath("Bacteria") +# directory = Path(ctx.directory_path).joinpath("Bacteria") +# year = str(datetime.now().year)[-2:] +# if directory.exists(): +# logger.debug(f"Year: {year}") +# relevant_rsls = [] +# all_xlsx = [item.stem for item in directory.rglob("*.xlsx") if bool(re.search(r"RSL-\d{2}-\d{4}", item.stem)) and year in item.stem[4:6]] +# logger.debug(f"All rsls: {all_xlsx}") +# for item in all_xlsx: +# try: +# relevant_rsls.append(re.match(r"RSL-\d{2}-\d{4}", item).group(0)) +# except Exception as e: +# logger.error(f"Regex error: {e}") +# continue +# logger.debug(f"Initial xlsx: {relevant_rsls}") +# max_number = max([int(item[-4:]) for item in relevant_rsls]) +# logger.debug(f"The largest sample number is: {max_number}") +# return f"RSL-{year}-{str(max_number+1).zfill(4)}" +# else: +# # raise FileNotFoundError(f"Unable to locate the directory: {directory.__str__()}") +# return f"RSL-{year}-0000" +# try: +# self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE) +# except AttributeError as e: +# self.parsed_name = construct(ctx=self.ctx) +# # year = datetime.now().year +# # self.parsed_name = f"RSL-{str(year)[-2:]}-0000" +# self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE) - def enforce_wastewater_artic(self): - """ - Uses regex to enforce proper formatting of wastewater samples - """ - def construct(): - today = datetime.now() - return f"RSL-AR-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" - try: - self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"RSL-AR-\1\2\3", self.parsed_name, flags=re.IGNORECASE) - except AttributeError: - self.parsed_name = construct() - try: - plate_number = int(re.search(r"_|-\d?_", self.parsed_name).group().strip("_").strip("-")) - except (AttributeError, ValueError) as e: - plate_number = 1 - self.parsed_name = re.sub(r"(_|-\d)?_ARTIC", f"-{plate_number}", self.parsed_name) +# def enforce_wastewater_artic(self): +# """ +# Uses regex to enforce proper formatting of wastewater samples +# """ +# def construct(): +# today = datetime.now() +# return f"RSL-AR-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}" +# try: +# self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"RSL-AR-\1\2\3", self.parsed_name, flags=re.IGNORECASE) +# except AttributeError: +# self.parsed_name = construct() +# try: +# plate_number = int(re.search(r"_|-\d?_", self.parsed_name).group().strip("_").strip("-")) +# except (AttributeError, ValueError) as e: +# plate_number = 1 +# self.parsed_name = re.sub(r"(_|-\d)?_ARTIC", f"-{plate_number}", self.parsed_name) class GroupWriteRotatingFileHandler(handlers.RotatingFileHandler): @@ -586,25 +585,25 @@ def jinja_template_loading(): env.globals['STATIC_PREFIX'] = loader_path.joinpath("static", "css") return env -def check_is_power_user(ctx:Settings) -> bool: - """ - Check to ensure current user is in power users list. - NOTE: Depreciated in favour of 'check_authorization' below. +# def check_is_power_user(ctx:Settings) -> bool: +# """ +# Check to ensure current user is in power users list. +# NOTE: Depreciated in favour of 'check_authorization' below. - Args: - ctx (dict): settings passed down from gui. +# Args: +# ctx (dict): settings passed down from gui. - Returns: - bool: True if user is in power users, else false. - """ - try: - check = getpass.getuser() in ctx.power_users - except KeyError as e: - check = False - except Exception as e: - logger.debug(f"Check encountered unknown error: {type(e).__name__} - {e}") - check = False - return check +# Returns: +# bool: True if user is in power users, else false. +# """ +# try: +# check = getpass.getuser() in ctx.power_users +# except KeyError as e: +# check = False +# except Exception as e: +# logger.debug(f"Check encountered unknown error: {type(e).__name__} - {e}") +# check = False +# return check def check_authorization(func): def wrapper(*args, **kwargs):