Post RSLNamer move, pre-restore

This commit is contained in:
Landon Wark
2023-10-16 14:24:33 -05:00
parent 80d77117e1
commit 0a90542e8e
11 changed files with 575 additions and 286 deletions

View File

@@ -89,37 +89,37 @@ def convert_nans_to_nones(input_str) -> str|None:
return input_str
return None
def create_reagent_list(in_dict:dict) -> list[str]:
"""
Makes list of reagent types without "lot_" prefix for each key in a dictionary
# def create_reagent_list(in_dict:dict) -> list[str]:
# """
# Makes list of reagent types without "lot_" prefix for each key in a dictionary
Args:
in_dict (dict): input dictionary of reagents
# Args:
# in_dict (dict): input dictionary of reagents
Returns:
list[str]: list of reagent types with "lot_" prefix removed.
"""
return [item.strip("lot_") for item in in_dict.keys()]
# Returns:
# list[str]: list of reagent types with "lot_" prefix removed.
# """
# return [item.strip("lot_") for item in in_dict.keys()]
def retrieve_rsl_number(in_str:str) -> Tuple[str, str]:
"""
Uses regex to retrieve the plate number and submission type from an input string
DEPRECATED. REPLACED BY RSLNamer.parsed_name
# def retrieve_rsl_number(in_str:str) -> Tuple[str, str]:
# """
# Uses regex to retrieve the plate number and submission type from an input string
# DEPRECATED. REPLACED BY RSLNamer.parsed_name
Args:
in_str (str): string to be parsed
# Args:
# in_str (str): string to be parsed
Returns:
Tuple[str, str]: tuple of (output rsl number, submission_type)
"""
in_str = in_str.split("\\")[-1]
logger.debug(f"Attempting match of {in_str}")
regex = re.compile(r"""
(?P<wastewater>RSL-?WW(?:-|_)20\d{6}(?:(?:_|-)\d(?!\d))?)|(?P<bacterial_culture>RSL-\d{2}-\d{4})
""", re.VERBOSE)
m = regex.search(in_str)
parsed = m.group().replace("_", "-")
return (parsed, m.lastgroup)
# Returns:
# Tuple[str, str]: tuple of (output rsl number, submission_type)
# """
# in_str = in_str.split("\\")[-1]
# logger.debug(f"Attempting match of {in_str}")
# regex = re.compile(r"""
# (?P<wastewater>RSL-?WW(?:-|_)20\d{6}(?:(?:_|-)\d(?!\d))?)|(?P<bacterial_culture>RSL-\d{2}-\d{4})
# """, re.VERBOSE)
# m = regex.search(in_str)
# parsed = m.group().replace("_", "-")
# return (parsed, m.lastgroup)
def check_regex_match(pattern:str, check:str) -> bool:
try:
@@ -134,153 +134,152 @@ def massage_common_reagents(reagent_name:str):
reagent_name = reagent_name.replace("µ", "u")
return reagent_name
class RSLNamer(object):
"""
Object that will enforce proper formatting on RSL plate names.
"""
def __init__(self, ctx, instr:str, sub_type:str|None=None):
from backend.db.functions import get_polymorphic_subclass
from backend.db.models import BasicSubmission
self.ctx = ctx
self.submission_type = sub_type
self.retrieve_rsl_number(in_str=instr)
if self.submission_type != None:
custom_enforcer = get_polymorphic_subclass(BasicSubmission, self.submission_type).enforce_naming_schema
# parser = getattr(self, f"enforce_{self.submission_type.replace(' ', '_').lower()}")
# parser()
self.parsed_name = self.parsed_name.replace("_", "-")
# class RSLNamer(object):
# """
# Object that will enforce proper formatting on RSL plate names.
# NOTE: Deprecated in favour of object-based methods in 'submissions.py'
# """
# def __init__(self, ctx, instr:str, sub_type:str|None=None):
# self.ctx = ctx
# self.submission_type = sub_type
# self.retrieve_rsl_number(in_str=instr)
# if self.submission_type != None:
# # custom_enforcer = get_polymorphic_subclass(BasicSubmission, self.submission_type).enforce_naming_schema
# parser = getattr(self, f"enforce_{self.submission_type.replace(' ', '_').lower()}")
# parser()
# self.parsed_name = self.parsed_name.replace("_", "-")
def retrieve_rsl_number(self, in_str:str|Path):
"""
Uses regex to retrieve the plate number and submission type from an input string
# def retrieve_rsl_number(self, in_str:str|Path):
# """
# Uses regex to retrieve the plate number and submission type from an input string
Args:
in_str (str): string to be parsed
"""
if not isinstance(in_str, Path):
in_str = Path(in_str)
self.out_str = in_str.stem
logger.debug(f"Attempting match of {self.out_str}")
logger.debug(f"The initial plate name is: {self.out_str}")
regex = re.compile(r"""
# (?P<wastewater>RSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d?((?!\d)|R)?\d(?!\d))?)|
(?P<wastewater>RSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)|
(?P<bacterial_culture>RSL-?\d{2}-?\d{4})|
(?P<wastewater_artic>(\d{4}-\d{2}-\d{2}(?:-|_)(?:\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?))
""", flags = re.IGNORECASE | re.VERBOSE)
m = regex.search(self.out_str)
if m != None:
self.parsed_name = m.group().upper().strip(".")
logger.debug(f"Got parsed submission name: {self.parsed_name}")
if self.submission_type == None:
try:
self.submission_type = m.lastgroup
except AttributeError as e:
logger.critical("No RSL plate number found or submission type found!")
logger.debug(f"The cause of the above error was: {e}")
logger.warning(f"We're going to have to create the submission type from the excel sheet properties...")
if in_str.exists():
my_xl = pd.ExcelFile(in_str)
if my_xl.book.properties.category != None:
categories = [item.strip().title() for item in my_xl.book.properties.category.split(";")]
self.submission_type = categories[0].replace(" ", "_").lower()
else:
raise AttributeError(f"File {in_str.__str__()} has no categories.")
else:
raise FileNotFoundError()
# else:
# raise ValueError(f"No parsed name could be created for {self.out_str}.")
# Args:
# in_str (str): string to be parsed
# """
# if not isinstance(in_str, Path):
# in_str = Path(in_str)
# self.out_str = in_str.stem
# logger.debug(f"Attempting match of {self.out_str}")
# logger.debug(f"The initial plate name is: {self.out_str}")
# regex = re.compile(r"""
# # (?P<wastewater>RSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d?((?!\d)|R)?\d(?!\d))?)|
# (?P<wastewater>RSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)|
# (?P<bacterial_culture>RSL-?\d{2}-?\d{4})|
# (?P<wastewater_artic>(\d{4}-\d{2}-\d{2}(?:-|_)(?:\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?))
# """, flags = re.IGNORECASE | re.VERBOSE)
# m = regex.search(self.out_str)
# if m != None:
# self.parsed_name = m.group().upper().strip(".")
# logger.debug(f"Got parsed submission name: {self.parsed_name}")
# if self.submission_type == None:
# try:
# self.submission_type = m.lastgroup
# except AttributeError as e:
# logger.critical("No RSL plate number found or submission type found!")
# logger.debug(f"The cause of the above error was: {e}")
# logger.warning(f"We're going to have to create the submission type from the excel sheet properties...")
# if in_str.exists():
# my_xl = pd.ExcelFile(in_str)
# if my_xl.book.properties.category != None:
# categories = [item.strip().title() for item in my_xl.book.properties.category.split(";")]
# self.submission_type = categories[0].replace(" ", "_").lower()
# else:
# raise AttributeError(f"File {in_str.__str__()} has no categories.")
# else:
# raise FileNotFoundError()
# # else:
# # raise ValueError(f"No parsed name could be created for {self.out_str}.")
def enforce_wastewater(self):
"""
Uses regex to enforce proper formatting of wastewater samples
"""
def construct():
today = datetime.now()
return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}"
try:
self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name)
except AttributeError as e:
logger.error(f"Problem using regex: {e}")
self.parsed_name = construct()
self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW")
self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE)
self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name)
logger.debug(f"Coming out of the preliminary parsing, the plate name is {self.parsed_name}")
try:
plate_number = re.search(r"(?:(-|_)\d)(?!\d)", self.parsed_name).group().strip("_").strip("-")
logger.debug(f"Plate number is: {plate_number}")
except AttributeError as e:
plate_number = "1"
# self.parsed_name = re.sub(r"(\d{8})(-|_\d)?(R\d)?", fr"\1-{plate_number}\3", self.parsed_name)
self.parsed_name = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", self.parsed_name)
logger.debug(f"After addition of plate number the plate name is: {self.parsed_name}")
try:
repeat = re.search(r"-\dR(?P<repeat>\d)?", self.parsed_name).groupdict()['repeat']
if repeat == None:
repeat = "1"
except AttributeError as e:
repeat = ""
self.parsed_name = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", self.parsed_name).replace(" ", "")
# def enforce_wastewater(self):
# """
# Uses regex to enforce proper formatting of wastewater samples
# """
# def construct():
# today = datetime.now()
# return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}-1"
# try:
# self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name)
# except AttributeError as e:
# logger.error(f"Problem using regex: {e}")
# self.parsed_name = construct()
# self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW")
# self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE)
# self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name)
# logger.debug(f"Coming out of the preliminary parsing, the plate name is {self.parsed_name}")
# try:
# plate_number = re.search(r"(?:(-|_)\d)(?!\d)", self.parsed_name).group().strip("_").strip("-")
# logger.debug(f"Plate number is: {plate_number}")
# except AttributeError as e:
# plate_number = "1"
# # self.parsed_name = re.sub(r"(\d{8})(-|_\d)?(R\d)?", fr"\1-{plate_number}\3", self.parsed_name)
# self.parsed_name = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", self.parsed_name)
# logger.debug(f"After addition of plate number the plate name is: {self.parsed_name}")
# try:
# repeat = re.search(r"-\dR(?P<repeat>\d)?", self.parsed_name).groupdict()['repeat']
# if repeat == None:
# repeat = "1"
# except AttributeError as e:
# repeat = ""
# self.parsed_name = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", self.parsed_name).replace(" ", "")
def enforce_bacterial_culture(self):
"""
Uses regex to enforce proper formatting of bacterial culture samples
"""
def construct(ctx) -> str:
"""
DEPRECATED due to slowness. Searches for the largest RSL number and increments it by 1.
# def enforce_bacterial_culture(self):
# """
# Uses regex to enforce proper formatting of bacterial culture samples
# """
# def construct(ctx) -> str:
# """
# DEPRECATED due to slowness. Searches for the largest RSL number and increments it by 1.
Returns:
str: new RSL number
"""
logger.debug(f"Attempting to construct RSL number from scratch...")
# directory = Path(self.ctx['directory_path']).joinpath("Bacteria")
directory = Path(ctx.directory_path).joinpath("Bacteria")
year = str(datetime.now().year)[-2:]
if directory.exists():
logger.debug(f"Year: {year}")
relevant_rsls = []
all_xlsx = [item.stem for item in directory.rglob("*.xlsx") if bool(re.search(r"RSL-\d{2}-\d{4}", item.stem)) and year in item.stem[4:6]]
logger.debug(f"All rsls: {all_xlsx}")
for item in all_xlsx:
try:
relevant_rsls.append(re.match(r"RSL-\d{2}-\d{4}", item).group(0))
except Exception as e:
logger.error(f"Regex error: {e}")
continue
logger.debug(f"Initial xlsx: {relevant_rsls}")
max_number = max([int(item[-4:]) for item in relevant_rsls])
logger.debug(f"The largest sample number is: {max_number}")
return f"RSL-{year}-{str(max_number+1).zfill(4)}"
else:
# raise FileNotFoundError(f"Unable to locate the directory: {directory.__str__()}")
return f"RSL-{year}-0000"
try:
self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE)
except AttributeError as e:
self.parsed_name = construct(ctx=self.ctx)
# year = datetime.now().year
# self.parsed_name = f"RSL-{str(year)[-2:]}-0000"
self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE)
# Returns:
# str: new RSL number
# """
# logger.debug(f"Attempting to construct RSL number from scratch...")
# # directory = Path(self.ctx['directory_path']).joinpath("Bacteria")
# directory = Path(ctx.directory_path).joinpath("Bacteria")
# year = str(datetime.now().year)[-2:]
# if directory.exists():
# logger.debug(f"Year: {year}")
# relevant_rsls = []
# all_xlsx = [item.stem for item in directory.rglob("*.xlsx") if bool(re.search(r"RSL-\d{2}-\d{4}", item.stem)) and year in item.stem[4:6]]
# logger.debug(f"All rsls: {all_xlsx}")
# for item in all_xlsx:
# try:
# relevant_rsls.append(re.match(r"RSL-\d{2}-\d{4}", item).group(0))
# except Exception as e:
# logger.error(f"Regex error: {e}")
# continue
# logger.debug(f"Initial xlsx: {relevant_rsls}")
# max_number = max([int(item[-4:]) for item in relevant_rsls])
# logger.debug(f"The largest sample number is: {max_number}")
# return f"RSL-{year}-{str(max_number+1).zfill(4)}"
# else:
# # raise FileNotFoundError(f"Unable to locate the directory: {directory.__str__()}")
# return f"RSL-{year}-0000"
# try:
# self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE)
# except AttributeError as e:
# self.parsed_name = construct(ctx=self.ctx)
# # year = datetime.now().year
# # self.parsed_name = f"RSL-{str(year)[-2:]}-0000"
# self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE)
def enforce_wastewater_artic(self):
"""
Uses regex to enforce proper formatting of wastewater ARTIC samples
"""
def construct():
today = datetime.now()
return f"RSL-AR-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}"
try:
self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"RSL-AR-\1\2\3", self.parsed_name, flags=re.IGNORECASE)
except AttributeError:
self.parsed_name = construct()
try:
plate_number = int(re.search(r"_|-\d?_", self.parsed_name).group().strip("_").strip("-"))
except (AttributeError, ValueError) as e:
plate_number = 1
self.parsed_name = re.sub(r"(_|-\d)?_ARTIC", f"-{plate_number}", self.parsed_name)
# def enforce_wastewater_artic(self):
# """
# Uses regex to enforce proper formatting of wastewater ARTIC samples
# """
# def construct():
# today = datetime.now()
# return f"RSL-AR-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}"
# try:
# self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"RSL-AR-\1\2\3", self.parsed_name, flags=re.IGNORECASE)
# except AttributeError:
# self.parsed_name = construct()
# try:
# plate_number = int(re.search(r"_|-\d?_", self.parsed_name).group().strip("_").strip("-"))
# except (AttributeError, ValueError) as e:
# plate_number = 1
# self.parsed_name = re.sub(r"(_|-\d)?_ARTIC", f"-{plate_number}", self.parsed_name)
class GroupWriteRotatingFileHandler(handlers.RotatingFileHandler):
@@ -586,25 +585,25 @@ def jinja_template_loading():
env.globals['STATIC_PREFIX'] = loader_path.joinpath("static", "css")
return env
def check_is_power_user(ctx:Settings) -> bool:
"""
Check to ensure current user is in power users list.
NOTE: Deprecated in favour of 'check_authorization' below.
# def check_is_power_user(ctx:Settings) -> bool:
# """
# Check to ensure current user is in power users list.
# NOTE: Deprecated in favour of 'check_authorization' below.
Args:
ctx (Settings): settings passed down from gui.
# Args:
# ctx (Settings): settings passed down from gui.
Returns:
bool: True if user is in power users, else false.
"""
try:
check = getpass.getuser() in ctx.power_users
except KeyError as e:
check = False
except Exception as e:
logger.debug(f"Check encountered unknown error: {type(e).__name__} - {e}")
check = False
return check
# Returns:
# bool: True if user is in power users, else false.
# """
# try:
# check = getpass.getuser() in ctx.power_users
# except KeyError as e:
# check = False
# except Exception as e:
# logger.debug(f"Check encountered unknown error: {type(e).__name__} - {e}")
# check = False
# return check
def check_authorization(func):
def wrapper(*args, **kwargs):