Update to reagent parser to exclude 'not applicable'
This commit is contained in:
@@ -32,7 +32,6 @@ from pathlib import Path
|
||||
from jinja2.exceptions import TemplateNotFound
|
||||
from jinja2 import Template
|
||||
|
||||
|
||||
logger = logging.getLogger(f"submissions.{__name__}")
|
||||
|
||||
class BasicSubmission(BaseClass):
|
||||
@@ -593,7 +592,7 @@ class BasicSubmission(BaseClass):
|
||||
return input_excel
|
||||
|
||||
@classmethod
|
||||
def enforce_name(cls, instr:str, data:dict|None=None) -> str:
|
||||
def enforce_name(cls, instr:str, data:dict|None={}) -> str:
|
||||
"""
|
||||
Custom naming method for this class.
|
||||
|
||||
@@ -604,8 +603,40 @@ class BasicSubmission(BaseClass):
|
||||
Returns:
|
||||
str: Updated name.
|
||||
"""
|
||||
logger.info(f"Hello from {cls.__mapper_args__['polymorphic_identity']} Enforcer!")
|
||||
return instr
|
||||
# logger.info(f"Hello from {cls.__mapper_args__['polymorphic_identity']} Enforcer!")
|
||||
# return instr
|
||||
from backend.validators import RSLNamer
|
||||
defaults = cls.get_default_info()
|
||||
data['abbreviation'] = defaults['abbreviation']
|
||||
if 'submission_type' not in data.keys() or data['submission_type'] in [None, ""]:
|
||||
data['submission_type'] = defaults['submission_type']
|
||||
# outstr = super().enforce_name(instr=instr, data=data)
|
||||
if instr in [None, ""]:
|
||||
outstr = RSLNamer.construct_new_plate_name(data=data)
|
||||
else:
|
||||
outstr = instr
|
||||
if re.search(rf"{data['abbreviation']}", outstr, flags=re.IGNORECASE) is None:
|
||||
outstr = re.sub(rf"RSL-?", rf"RSL-{data['abbreviation']}-", outstr, flags=re.IGNORECASE)
|
||||
try:
|
||||
outstr = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", outstr)
|
||||
outstr = re.sub(rf"{data['abbreviation']}(\d{6})", rf"{data['abbreviation']}-\1", outstr, flags=re.IGNORECASE).upper()
|
||||
except (AttributeError, TypeError) as e:
|
||||
outstr = RSLNamer.construct_new_plate_name(data=data)
|
||||
try:
|
||||
plate_number = re.search(r"(?:(-|_)\d)(?!\d)", outstr).group().strip("_").strip("-")
|
||||
# logger.debug(f"Plate number is: {plate_number}")
|
||||
except AttributeError as e:
|
||||
plate_number = "1"
|
||||
outstr = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", outstr)
|
||||
# logger.debug(f"After addition of plate number the plate name is: {outstr}")
|
||||
try:
|
||||
repeat = re.search(r"-\dR(?P<repeat>\d)?", outstr).groupdict()['repeat']
|
||||
if repeat == None:
|
||||
repeat = "1"
|
||||
except AttributeError as e:
|
||||
repeat = ""
|
||||
return re.sub(r"(-\dR)\d?", rf"\1 {repeat}", outstr).replace(" ", "")
|
||||
# return outstr
|
||||
|
||||
@classmethod
|
||||
def parse_pcr(cls, xl:pd.DataFrame, rsl_number:str) -> list:
|
||||
@@ -1052,42 +1083,6 @@ class BacterialCulture(BasicSubmission):
|
||||
input_excel["Sample List"].cell(row=15, column=2, value=getuser())
|
||||
return input_excel
|
||||
|
||||
@classmethod
|
||||
def enforce_name(cls, instr:str, data:dict|None={}) -> str:
|
||||
"""
|
||||
Extends parent
|
||||
"""
|
||||
from backend.validators import RSLNamer
|
||||
defaults = cls.get_default_info()
|
||||
data['abbreviation'] = defaults['abbreviation']
|
||||
if 'submission_type' not in data.keys() or data['submission_type'] in [None, ""]:
|
||||
data['submission_type'] = defaults['submission_type']
|
||||
outstr = super().enforce_name(instr=instr, data=data)
|
||||
if outstr in [None, ""]:
|
||||
outstr = RSLNamer.construct_new_plate_name(data=data)
|
||||
if re.search(rf"{data['abbreviation']}", outstr, flags=re.IGNORECASE) is None:
|
||||
outstr = re.sub(rf"RSL-?", rf"RSL-{data['abbreviation']}-", outstr, flags=re.IGNORECASE)
|
||||
try:
|
||||
outstr = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", outstr)
|
||||
outstr = re.sub(rf"{data['abbreviation']}(\d{6})", rf"{data['abbreviation']}-\1", outstr, flags=re.IGNORECASE).upper()
|
||||
except (AttributeError, TypeError) as e:
|
||||
outstr = RSLNamer.construct_new_plate_name(data=data)
|
||||
try:
|
||||
plate_number = re.search(r"(?:(-|_)\d)(?!\d)", outstr).group().strip("_").strip("-")
|
||||
# logger.debug(f"Plate number is: {plate_number}")
|
||||
except AttributeError as e:
|
||||
plate_number = "1"
|
||||
outstr = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", outstr)
|
||||
# logger.debug(f"After addition of plate number the plate name is: {outstr}")
|
||||
try:
|
||||
repeat = re.search(r"-\dR(?P<repeat>\d)?", outstr).groupdict()['repeat']
|
||||
if repeat == None:
|
||||
repeat = "1"
|
||||
except AttributeError as e:
|
||||
repeat = ""
|
||||
return re.sub(r"(-\dR)\d?", rf"\1 {repeat}", outstr).replace(" ", "")
|
||||
# return outstr
|
||||
|
||||
@classmethod
|
||||
def get_regex(cls) -> str:
|
||||
"""
|
||||
@@ -1227,37 +1222,17 @@ class Wastewater(BasicSubmission):
|
||||
return samples
|
||||
|
||||
@classmethod
|
||||
def enforce_name(cls, instr:str, data:dict|None=None) -> str:
|
||||
def enforce_name(cls, instr:str, data:dict|None={}) -> str:
|
||||
"""
|
||||
Extends parent
|
||||
"""
|
||||
from backend.validators import RSLNamer
|
||||
defaults = cls.get_default_info()
|
||||
data['abbreviation'] = defaults['abbreviation']
|
||||
outstr = super().enforce_name(instr=instr, data=data)
|
||||
try:
|
||||
outstr = re.sub(r"PCR(-|_)", "", outstr)
|
||||
# Deal with PCR file.
|
||||
instr = re.sub(r"PCR(-|_)", "", instr)
|
||||
except (AttributeError, TypeError) as e:
|
||||
logger.error(f"Problem using regex: {e}")
|
||||
outstr = RSLNamer.construct_new_plate_name(data=data)
|
||||
outstr = outstr.replace("RSLWW", "RSL-WW")
|
||||
outstr = re.sub(r"WW(\d{4})", r"WW-\1", outstr, flags=re.IGNORECASE)
|
||||
outstr = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", outstr)
|
||||
# logger.debug(f"Coming out of the preliminary parsing, the plate name is {outstr}")
|
||||
try:
|
||||
plate_number = re.search(r"(?:(-|_)\d)(?!\d)", outstr).group().strip("_").strip("-")
|
||||
# logger.debug(f"Plate number is: {plate_number}")
|
||||
except AttributeError as e:
|
||||
plate_number = "1"
|
||||
outstr = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", outstr)
|
||||
# logger.debug(f"After addition of plate number the plate name is: {outstr}")
|
||||
try:
|
||||
repeat = re.search(r"-\dR(?P<repeat>\d)?", outstr).groupdict()['repeat']
|
||||
if repeat == None:
|
||||
repeat = "1"
|
||||
except AttributeError as e:
|
||||
repeat = ""
|
||||
return re.sub(r"(-\dR)\d?", rf"\1 {repeat}", outstr).replace(" ", "")
|
||||
outstr = super().enforce_name(instr=instr, data=data)
|
||||
return outstr
|
||||
|
||||
@classmethod
|
||||
def get_regex(cls) -> str:
|
||||
@@ -1275,7 +1250,8 @@ class Wastewater(BasicSubmission):
|
||||
Extends parent
|
||||
"""
|
||||
samples = super().adjust_autofill_samples(samples)
|
||||
return [item for item in samples if not item.submitter_id.startswith("EN")]
|
||||
samples = [item for item in samples if not item.submitter_id.startswith("EN")]
|
||||
return samples
|
||||
|
||||
@classmethod
|
||||
def custom_sample_autofill_row(cls, sample, worksheet: Worksheet) -> int:
|
||||
@@ -1325,8 +1301,8 @@ class WastewaterArtic(BasicSubmission):
|
||||
return output
|
||||
|
||||
@classmethod
|
||||
def get_abbreviation(cls) -> str:
|
||||
return "AR"
|
||||
def get_default_info(cls) -> str:
|
||||
return dict(abbreviation="AR", submission_type="Wastewater Artic")
|
||||
|
||||
@classmethod
|
||||
def parse_info(cls, input_dict:dict, xl:pd.ExcelFile|None=None) -> dict:
|
||||
@@ -1350,6 +1326,28 @@ class WastewaterArtic(BasicSubmission):
|
||||
input_dict['source_plates'] = plates
|
||||
return input_dict
|
||||
|
||||
@classmethod
|
||||
def enforce_name(cls, instr:str, data:dict|None={}) -> str:
|
||||
"""
|
||||
Extends parent
|
||||
"""
|
||||
try:
|
||||
# Deal with PCR file.
|
||||
instr = re.sub(r"Artic", "", instr, flags=re.IGNORECASE)
|
||||
except (AttributeError, TypeError) as e:
|
||||
logger.error(f"Problem using regex: {e}")
|
||||
try:
|
||||
check = instr.startswith("RSL")
|
||||
except AttributeError:
|
||||
check = False
|
||||
if not check:
|
||||
try:
|
||||
instr = "RSL" + instr
|
||||
except:
|
||||
instr = "RSL"
|
||||
outstr = super().enforce_name(instr=instr, data=data)
|
||||
return outstr
|
||||
|
||||
@classmethod
|
||||
def parse_samples(cls, input_dict: dict) -> dict:
|
||||
"""
|
||||
@@ -1420,27 +1418,6 @@ class WastewaterArtic(BasicSubmission):
|
||||
year = f"20{year}"
|
||||
return f"EN{year}{month}{day}-{en_num}"
|
||||
|
||||
@classmethod
|
||||
def enforce_name(cls, instr:str|None=None, data:dict|None=None) -> str:
|
||||
"""
|
||||
Extends parent
|
||||
"""
|
||||
from backend.validators import RSLNamer
|
||||
data['abbreviation'] = cls.get_abbreviation()
|
||||
outstr = super().enforce_name(instr=instr, data=data)
|
||||
try:
|
||||
outstr = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"RSL-AR-\1\2\3", outstr, flags=re.IGNORECASE)
|
||||
except (AttributeError, TypeError):
|
||||
if instr != None:
|
||||
data['rsl_plate_num'] = instr
|
||||
# logger.debug(f"Create new plate name from submission parameters")
|
||||
outstr = RSLNamer.construct_new_plate_name(data=data)
|
||||
try:
|
||||
plate_number = int(re.search(r"_|-\d?_", outstr).group().strip("_").strip("-"))
|
||||
except (AttributeError, ValueError) as e:
|
||||
plate_number = 1
|
||||
return re.sub(r"(_|-\d)?_ARTIC", f"-{plate_number}", outstr)
|
||||
|
||||
@classmethod
|
||||
def get_regex(cls) -> str:
|
||||
"""
|
||||
@@ -1676,10 +1653,11 @@ class WastewaterArtic(BasicSubmission):
|
||||
self.gel_info = output
|
||||
dt = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
|
||||
com = dict(text=comment, name=getuser(), time=dt)
|
||||
if self.comment is not None:
|
||||
self.comment.append(com)
|
||||
else:
|
||||
self.comment = [com]
|
||||
if com['text'] != None and com['text'] != "":
|
||||
if self.comment is not None:
|
||||
self.comment.append(com)
|
||||
else:
|
||||
self.comment = [com]
|
||||
logger.debug(pformat(self.gel_info))
|
||||
with ZipFile(self.__directory_path__.joinpath("submission_imgs.zip"), 'a') as zipf:
|
||||
# Add a file located at the source_path to the destination within the zip
|
||||
|
||||
@@ -266,7 +266,8 @@ class ReagentParser(object):
|
||||
# logger.debug(f"Got lot for {item}-{name}: {lot} as {type(lot)}")
|
||||
lot = str(lot)
|
||||
logger.debug(f"Going into pydantic: name: {name}, lot: {lot}, expiry: {expiry}, type: {item.strip()}, comment: {comment}")
|
||||
listo.append(PydReagent(type=item.strip(), lot=lot, expiry=expiry, name=name, comment=comment, missing=missing))
|
||||
if name.lower() != "not applicable":
|
||||
listo.append(PydReagent(type=item.strip(), lot=lot, expiry=expiry, name=name, comment=comment, missing=missing))
|
||||
return listo
|
||||
|
||||
class SampleParser(object):
|
||||
|
||||
@@ -138,19 +138,19 @@ class PydReagent(BaseModel):
|
||||
# NOTE: this will now be done only in the reporting phase to account for potential changes in end-of-life extensions
|
||||
return reagent, report
|
||||
|
||||
def toForm(self, parent:QWidget, extraction_kit:str) -> QComboBox:
|
||||
"""
|
||||
Converts this instance into a form widget
|
||||
# def toForm(self, parent:QWidget, extraction_kit:str) -> QComboBox:
|
||||
# """
|
||||
# Converts this instance into a form widget
|
||||
|
||||
Args:
|
||||
parent (QWidget): Parent widget of the constructed object
|
||||
extraction_kit (str): Name of extraction kit used
|
||||
# Args:
|
||||
# parent (QWidget): Parent widget of the constructed object
|
||||
# extraction_kit (str): Name of extraction kit used
|
||||
|
||||
Returns:
|
||||
QComboBox: Form object.
|
||||
"""
|
||||
from frontend.widgets.submission_widget import ReagentFormWidget
|
||||
return ReagentFormWidget(parent=parent, reagent=self, extraction_kit=extraction_kit)
|
||||
# Returns:
|
||||
# QComboBox: Form object.
|
||||
# """
|
||||
# from frontend.widgets.submission_widget import ReagentFormWidget
|
||||
# return ReagentFormWidget(parent=parent, reagent=self, extraction_kit=extraction_kit)
|
||||
|
||||
class PydSample(BaseModel, extra='allow'):
|
||||
|
||||
@@ -705,16 +705,19 @@ class PydSubmission(BaseModel, extra='allow'):
|
||||
# logger.debug("Sorting samples by row/column")
|
||||
samples = sorted(self.samples, key=attrgetter('column', 'row'))
|
||||
submission_obj = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
|
||||
# custom function to adjust values for writing.
|
||||
samples = submission_obj.adjust_autofill_samples(samples=samples)
|
||||
logger.debug(f"Samples: {pformat(samples)}")
|
||||
# Fail safe against multiple instances of the same sample
|
||||
for iii, sample in enumerate(samples, start=1):
|
||||
logger.debug(f"Sample: {sample}")
|
||||
# custom function to find the row of this sample
|
||||
row = submission_obj.custom_sample_autofill_row(sample, worksheet=worksheet)
|
||||
logger.debug(f"Writing to {row}")
|
||||
if row == None:
|
||||
row = sample_info['lookup_table']['start_row'] + iii
|
||||
fields = [field for field in list(sample.model_fields.keys()) + list(sample.model_extra.keys()) if field in sample_info['sample_columns'].keys()]
|
||||
logger.debug(f"Here are the fields we are going to fill:\n\t{fields}")
|
||||
for field in fields:
|
||||
column = sample_info['sample_columns'][field]
|
||||
value = getattr(sample, field)
|
||||
|
||||
@@ -280,7 +280,10 @@ class SubmissionFormWidget(QWidget):
|
||||
if key not in self.ignore:
|
||||
match value:
|
||||
case PydReagent():
|
||||
widget = self.ReagentFormWidget(self, reagent=value, extraction_kit=extraction_kit)
|
||||
if value.name.lower() != "not applicable":
|
||||
widget = self.ReagentFormWidget(self, reagent=value, extraction_kit=extraction_kit)
|
||||
else:
|
||||
widget = None
|
||||
case _:
|
||||
widget = self.InfoItem(self, key=key, value=value, submission_type=submission_type)
|
||||
return widget
|
||||
|
||||
Reference in New Issue
Block a user