Updated parser functions to include identifiers.

This commit is contained in:
lwark
2025-06-11 13:18:01 -05:00
parent 592073c2a1
commit 90dc97683f
7 changed files with 254 additions and 25 deletions

View File

@@ -8,6 +8,7 @@ from dateutil.parser import parse
from pandas import DataFrame
from pydantic import BaseModel
from sqlalchemy import Column, INTEGER, String, JSON
from sqlalchemy.ext.associationproxy import AssociationProxy
from sqlalchemy.orm import DeclarativeMeta, declarative_base, Query, Session, InstrumentedAttribute, ColumnProperty
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.exc import ArgumentError
@@ -23,7 +24,7 @@ if 'pytest' in sys.modules:
# NOTE: For inheriting in LogMixin
Base: DeclarativeMeta = declarative_base()
logger = logging.getLogger(f"procedure.{__name__}")
logger = logging.getLogger(f"submissions.{__name__}")
class BaseClass(Base):
@@ -235,7 +236,7 @@ class BaseClass(Base):
def query_or_create(cls, **kwargs) -> Tuple[Any, bool]:
new = False
allowed = [k for k, v in cls.__dict__.items() if isinstance(v, InstrumentedAttribute)]
# and not isinstance(v.property, _RelationshipDeclared)]
# and not isinstance(v.property, _RelationshipDeclared)]
sanitized_kwargs = {k: v for k, v in kwargs.items() if k in allowed}
logger.debug(f"Sanitized kwargs: {sanitized_kwargs}")
instance = cls.query(**sanitized_kwargs)
@@ -389,7 +390,7 @@ class BaseClass(Base):
try:
template = env.get_template(temp_name)
except TemplateNotFound as e:
# logger.error(f"Couldn't find template {e}")
# logger.error(f"Couldn't find template {e}")
template = env.get_template("details.html")
return template
@@ -553,9 +554,23 @@ class BaseClass(Base):
output_date = datetime.combine(output_date, addition_time).strftime("%Y-%m-%d %H:%M:%S")
return output_date
def details_dict(self):
dicto = {k:v for k,v in self.__dict__.items() if not k.startswith("_")}
def details_dict(self, **kwargs):
    """
    Collects this instance's mapped attributes and association proxies into a plain dict.

    Class attributes exposing a truthy `foreign_keys` are skipped. Datetime values
    are rendered as "%Y-%m-%d %H:%M:%S" strings.

    Args:
        **kwargs: Accepted but not used by this implementation.

    Returns:
        dict: Attribute name -> value read from this instance.
    """
    relevant = {k: v for k, v in self.__class__.__dict__.items()
                if isinstance(v, (InstrumentedAttribute, AssociationProxy))}
    output = {}
    for k, v in relevant.items():
        # NOTE: Attributes that have no `foreign_keys` at all are always kept.
        if getattr(v, "foreign_keys", False):
            continue
        value = getattr(self, k)
        if isinstance(value, datetime):
            # NOTE: strftime() needs an explicit format; calling it bare raises TypeError.
            value = value.strftime("%Y-%m-%d %H:%M:%S")
        output[k] = value
    return output
class LogMixin(Base):

View File

@@ -1046,6 +1046,52 @@ class SubmissionType(BaseClass):
dicto = dict()
return dicto
@classproperty
def regex(cls) -> re.Pattern:
    """
    Builds a single catch-all pattern out of every stored regex.

    Returns:
        re.Pattern: Alternation of the 'regex' entry of each queried item that has
        truthy defaults, compiled with IGNORECASE and VERBOSE.
    """
    patterns = []
    for submission_type in cls.query():
        # NOTE: Items without defaults contribute nothing to the alternation.
        if submission_type.defaults:
            patterns.append(submission_type.defaults['regex'])
    combined = "|".join(patterns)
    return re.compile(combined, flags=re.IGNORECASE | re.VERBOSE)
