Better flexibility with parsers pulling methods from database objects.
This commit is contained in:
92
src/submissions/backend/validators/__init__.py
Normal file
92
src/submissions/backend/validators/__init__.py
Normal file
@@ -0,0 +1,92 @@
|
||||
import logging, re
|
||||
from pathlib import Path
|
||||
from openpyxl import load_workbook
|
||||
from backend.db.models import BasicSubmission
|
||||
from tools import Settings
|
||||
|
||||
|
||||
# Module-level logger; its name is this module's name prefixed with "submissions.".
logger = logging.getLogger(f"submissions.{__name__}")
|
||||
|
||||
class RSLNamer(object):
|
||||
"""
|
||||
Object that will enforce proper formatting on RSL plate names.
|
||||
NOTE: Depreciated in favour of object based methods in 'submissions.py'
|
||||
"""
|
||||
def __init__(self, ctx, instr:str, sub_type:str|None=None):
|
||||
self.ctx = ctx
|
||||
self.submission_type = sub_type
|
||||
|
||||
if self.submission_type == None:
|
||||
self.submission_type = self.retrieve_submission_type(ctx=self.ctx, instr=instr)
|
||||
print(self.submission_type)
|
||||
if self.submission_type != None:
|
||||
enforcer = BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
|
||||
self.parsed_name = self.retrieve_rsl_number(instr=instr, regex=enforcer.get_regex())
|
||||
self.parsed_name = enforcer.enforce_name(ctx=ctx, instr=self.parsed_name)
|
||||
|
||||
@classmethod
|
||||
def retrieve_submission_type(cls, ctx:Settings, instr:str|Path) -> str:
|
||||
match instr:
|
||||
case Path():
|
||||
logger.debug(f"Using path method.")
|
||||
if instr.exists():
|
||||
wb = load_workbook(instr)
|
||||
try:
|
||||
submission_type = [item.strip().title() for item in wb.properties.category.split(";")][0]
|
||||
except AttributeError:
|
||||
try:
|
||||
for type in ctx.submission_types:
|
||||
# This gets the *first* submission type that matches the sheet names in the workbook
|
||||
if wb.sheetnames == ctx.submission_types[type]['excel_map']:
|
||||
submission_type = type.title()
|
||||
except:
|
||||
submission_type = cls.retrieve_submission_type(ctx=ctx, instr=instr.stem.__str__())
|
||||
case str():
|
||||
regex = BasicSubmission.construct_regex()
|
||||
logger.debug(f"Using string method.")
|
||||
m = regex.search(instr)
|
||||
try:
|
||||
submission_type = m.lastgroup
|
||||
except AttributeError as e:
|
||||
logger.critical("No RSL plate number found or submission type found!")
|
||||
case _:
|
||||
submission_type = None
|
||||
if submission_type == None:
|
||||
from frontend.custom_widgets import SubmissionTypeSelector
|
||||
dlg = SubmissionTypeSelector(ctx, title="Couldn't parse submission type.", message="Please select submission type from list below.")
|
||||
if dlg.exec():
|
||||
submission_type = dlg.parse_form()
|
||||
submission_type = submission_type.replace("_", " ")
|
||||
return submission_type
|
||||
|
||||
@classmethod
|
||||
def retrieve_rsl_number(cls, instr:str|Path, regex:str|None=None):
|
||||
"""
|
||||
Uses regex to retrieve the plate number and submission type from an input string
|
||||
|
||||
Args:
|
||||
in_str (str): string to be parsed
|
||||
"""
|
||||
if regex == None:
|
||||
regex = BasicSubmission.construct_regex()
|
||||
else:
|
||||
regex = re.compile(rf'{regex}', re.IGNORECASE | re.VERBOSE)
|
||||
match instr:
|
||||
case Path():
|
||||
m = regex.search(instr.stem)
|
||||
case str():
|
||||
logger.debug(f"Using string method.")
|
||||
m = regex.search(instr)
|
||||
case _:
|
||||
pass
|
||||
if m != None:
|
||||
try:
|
||||
parsed_name = m.group().upper().strip(".")
|
||||
except:
|
||||
parsed_name = None
|
||||
else:
|
||||
parsed_name = None
|
||||
logger.debug(f"Got parsed submission name: {parsed_name}")
|
||||
return parsed_name
|
||||
|
||||
from .pydant import *
|
||||
213
src/submissions/backend/validators/pydant.py
Normal file
213
src/submissions/backend/validators/pydant.py
Normal file
@@ -0,0 +1,213 @@
|
||||
'''
|
||||
Contains pydantic models and accompanying validators
|
||||
'''
|
||||
import uuid
|
||||
from pydantic import BaseModel, field_validator, Field
|
||||
from datetime import date, datetime
|
||||
from dateutil.parser import parse
|
||||
from dateutil.parser._parser import ParserError
|
||||
from typing import List, Any
|
||||
from . import RSLNamer
|
||||
from pathlib import Path
|
||||
import re
|
||||
import logging
|
||||
from tools import check_not_nan, convert_nans_to_nones, Settings
|
||||
from backend.db.functions import lookup_submissions
|
||||
|
||||
# Module-level logger; its name is this module's name prefixed with "submissions.".
logger = logging.getLogger(f"submissions.{__name__}")
|
||||
|
||||
class PydSheetReagent(BaseModel):
|
||||
type: str|None
|
||||
lot: str|None
|
||||
exp: date|None
|
||||
name: str|None
|
||||
|
||||
@field_validator("type", mode='before')
|
||||
@classmethod
|
||||
def remove_undesired_types(cls, value):
|
||||
match value:
|
||||
case "atcc":
|
||||
return None
|
||||
case _:
|
||||
return value
|
||||
|
||||
@field_validator("lot", mode='before')
|
||||
@classmethod
|
||||
def rescue_lot_string(cls, value):
|
||||
if value != None:
|
||||
return convert_nans_to_nones(str(value))
|
||||
return value
|
||||
|
||||
@field_validator("lot")
|
||||
@classmethod
|
||||
def enforce_lot_string(cls, value):
|
||||
if value != None:
|
||||
return value.upper()
|
||||
return value
|
||||
|
||||
@field_validator("exp", mode="before")
|
||||
@classmethod
|
||||
def enforce_date(cls, value):
|
||||
if value != None:
|
||||
match value:
|
||||
case int():
|
||||
return datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value - 2).date()
|
||||
case str():
|
||||
return parse(value)
|
||||
case date():
|
||||
return value
|
||||
case _:
|
||||
return convert_nans_to_nones(str(value))
|
||||
if value == None:
|
||||
value = date.today()
|
||||
return value
|
||||
|
||||
@field_validator("name", mode="before")
|
||||
@classmethod
|
||||
def enforce_name(cls, value, values):
|
||||
if value != None:
|
||||
return convert_nans_to_nones(str(value))
|
||||
else:
|
||||
return values.data['type']
|
||||
|
||||
class PydSheetSubmission(BaseModel, extra='allow'):
    """
    Pydantic model for a submission.

    Most fields are dicts of the form `dict(value=..., parsed=bool)`; the validators
    below fill in or repair `value` and set the `parsed` flag. Several validators read
    earlier fields through `values.data` (ctx, filepath, submission_type), so the
    declaration order of the fields matters.
    """
    ctx: Settings
    filepath: Path
    submission_type: dict|None
    # For defaults
    submitter_plate_num: dict|None = Field(default=dict(value=None, parsed=False), validate_default=True)
    rsl_plate_num: dict|None = Field(default=dict(value=None, parsed=False), validate_default=True)
    submitted_date: dict|None
    submitting_lab: dict|None
    sample_count: dict|None
    extraction_kit: dict|None
    technician: dict|None
    submission_category: dict|None = Field(default=dict(value=None, parsed=False), validate_default=True)
    reagents: List[dict] = []
    samples: List[Any]

    @field_validator("submitter_plate_num")
    @classmethod
    def enforce_with_uuid(cls, value):
        """Substitutes a random upper-case hex UUID when no submitter plate id was given."""
        logger.debug(f"submitter plate id: {value}")
        # An explicit None for the whole field is treated like a missing value.
        if value is None or value['value'] is None or value['value'] == "None":
            return dict(value=uuid.uuid4().hex.upper(), parsed=False)
        return value

    @field_validator("submitted_date", mode="before")
    @classmethod
    def rescue_date(cls, value):
        """Substitutes today's date when no submitted date was given."""
        if value is None:
            return dict(value=date.today(), parsed=False)
        return value

    @field_validator("submitted_date")
    @classmethod
    def strip_datetime_string(cls, value):
        """
        Normalises the submitted date.

        date/datetime values are returned untouched, ints are converted as a day
        count offset from 1900-01-01 (minus 2), anything else is stringified,
        stripped of a trailing "_<digit>" / "-<digit>" suffix and parsed with dateutil.

        Raises:
            ValueError: if the value cannot be parsed as a date, even after removing
                all "-" characters.
        """
        raw = value['value']
        # datetime is a subclass of date, so this covers both.
        if isinstance(raw, date):
            return value
        if isinstance(raw, int):
            return dict(value=datetime.fromordinal(datetime(1900, 1, 1).toordinal() + raw - 2).date(), parsed=False)
        # str() so that unexpected types fail as a parse error below rather than
        # as a TypeError inside re.sub.
        string = re.sub(r"(_|-)\d$", "", str(raw))
        try:
            return dict(value=parse(string).date(), parsed=False)
        except ParserError as e:
            logger.error(f"Problem parsing date: {e}")
        try:
            return dict(value=parse(string.replace("-","")).date(), parsed=False)
        except Exception as e:
            logger.error(f"Problem with parse fallback: {e}")
            # Previously this path fell through to an unbound local; raise a
            # ValueError so pydantic reports it as a validation error.
            raise ValueError(f"Could not parse submitted date from {raw!r}.") from e

    @field_validator("submitting_lab", mode="before")
    @classmethod
    def rescue_submitting_lab(cls, value):
        """Substitutes an empty, unparsed entry when no submitting lab was given."""
        if value is None:
            return dict(value=None, parsed=False)
        return value

    @field_validator("rsl_plate_num", mode='before')
    @classmethod
    def rescue_rsl_number(cls, value):
        """Substitutes an empty, unparsed entry when no RSL plate number was given."""
        if value is None:
            return dict(value=None, parsed=False)
        return value

    @field_validator("rsl_plate_num")
    @classmethod
    def rsl_from_file(cls, value, values):
        """
        Keeps a supplied plate number that is not yet in the database, otherwise
        derives one from the file path with RSLNamer.

        Args:
            value (dict): `dict(value=..., parsed=...)` for the plate number.
            values: pydantic validation info; ctx, filepath and submission_type are read from `values.data`.

        Returns:
            dict: the supplied number with parsed=True, or the RSLNamer result with parsed=False.
        """
        logger.debug(f"RSL-plate initial value: {value['value']}")
        sub_type = values.data['submission_type']['value']
        if check_not_nan(value['value']):
            if lookup_submissions(ctx=values.data['ctx'], rsl_number=value['value']) is None:
                return dict(value=value['value'], parsed=True)
            logger.warning(f"Submission number {value} already exists in DB, attempting salvage with filepath")
        # Reached both for a missing number and for one already in the database.
        output = RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__(), sub_type=sub_type).parsed_name
        return dict(value=output, parsed=False)

    @field_validator("technician", mode="before")
    @classmethod
    def rescue_tech(cls, value):
        """Substitutes an empty, unparsed entry when no technician was given."""
        if value is None:
            return dict(value=None, parsed=False)
        return value

    @field_validator("technician")
    @classmethod
    def enforce_tech(cls, value):
        """Removes every ": <digit>" sequence from the technician name, or cleans a NaN-like value."""
        if check_not_nan(value['value']):
            # Build a new dict instead of mutating the one handed in.
            return {**value, 'value': re.sub(r"\: \d", "", value['value'])}
        return dict(value=convert_nans_to_nones(value['value']), parsed=False)

    @field_validator("sample_count", mode='before')
    @classmethod
    def rescue_sample_count(cls, value):
        """Substitutes an empty, unparsed entry when no sample count was given."""
        if value is None:
            return dict(value=None, parsed=False)
        return value

    @field_validator("extraction_kit", mode='before')
    @classmethod
    def rescue_kit(cls, value):
        """
        Wraps a plain string kit name into `dict(value=..., parsed=True)`.

        Raises:
            ValueError: if `check_not_nan` rejects the value.
        """
        if not check_not_nan(value):
            raise ValueError(f"No extraction kit found.")
        if isinstance(value, str):
            return dict(value=value, parsed=True)
        if isinstance(value, dict):
            return value
        # Kept from the original: only reachable if check_not_nan accepts the value
        # and it is neither str nor dict.
        if value is None:
            return dict(value=None, parsed=False)
        return value

    @field_validator("submission_type", mode='before')
    @classmethod
    def make_submission_type(cls, value, values):
        """
        Title-cases a supplied submission type, or derives one from the file path with RSLNamer.

        Args:
            value: submission type as a dict with a 'value' key, or a bare value.
            values: pydantic validation info; ctx and filepath are read from `values.data`.

        Raises:
            ValueError: if no type was supplied and RSLNamer could not determine one.
        """
        if not isinstance(value, dict):
            value = {"value": value}
        if check_not_nan(value['value']):
            return dict(value=value['value'].title(), parsed=True)
        found = RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__()).submission_type
        if found is None:
            raise ValueError("Could not determine submission type.")
        return dict(value=found.title(), parsed=False)

    @field_validator("submission_category")
    @classmethod
    def rescue_category(cls, value, values):
        """Falls back to the submission type when the category is not Research, Diagnostic or Surveillance."""
        # An explicit None for the whole field is treated like the declared default.
        if value is None:
            value = dict(value=None, parsed=False)
        if value['value'] not in ["Research", "Diagnostic", "Surveillance"]:
            # Build a new dict instead of mutating the one handed in.
            return {**value, 'value': values.data['submission_type']['value']}
        return value
|
||||
Reference in New Issue
Block a user