Added custom validation to PydSubmission to replace 'finalize_parse'
This commit is contained in:
@@ -817,7 +817,7 @@ class BasicSubmission(BaseClass):
|
||||
return input_dict
|
||||
|
||||
@classmethod
|
||||
def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None) -> dict:
|
||||
def custom_validation(cls, pyd:"PydSubmission") -> dict:
|
||||
"""
|
||||
Performs any final custom parsing of the excel file.
|
||||
|
||||
@@ -831,7 +831,7 @@ class BasicSubmission(BaseClass):
|
||||
dict: Updated parser product.
|
||||
"""
|
||||
logger.info(f"Called {cls.__mapper_args__['polymorphic_identity']} finalizer")
|
||||
return input_dict
|
||||
return pyd
|
||||
|
||||
@classmethod
|
||||
def custom_info_writer(cls, input_excel: Workbook, info: dict | None = None, backup: bool = False,
|
||||
@@ -1390,7 +1390,7 @@ class BacterialCulture(BasicSubmission):
|
||||
return template
|
||||
|
||||
@classmethod
|
||||
def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None) -> dict:
|
||||
def custom_validation(cls, pyd) -> dict:
|
||||
"""
|
||||
Extends parent. Currently finds control sample and adds to reagents.
|
||||
|
||||
@@ -1403,24 +1403,24 @@ class BacterialCulture(BasicSubmission):
|
||||
dict: Updated dictionary.
|
||||
"""
|
||||
from . import ControlType
|
||||
input_dict = super().finalize_parse(input_dict, xl, info_map)
|
||||
pyd = super().custom_validation(pyd)
|
||||
# NOTE: build regex for all control types that have targets
|
||||
regex = ControlType.build_positive_regex()
|
||||
# NOTE: search samples for match
|
||||
for sample in input_dict['samples']:
|
||||
matched = regex.match(sample['submitter_id'])
|
||||
for sample in pyd.samples:
|
||||
matched = regex.match(sample.submitter_id)
|
||||
if bool(matched):
|
||||
# logger.debug(f"Control match found: {sample['submitter_id']}")
|
||||
new_lot = matched.group()
|
||||
try:
|
||||
pos_control_reg = \
|
||||
next(reg for reg in input_dict['reagents'] if reg['role'] == "Bacterial-Positive Control")
|
||||
next(reg for reg in pyd.reagents if reg.role == "Bacterial-Positive Control")
|
||||
except StopIteration:
|
||||
logger.error(f"No positive control reagent listed")
|
||||
return input_dict
|
||||
pos_control_reg['lot'] = new_lot
|
||||
pos_control_reg['missing'] = False
|
||||
return input_dict
|
||||
return pyd
|
||||
pos_control_reg.lot = new_lot
|
||||
pos_control_reg.missing = False
|
||||
return pyd
|
||||
|
||||
# @classmethod
|
||||
# def custom_sample_autofill_row(cls, sample, worksheet: Worksheet) -> int:
|
||||
@@ -1812,6 +1812,21 @@ class WastewaterArtic(BasicSubmission):
|
||||
continue
|
||||
else:
|
||||
datum['plate'] = RSLNamer(filename=datum['plate'], sub_type="Wastewater").parsed_name
|
||||
if xl is not None:
|
||||
try:
|
||||
input_dict['csv'] = xl["hitpicks_csv_to_export"]
|
||||
except KeyError as e:
|
||||
logger.error(e)
|
||||
try:
|
||||
match input_dict['rsl_plate_num']:
|
||||
case dict():
|
||||
input_dict['csv'] = xl[input_dict['rsl_plate_num']['value']]
|
||||
case str():
|
||||
input_dict['csv'] = xl[input_dict['rsl_plate_num']]
|
||||
case _:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling couldn't get csv due to: {e}")
|
||||
input_dict['source_plates'] = data
|
||||
egel_info_section = custom_fields['egel_info']
|
||||
ws = xl[egel_info_section['sheet']]
|
||||
@@ -1992,7 +2007,7 @@ class WastewaterArtic(BasicSubmission):
|
||||
return final_en_name
|
||||
|
||||
@classmethod
|
||||
def finalize_parse(cls, input_dict: dict, xl: pd.ExcelFile | None = None, info_map: dict | None = None) -> dict:
|
||||
def custom_validation(cls, pyd) -> dict:
|
||||
"""
|
||||
Performs any final custom parsing of the excel file. Extends parent
|
||||
|
||||
@@ -2005,13 +2020,13 @@ class WastewaterArtic(BasicSubmission):
|
||||
Returns:
|
||||
dict: Updated parser product.
|
||||
"""
|
||||
input_dict = super().finalize_parse(input_dict, xl, info_map)
|
||||
input_dict = super().custom_validation(pyd)
|
||||
# logger.debug(f"Incoming input_dict: {pformat(input_dict)}")
|
||||
for sample in input_dict['samples']:
|
||||
for sample in pyd.samples:
|
||||
# logger.debug(f"Sample: {sample}")
|
||||
if re.search(r"^NTC", sample['submitter_id']):
|
||||
sample['submitter_id'] = f"{sample['submitter_id']}-WWG-{input_dict['rsl_plate_num']['value']}"
|
||||
input_dict['csv'] = xl["hitpicks_csv_to_export"]
|
||||
if re.search(r"^NTC", sample.submitter_id):
|
||||
sample.submitter_id = f"{sample.submitter_id}-WWG-{pyd.rsl_plate_num}"
|
||||
# input_dict['csv'] = xl["hitpicks_csv_to_export"]
|
||||
return input_dict
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -53,7 +53,7 @@ class SheetParser(object):
|
||||
self.parse_samples()
|
||||
self.parse_equipment()
|
||||
self.parse_tips()
|
||||
self.finalize_parse()
|
||||
# self.finalize_parse()
|
||||
# logger.debug(f"Parser.sub after info scrape: {pformat(self.sub)}")
|
||||
|
||||
def parse_info(self):
|
||||
@@ -98,7 +98,7 @@ class SheetParser(object):
|
||||
# logger.debug(f"Parsing reagents for {extraction_kit}")
|
||||
parser = ReagentParser(xl=self.xl, submission_type=self.submission_type,
|
||||
extraction_kit=extraction_kit)
|
||||
self.sub['reagents'] = [reagent for reagent in parser.parse_reagents()]
|
||||
self.sub['reagents'] = parser.parse_reagents()
|
||||
|
||||
def parse_samples(self):
|
||||
"""
|
||||
@@ -112,14 +112,14 @@ class SheetParser(object):
|
||||
Calls equipment parser to pull info from the excel sheet
|
||||
"""
|
||||
parser = EquipmentParser(xl=self.xl, submission_type=self.submission_type)
|
||||
self.sub['equipment'] = [equipment for equipment in parser.parse_equipment()]
|
||||
self.sub['equipment'] = parser.parse_equipment()
|
||||
|
||||
def parse_tips(self):
|
||||
"""
|
||||
Calls tips parser to pull info from the excel sheet
|
||||
"""
|
||||
parser = TipParser(xl=self.xl, submission_type=self.submission_type)
|
||||
self.sub['tips'] = [tip for tip in parser.parse_tips()]
|
||||
self.sub['tips'] = parser.parse_tips()
|
||||
|
||||
def import_kit_validation_check(self):
|
||||
"""
|
||||
@@ -137,12 +137,6 @@ class SheetParser(object):
|
||||
if isinstance(self.sub['extraction_kit'], str):
|
||||
self.sub['extraction_kit'] = dict(value=self.sub['extraction_kit'], missing=True)
|
||||
|
||||
def finalize_parse(self):
|
||||
"""
|
||||
Run custom final validations of data for submission subclasses.
|
||||
"""
|
||||
self.sub = self.sub_object.finalize_parse(input_dict=self.sub, xl=self.xl, info_map=self.info_map)
|
||||
|
||||
def to_pydantic(self) -> PydSubmission:
|
||||
"""
|
||||
Generates a pydantic model of scraped data for validation
|
||||
@@ -172,7 +166,7 @@ class SheetParser(object):
|
||||
pyd_dict['tips'] = [PydTips(**tips) for tips in self.sub['tips']]
|
||||
else:
|
||||
pyd_dict['tips'] = None
|
||||
psm = PydSubmission(filepath=self.filepath, **pyd_dict)
|
||||
psm = PydSubmission(filepath=self.filepath, run_custom=True, **pyd_dict)
|
||||
return psm
|
||||
|
||||
|
||||
@@ -524,6 +518,8 @@ class SampleParser(object):
|
||||
merge_on_id = self.sample_info_map['lookup_table']['merge_on_id']
|
||||
plate_map_samples = sorted(copy(self.plate_map_samples), key=lambda d: d['id'])
|
||||
lookup_samples = sorted(copy(self.lookup_samples), key=lambda d: d[merge_on_id])
|
||||
print(pformat(plate_map_samples))
|
||||
print(pformat(lookup_samples))
|
||||
for ii, psample in enumerate(plate_map_samples):
|
||||
try:
|
||||
check = psample['id'] == lookup_samples[ii][merge_on_id]
|
||||
|
||||
@@ -637,12 +637,16 @@ class PydSubmission(BaseModel, extra='allow'):
|
||||
else:
|
||||
return value
|
||||
|
||||
def __init__(self, **data):
|
||||
def __init__(self, run_custom:bool=False, **data):
|
||||
super().__init__(**data)
|
||||
# this could also be done with default_factory
|
||||
# NOTE: this could also be done with default_factory
|
||||
logger.debug(data)
|
||||
self.submission_object = BasicSubmission.find_polymorphic_subclass(
|
||||
polymorphic_identity=self.submission_type['value'])
|
||||
self.namer = RSLNamer(self.rsl_plate_num['value'], sub_type=self.submission_type['value'])
|
||||
if run_custom:
|
||||
self.submission_object.custom_validation(pyd=self)
|
||||
|
||||
|
||||
def set_attribute(self, key: str, value):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user