@classmethod
def get_regex(cls, submission_type: SubmissionType | str | None = None) -> re.Pattern | None:
    """
    Gets the compiled regex for identifying a certain class of procedure.

    Args:
        submission_type (SubmissionType | str | None, optional): procedure type of interest.
            Anything that is not a SubmissionType is passed to cls.query(name=...). Defaults to None.

    Returns:
        re.Pattern | None: Compiled pattern (IGNORECASE | VERBOSE), or None when no regex
        could be found or the stored regex does not compile.
    """
    logger.debug(f"Looking for {submission_type}")
    if not isinstance(submission_type, SubmissionType):
        submission_type = cls.query(name=submission_type)
    # NOTE: The lookup result is handled both as a single object and as a list.
    candidates = submission_type if isinstance(submission_type, list) else [submission_type]
    try:
        rstring = "|".join([item.defaults['regex'] for item in candidates])
    except (AttributeError, TypeError, KeyError) as e:
        # NOTE: Covers a failed lookup (None), missing defaults and a missing/non-string 'regex' entry.
        logger.error(f"Couldn't get regex for submission type {submission_type}: {e}")
        return None
    if not candidates:
        # NOTE: An empty result would otherwise compile to a pattern that matches everything.
        return None
    try:
        return re.compile(rf"{rstring}", flags=re.IGNORECASE | re.VERBOSE)
    except re.error as e:
        logger.error(f"Couldn't compile regex for submission type {submission_type}: {e}")
        return None
class ProcedureType(BaseClass):
id = Column(INTEGER, primary_key=True)
@@ -1226,6 +1272,7 @@ class ProcedureType(BaseClass):
def total_wells(self):
return self.plate_rows * self.plate_columns
class Procedure(BaseClass):
id = Column(INTEGER, primary_key=True)
name = Column(String, unique=True)

View File

@@ -8,6 +8,7 @@ from openpyxl import load_workbook
from pandas import DataFrame
from backend.validators import pydant
from backend.db.models import Procedure
from dataclasses import dataclass
logger = logging.getLogger(f"submissions.{__name__}")
@@ -16,16 +17,34 @@ class DefaultParser(object):
def __repr__(self):
return f"{self.__class__.__name__}<{self.filepath.stem}>"
def __new__(cls, *args, **kwargs):
    """
    Creates the instance after checking that the target file exists.

    Args:
        *args: If `filepath` is not given by keyword, the first positional argument is used.
        **kwargs: `filepath` (Path | str) is read from here when present.

    Raises:
        TypeError: If no filepath was supplied at all.
        FileNotFoundError: If the file does not exist.
    """
    if "filepath" in kwargs:
        filepath = kwargs["filepath"]
    elif args:
        # NOTE: Callers in this codebase also pass filepath positionally.
        filepath = args[0]
    else:
        raise TypeError(f"{cls.__name__} requires a 'filepath' argument.")
    if isinstance(filepath, str):
        filepath = Path(filepath)
    # NOTE: Explicit check instead of assert, which is stripped when running with -O.
    if not filepath.exists():
        raise FileNotFoundError(f"File {filepath} does not exist.")
    instance = super().__new__(cls)
    instance.filepath = filepath
    return instance
def __init__(self, filepath: Path | str, procedure: Procedure|None=None, range_dict: dict | None = None, *args, **kwargs):
"""
Args:
filepath (Path|str): Must be given as a kwarg. eg. filepath=X
procedure ():
range_dict ():
*args ():
**kwargs ():
"""
self.procedure = procedure
try:
self._pyd_object = getattr(pydant, f"Pyd{self.__class__.__name__.replace('Parser', '')}")
except AttributeError:
self._pyd_object = pydant.PydResults
if isinstance(filepath, str):
self.filepath = Path(filepath)
else:
self.filepath = filepath
self.workbook = load_workbook(self.filepath, data_only=True)
if not range_dict:
self.range_dict = self.__class__.default_range_dict

View File

@@ -2,15 +2,69 @@
"""
import logging
from pathlib import Path
from string import ascii_lowercase
from typing import Generator
from openpyxl.reader.excel import load_workbook
from tools import row_keys
from backend.db.models import SubmissionType
from . import DefaultKEYVALUEParser, DefaultTABLEParser
logger = logging.getLogger(f"submissions.{__name__}")
class ClientSubmissionParser(DefaultKEYVALUEParser):
class SubmissionTyperMixin(object):
    """
    Mixin with three fallbacks for working out which SubmissionType a workbook belongs to.
    """

    @classmethod
    def retrieve_submissiontype(cls, filepath: Path):
        """
        Tries workbook properties, then a preparse of the file, then the filename regex.

        Args:
            filepath (Path): Workbook to inspect.

        Returns:
            The first truthy lookup result, otherwise whatever the regex lookup returned.
        """
        # NOTE: Attempt 1, get from form properties:
        sub_type = cls.get_subtype_from_properties(filepath=filepath)
        if sub_type:
            return sub_type
        # NOTE: Attempt 2, get by opening file and using default parser
        logger.warning(
            f"Getting submissiontype from file properties failed, falling back on preparse.\nDepending on excel structure this might yield an incorrect submissiontype")
        sub_type = cls.get_subtype_from_preparse(filepath=filepath)
        if sub_type:
            return sub_type
        # NOTE: Attempt 3, get from the filename
        logger.warning(
            f"Getting submissiontype from preparse failed, falling back on filename regex.\nDepending on excel structure this might yield an incorrect submissiontype")
        return cls.get_subtype_from_regex(filepath=filepath)

    @classmethod
    def get_subtype_from_regex(cls, filepath: Path):
        """
        Searches the file path with SubmissionType.regex.

        Args:
            filepath (Path): Workbook path to search.

        Returns:
            str | None: Name of the last matched group, or None when nothing matched.
        """
        regex = SubmissionType.regex
        m = regex.search(str(filepath))
        try:
            sub_type = m.lastgroup
        except AttributeError as e:
            # NOTE: m is None when the pattern did not match the path.
            sub_type = None
            logger.critical(f"No procedure type found or procedure type found!: {e}")
        return sub_type

    @classmethod
    def get_subtype_from_preparse(cls, filepath: Path):
        """
        Parses the workbook with ClientSubmissionParser and looks up its "submissiontype" value.

        Args:
            filepath (Path): Workbook to parse.

        Returns:
            Result of SubmissionType.query for the parsed name, or None if that result is a list.
        """
        # NOTE: filepath goes by keyword because the parser's __new__ reads it from kwargs.
        # NOTE(review): ClientSubmissionParser.__init__ itself calls retrieve_submissiontype,
        # so this fallback may re-enter itself when the properties lookup fails - confirm.
        parser = ClientSubmissionParser(filepath=filepath)
        sub_type = next((value for k, value in parser.parsed_info if k == "submissiontype"), None)
        sub_type = SubmissionType.query(name=sub_type)
        if isinstance(sub_type, list):
            sub_type = None
        return sub_type

    @classmethod
    def get_subtype_from_properties(cls, filepath: Path):
        """
        Looks up the first category stored in the workbook's document properties.

        Args:
            filepath (Path): Workbook to open.

        Returns:
            Result of SubmissionType.query for the title-cased first category, or None if
            no category is set or the query result is a list.
        """
        wb = load_workbook(filepath)
        category = wb.properties.category
        if not category:
            # NOTE: A workbook with no category set would otherwise fail on .split().
            return None
        # NOTE: Gets first category in the metadata.
        sub_type = category.split(";")[0].strip().title()
        sub_type = SubmissionType.query(name=sub_type)
        if isinstance(sub_type, list):
            sub_type = None
        return sub_type
class ClientSubmissionParser(DefaultKEYVALUEParser, SubmissionTyperMixin):
"""
Object for retrieving submitter info from "sample list" sheet
"""
@@ -23,11 +77,16 @@ class ClientSubmissionParser(DefaultKEYVALUEParser):
sheet="Sample List"
)]
def __init__(self, filepath: Path | str, *args, **kwargs):
self.submissiontype = self.retrieve_submissiontype(filepath=filepath)
if "range_dict" not in kwargs:
kwargs['range_dict'] = self.submissiontype.info_map
super().__init__(filepath=filepath, **kwargs)
class SampleParser(DefaultTABLEParser):
class ClientSampleParser(DefaultTABLEParser, SubmissionTyperMixin):
"""
Object for retrieving submitter info from "sample list" sheet
Object for retrieving submitter samples from "sample list" sheet
"""
default_range_dict = [dict(
@@ -36,6 +95,12 @@ class SampleParser(DefaultTABLEParser):
sheet="Sample List"
)]
def __init__(self, filepath: Path | str, *args, **kwargs):
self.submissiontype = self.retrieve_submissiontype(filepath=filepath)
if "range_dict" not in kwargs:
kwargs['range_dict'] = self.submissiontype.sample_map
super().__init__(filepath=filepath, **kwargs)
@property
def parsed_info(self) -> Generator[dict, None, None]:
output = super().parsed_info

View File

@@ -13,6 +13,73 @@ from datetime import datetime
logger = logging.getLogger(f"submissions.{__name__}")
class DefaultNamer(object):
def __init__(self, filepath: str | Path, **kwargs):
if isinstance(filepath, str):
filepath = Path(filepath)
try:
assert filepath.exists()
except AssertionError:
raise FileNotFoundError(f"File {filepath} does not exist.")
self.filepath = filepath
class ClientSubmissionNamer(DefaultNamer):
    """
    Namer for client submissions that works out the SubmissionType of a workbook.
    """

    def __init__(self, filepath: str | Path, submissiontype: str|SubmissionType|None=None,
                 data: dict | None = None, **kwargs):
        """
        Args:
            filepath (str | Path): Workbook being named.
            submissiontype (str | SubmissionType | None, optional): Known type or its name.
                When falsy it is retrieved from the file. Defaults to None.
            data (dict | None, optional): Accepted but not used here. Defaults to None.
            **kwargs: Accepted but not used here.
        """
        super().__init__(filepath=filepath)
        if not submissiontype:
            submissiontype = self.retrieve_submissiontype(filepath=self.filepath)
        if isinstance(submissiontype, str):
            submissiontype = SubmissionType.query(name=submissiontype)
        # NOTE: Keep the resolved type; previously it was computed and then discarded.
        self.submissiontype = submissiontype

    def retrieve_submissiontype(self, filepath: str | Path):
        """
        Tries workbook properties, then a preparse of the file, then the filename regex.

        Args:
            filepath (str | Path): Not read; every lookup uses self.filepath.

        Returns:
            The first truthy lookup result, otherwise whatever the regex lookup returned.
        """
        # NOTE: Attempt 1, get from form properties:
        sub_type = self.get_subtype_from_properties()
        if sub_type:
            return sub_type
        # NOTE: Attempt 2, get by opening file and using default parser
        logger.warning(f"Getting submissiontype from file properties failed, falling back on preparse.\nDepending on excel structure this might yield an incorrect submissiontype")
        sub_type = self.get_subtype_from_preparse()
        if sub_type:
            return sub_type
        # NOTE: Attempt 3, get from the filename
        logger.warning(f"Getting submissiontype from preparse failed, falling back on filename regex.\nDepending on excel structure this might yield an incorrect submissiontype")
        return self.get_subtype_from_regex()

    def get_subtype_from_regex(self):
        """
        Searches self.filepath with SubmissionType.regex.

        Returns:
            str | None: Name of the last matched group, or None when nothing matched.
        """
        regex = SubmissionType.regex
        m = regex.search(str(self.filepath))
        try:
            sub_type = m.lastgroup
        except AttributeError as e:
            # NOTE: m is None when the pattern did not match the path.
            sub_type = None
            logger.critical(f"No procedure type found or procedure type found!: {e}")
        return sub_type

    def get_subtype_from_preparse(self):
        """
        Parses the workbook with ClientSubmissionParser and looks up its "submissiontype" value.

        Returns:
            Result of SubmissionType.query for the parsed name, or None if that result is a list.
        """
        # NOTE: Imported inside the method, as in the original - presumably to avoid a circular import.
        from backend.excel.parsers.submission_parser import ClientSubmissionParser
        # NOTE: filepath goes by keyword because the parser's __new__ reads it from kwargs.
        parser = ClientSubmissionParser(filepath=self.filepath)
        sub_type = next((value for k, value in parser.parsed_info if k == "submissiontype"), None)
        sub_type = SubmissionType.query(name=sub_type)
        if isinstance(sub_type, list):
            sub_type = None
        return sub_type

    def get_subtype_from_properties(self):
        """
        Looks up the first category stored in the workbook's document properties.

        Returns:
            Result of SubmissionType.query for the title-cased first category, or None if
            no category is set or the query result is a list.
        """
        wb = load_workbook(self.filepath)
        category = wb.properties.category
        if not category:
            # NOTE: A workbook with no category set would otherwise fail on .split().
            return None
        # NOTE: Gets first category in the metadata.
        sub_type = category.split(";")[0].strip().title()
        sub_type = SubmissionType.query(name=sub_type)
        if isinstance(sub_type, list):
            sub_type = None
        return sub_type
class RSLNamer(object):
"""
@@ -25,16 +92,17 @@ class RSLNamer(object):
self.submission_type = submission_type
if not self.submission_type:
self.submission_type = self.retrieve_submission_type(filename=filename)
logger.info(f"got procedure type: {self.submission_type}")
logger.info(f"got submission type: {self.submission_type}")
if self.submission_type:
self.sub_object = BasicRun.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
# self.sub_object = BasicRun.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
self.sub_object = SubmissionType.query(name=submission_type, limit=1)
self.parsed_name = self.retrieve_rsl_number(filename=filename, regex=self.sub_object.get_regex(
submission_type=submission_type))
if not data:
data = dict(submission_type=self.submission_type)
if "proceduretype" not in data.keys():
data['proceduretype'] = self.submission_type
self.parsed_name = self.sub_object.enforce_name(instr=self.parsed_name, data=data)
submission_type=self.submission_type))
# if not data:
# data = dict(submission_type=self.submission_type)
# if "proceduretype" not in data.keys():
# data['proceduretype'] = self.submission_type
# self.parsed_name = self.sub_object.enforce_name(instr=self.parsed_name, data=data)
logger.info(f"Parsed name: {self.parsed_name}")
@classmethod
@@ -83,7 +151,7 @@ class RSLNamer(object):
def st_from_str(file_name: str) -> str:
if file_name.startswith("tmp"):
return "Bacterial Culture"
regex = BasicRun.regex
regex = SubmissionType.regex
m = regex.search(file_name)
try:
sub_type = m.lastgroup

View File

@@ -10,7 +10,7 @@ from .functions import select_open_file, select_save_file
import logging
from pathlib import Path
from tools import Report, Result, check_not_nan, main_form_style, report_result, get_application_from_parent
from backend.excel import ClientSubmissionParser, SampleParser
from backend.excel import ClientSubmissionParser, ClientSampleParser
from backend.validators import PydSubmission, PydReagent, PydClientSubmission, PydSample
from backend.db import (
ClientLab, SubmissionType, Reagent,
@@ -129,12 +129,12 @@ class SubmissionFormContainer(QWidget):
self.clientsubmissionparser = ClientSubmissionParser(filepath=fname)
try:
# self.prsr = SheetParser(filepath=fname)
self.sampleparser = SampleParser(filepath=fname)
self.sampleparser = ClientSampleParser(filepath=fname)
except PermissionError:
logger.error(f"Couldn't get permission to access file: {fname}")
return
except AttributeError:
self.sampleparser = SampleParser(filepath=fname)
self.sampleparser = ClientSampleParser(filepath=fname)
self.pydclientsubmission = self.clientsubmissionparser.to_pydantic()
self.pydsamples = self.sampleparser.to_pydantic()
# logger.debug(f"Samples: {pformat(self.pydclientsubmission.sample)}")

View File

@@ -0,0 +1,15 @@
{% extends "details.html" %}
<head>
{% block head %}
{{ super() }}
<title>Procedure Details for {{ procedure['name'] }}</title>
{% endblock %}
</head>
<body>
{% block body %}
{% endblock %}
</body>