Writers and managers

This commit is contained in:
lwark
2025-07-10 14:06:30 -05:00
parent d5961f42a5
commit 2386c7a8ff
16 changed files with 404 additions and 297 deletions

View File

@@ -5,7 +5,7 @@ from tools import Settings
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
logger = logging.getLogger(f"procedure.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
def import_irida(ctx: Settings): def import_irida(ctx: Settings):
""" """

View File

@@ -22,7 +22,7 @@ if TYPE_CHECKING:
from backend.db.models.submissions import Run, ProcedureSampleAssociation from backend.db.models.submissions import Run, ProcedureSampleAssociation
from backend.validators.pydant import PydSample, PydResults from backend.validators.pydant import PydSample, PydResults
logger = logging.getLogger(f'procedure.{__name__}') logger = logging.getLogger(f'submissions.{__name__}')
reagentrole_reagent = Table( reagentrole_reagent = Table(
"_reagentrole_reagent", "_reagentrole_reagent",
@@ -1097,9 +1097,9 @@ class SubmissionType(BaseClass):
str: String from which regex will be compiled. str: String from which regex will be compiled.
""" """
# logger.debug(f"Class for regex: {cls}") # logger.debug(f"Class for regex: {cls}")
logger.debug(f"Looking for {submission_type}") # logger.debug(f"Looking for {submission_type}")
if not isinstance(submission_type, SubmissionType): if not isinstance(submission_type, SubmissionType):
submission_type = cls.query(name=submission_type) submission_type = cls.query(name=submission_type['name'])
if isinstance(submission_type, list): if isinstance(submission_type, list):
if len(submission_type) > 1: if len(submission_type) > 1:
regex = "|".join([item.defaults['regex'] for item in submission_type]) regex = "|".join([item.defaults['regex'] for item in submission_type])
@@ -1176,7 +1176,43 @@ class ProcedureType(BaseClass):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.allowed_result_methods = dict() self.allowed_result_methods = dict()
def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]: @property
def template_file_sheets(self) -> List[str]:
"""
Gets names of sheet in the stored blank form.
Returns:
List[str]: List of sheet names
"""
try:
return ExcelFile(BytesIO(self.template_file), engine="openpyxl").sheet_names
except zipfile.BadZipfile:
return []
def set_template_file(self, filepath: Path | str):
"""
Sets the binary store to an Excel file.
Args:
filepath (Path | str): Path to the template file.
Raises:
ValueError: Raised if file is not Excel file.
"""
if isinstance(filepath, str):
filepath = Path(filepath)
try:
ExcelFile(filepath)
except ValueError:
raise ValueError(f"File {filepath} is not of appropriate type.")
with open(filepath, "rb") as f:
data = f.read()
self.template_file = data
self.save()
def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]:
""" """
Make a map of all locations for tips or equipment. Make a map of all locations for tips or equipment.

View File

@@ -22,6 +22,7 @@ from sqlalchemy.ext.hybrid import hybrid_property
from frontend.widgets.functions import select_save_file from frontend.widgets.functions import select_save_file
from . import Base, BaseClass, Reagent, SubmissionType, KitType, ClientLab, Contact, LogMixin, Procedure, \ from . import Base, BaseClass, Reagent, SubmissionType, KitType, ClientLab, Contact, LogMixin, Procedure, \
kittype_procedure kittype_procedure
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func, Table, Sequence from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func, Table, Sequence
from sqlalchemy.orm import relationship, validates, Query from sqlalchemy.orm import relationship, validates, Query
from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.orm.attributes import flag_modified
@@ -43,7 +44,7 @@ from PIL import Image
if TYPE_CHECKING: if TYPE_CHECKING:
from backend.db.models.kits import ProcedureType, Procedure from backend.db.models.kits import ProcedureType, Procedure
logger = logging.getLogger(f"procedure.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
class ClientSubmission(BaseClass, LogMixin): class ClientSubmission(BaseClass, LogMixin):
@@ -630,6 +631,10 @@ class Run(BaseClass, LogMixin):
output["completed_date"] = self.completed_date output["completed_date"] = self.completed_date
return output return output
@property
def sample_count(self):
return len(self.sample)
def details_dict(self, **kwargs): def details_dict(self, **kwargs):
output = super().details_dict() output = super().details_dict()
output['plate_number'] = self.plate_number output['plate_number'] = self.plate_number
@@ -651,7 +656,7 @@ class Run(BaseClass, LogMixin):
output['permission'] = is_power_user() output['permission'] = is_power_user()
output['excluded'] = ['procedure', "runsampleassociation", 'excluded', 'expanded', 'sample', 'id', 'custom', output['excluded'] = ['procedure', "runsampleassociation", 'excluded', 'expanded', 'sample', 'id', 'custom',
'permission'] 'permission']
output['sample_count'] = self.sample_count
return output return output
@classmethod @classmethod
@@ -923,37 +928,24 @@ class Run(BaseClass, LogMixin):
Returns: Returns:
PydSubmission: converted object. PydSubmission: converted object.
""" """
from backend.validators import PydSubmission from backend.validators import PydRun
dicto = self.to_dict(full_data=True, backup=backup) dicto = self.details_dict(full_data=True, backup=backup)
new_dict = {} new_dict = {}
for key, value in dicto.items(): for key, value in dicto.items():
missing = value in ['', 'None', None] missing = value in ['', 'None', None]
match key: match key:
case "reagents":
field_value = [item.to_pydantic(kittype=self.extraction_kit) for item in
self.submission_reagent_associations]
case "sample": case "sample":
field_value = [item.to_pydantic() for item in self.submission_sample_associations] field_value = [item.to_pydantic() for item in self.runsampleassociation]
case "equipment":
field_value = [item.to_pydantic() for item in self.submission_equipment_associations]
case "control":
try:
field_value = [item.to_pydantic() for item in self.__getattribute__(key)]
except TypeError as e:
logger.error(f"Error converting {key} to pydantic :{e}")
continue
case "tips":
field_value = [item.to_pydantic() for item in self.submission_tips_associations]
case "proceduretype":
field_value = dict(value=self.__getattribute__(key).name, missing=missing)
case "plate_number": case "plate_number":
key = 'name' key = 'rsl_plate_number'
field_value = dict(value=self.rsl_plate_number, missing=missing) field_value = dict(value=self.rsl_plate_number, missing=missing)
case "submitter_plate_number": new_dict['name'] = field_value
key = "submitter_plate_id"
field_value = dict(value=self.submitter_plate_number, missing=missing)
case "id": case "id":
continue continue
case "clientsubmission":
field_value = self.clientsubmission.to_pydantic()
case "procedure":
field_value = [item.to_pydantic() for item in self.procedure]
case _: case _:
try: try:
key = key.lower().replace(" ", "_") key = key.lower().replace(" ", "_")
@@ -967,7 +959,7 @@ class Run(BaseClass, LogMixin):
new_dict[key] = field_value new_dict[key] = field_value
new_dict['filepath'] = Path(tempfile.TemporaryFile().name) new_dict['filepath'] = Path(tempfile.TemporaryFile().name)
dicto.update(new_dict) dicto.update(new_dict)
return PydSubmission(**dicto) return PydRun(**dicto)
def save(self, original: bool = True): def save(self, original: bool = True):
""" """
@@ -1289,14 +1281,12 @@ class Run(BaseClass, LogMixin):
self.save(original=False) self.save(original=False)
def export(self, obj, output_filepath: str | Path | None = None): def export(self, obj, output_filepath: str | Path | None = None):
from backend.excel import writers from backend import managers
clientsubmission_pyd = self.clientsubmission.to_pydantic()
if not output_filepath: if not output_filepath:
output_filepath = select_save_file(obj=obj, default_name=self.construct_filename(), extension="xlsx") output_filepath = select_save_file(obj=obj, default_name=self.construct_filename(), extension="xlsx")
Writer = getattr(writers, "ClientSubmissionWriter") Manager = getattr(managers, f"Default{self.__class__.__name__}Manager")
writer = Writer(output_filepath=output_filepath, pydant_obj=clientsubmission_pyd, manager = Manager(parent=obj, input_object=self.to_pydantic())
range_dict=self.clientsubmission.range_dict) workbook = manager.write()
workbook: openpyxl.Workbook = writer.write_info()
workbook.save(filename=output_filepath) workbook.save(filename=output_filepath)
def construct_filename(self): def construct_filename(self):
@@ -1970,13 +1960,13 @@ class RunSampleAssociation(BaseClass):
# NOTE: Get associated sample info # NOTE: Get associated sample info
sample = self.sample.to_sub_dict() sample = self.sample.to_sub_dict()
sample['name'] = self.sample.sample_id sample['name'] = self.sample.sample_id
sample['row'] = self.row # sample['row'] = self.row
sample['column'] = self.column # sample['column'] = self.column
try: # try:
sample['well'] = f"{row_map[self.row]}{self.column}" # sample['well'] = f"{row_map[self.row]}{self.column}"
except KeyError as e: # except KeyError as e:
logger.error(f"Unable to find row {self.row} in row_map.") # logger.error(f"Unable to find row {self.row} in row_map.")
sample['Well'] = None # sample['Well'] = None
sample['plate_name'] = self.run.rsl_plate_number sample['plate_name'] = self.run.rsl_plate_number
sample['positive'] = False sample['positive'] = False
return sample return sample
@@ -1989,7 +1979,7 @@ class RunSampleAssociation(BaseClass):
PydSample: Pydantic Model PydSample: Pydantic Model
""" """
from backend.validators import PydSample from backend.validators import PydSample
return PydSample(**self.to_sub_dict()) return PydSample(**self.details_dict())
@property @property
def hitpicked(self) -> dict | None: def hitpicked(self) -> dict | None:

View File

@@ -9,7 +9,7 @@ from typing import List
from openpyxl import load_workbook, Workbook from openpyxl import load_workbook, Workbook
from pathlib import Path from pathlib import Path
from backend.db.models import * from backend.db.models import *
from backend.validators import PydSubmission, RSLNamer from backend.validators import PydRun, RSLNamer
from collections import OrderedDict from collections import OrderedDict
from tools import check_not_nan, is_missing, check_key_or_attr from tools import check_not_nan, is_missing, check_key_or_attr
@@ -45,7 +45,7 @@ class SheetParser(object):
self.sub['proceduretype'] = dict(value=RSLNamer.retrieve_submission_type(filename=self.filepath), self.sub['proceduretype'] = dict(value=RSLNamer.retrieve_submission_type(filename=self.filepath),
missing=True) missing=True)
self.submission_type = SubmissionType.query(name=self.sub['proceduretype']) self.submission_type = SubmissionType.query(name=self.sub['proceduretype'])
self.sub_object = BasicRun.find_polymorphic_subclass(polymorphic_identity=self.submission_type) self.sub_object = Run.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
# NOTE: grab the info map from the procedure type in database # NOTE: grab the info map from the procedure type in database
self.parse_info() self.parse_info()
self.import_kit_validation_check() self.import_kit_validation_check()
@@ -130,14 +130,14 @@ class SheetParser(object):
if isinstance(self.sub['kittype'], str): if isinstance(self.sub['kittype'], str):
self.sub['kittype'] = dict(value=self.sub['kittype'], missing=True) self.sub['kittype'] = dict(value=self.sub['kittype'], missing=True)
def to_pydantic(self) -> PydSubmission: def to_pydantic(self) -> PydRun:
""" """
Generates a pydantic model of scraped data for validation Generates a pydantic model of scraped data for validation
Returns: Returns:
PydSubmission: output pydantic model PydSubmission: output pydantic model
""" """
return PydSubmission(filepath=self.filepath, run_custom=True, **self.sub) return PydRun(filepath=self.filepath, run_custom=True, **self.sub)
class InfoParser(object): class InfoParser(object):

View File

@@ -9,7 +9,7 @@ from pprint import pformat
from typing import List, Generator, Tuple from typing import List, Generator, Tuple
from openpyxl import load_workbook, Workbook from openpyxl import load_workbook, Workbook
from backend.db.models import SubmissionType, KitType, Run from backend.db.models import SubmissionType, KitType, Run
from backend.validators.pydant import PydSubmission from backend.validators.pydant import PydRun
from io import BytesIO from io import BytesIO
from collections import OrderedDict from collections import OrderedDict
@@ -21,7 +21,7 @@ class SheetWriter(object):
object to manage data placement into excel file object to manage data placement into excel file
""" """
def __init__(self, submission: PydSubmission): def __init__(self, submission: PydRun):
""" """
Args: Args:
submission (PydSubmission): Object containing procedure information. submission (PydSubmission): Object containing procedure information.
@@ -35,7 +35,7 @@ class SheetWriter(object):
case 'proceduretype': case 'proceduretype':
self.sub[k] = v['value'] self.sub[k] = v['value']
self.submission_type = SubmissionType.query(name=v['value']) self.submission_type = SubmissionType.query(name=v['value'])
self.run_object = BasicRun.find_polymorphic_subclass( self.run_object = Run.find_polymorphic_subclass(
polymorphic_identity=self.submission_type) polymorphic_identity=self.submission_type)
case _: case _:
if isinstance(v, dict): if isinstance(v, dict):

View File

@@ -13,8 +13,9 @@ logger = logging.getLogger(f"submissions.{__name__}")
class DefaultManager(object): class DefaultManager(object):
def __init__(self, parent, input_object: Path | str | None = None): def __init__(self, parent, input_object: Path | str | None = None):
logger.debug(f"FName before correction: {input_object}") logger.debug(f"FName before correction: {type(input_object)}")
# if input_object != "no_file": # if input_object != "no_file":
self.parent = parent
match input_object: match input_object:
case str(): case str():
self.input_object = Path(input_object) self.input_object = Path(input_object)
@@ -23,15 +24,19 @@ class DefaultManager(object):
self.input_object = input_object self.input_object = input_object
self.pyd = self.parse() self.pyd = self.parse()
case x if issubclass(input_object.__class__, PydBaseClass): case x if issubclass(input_object.__class__, PydBaseClass):
logger.debug("Subclass of PydBaseClass")
self.pyd = input_object self.pyd = input_object
case x if issubclass(input_object.__class__, BaseClass): case x if issubclass(input_object.__class__, BaseClass):
logger.debug("Subclass of BaseClass")
self.pyd = input_object.to_pydantic() self.pyd = input_object.to_pydantic()
case _: case _:
self.input_object = select_open_file(file_extension="xlsx", obj=get_application_from_parent(parent)) self.input_object = select_open_file(file_extension="xlsx", obj=get_application_from_parent(parent))
self.pyd = self.parse() self.pyd = self.parse()
logger.debug(f"FName after correction: {input_object}") # logger.debug(f"FName after correction: {input_object}")
from .clientsubmissions import DefaultClientSubmission
from .procedures import DefaultProcedure from .clientsubmissions import DefaultClientSubmissionManager
from.results import DefaultResults from .procedures import DefaultProcedureManager
from .results import DefaultResultsManager
from .runs import DefaultRunManager

View File

@@ -10,13 +10,13 @@ if TYPE_CHECKING:
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
class DefaultProcedure(DefaultManager): class DefaultProcedureManager(DefaultManager):
def __init__(self, proceduretype: "ProcedureType"|str, parent, fname: Path | str | None = None): def __init__(self, proceduretype: "ProcedureType"|str, parent, input_object: Path | str | None = None):
super().__init__(parent=parent, fname=fname)
if isinstance(proceduretype, str): if isinstance(proceduretype, str):
proceduretype = ProcedureType.query(name=proceduretype) proceduretype = ProcedureType.query(name=proceduretype)
self.proceduretype = proceduretype self.proceduretype = proceduretype
super().__init__(parent=parent, input_object=input_object)
def parse(self): def parse(self):

View File

@@ -7,7 +7,7 @@ from tools import get_application_from_parent
logger = logging.getLogger(f"submission.{__name__}") logger = logging.getLogger(f"submission.{__name__}")
class DefaultResults(DefaultManager): class DefaultResultsManager(DefaultManager):
def __init__(self, procedure: Procedure, parent, fname: Path | str | None = None): def __init__(self, procedure: Procedure, parent, fname: Path | str | None = None):
logger.debug(f"FName before correction: {fname}") logger.debug(f"FName before correction: {fname}")
@@ -18,4 +18,4 @@ class DefaultResults(DefaultManager):
self.fname = Path(fname) self.fname = Path(fname)
logger.debug(f"FName after correction: {fname}") logger.debug(f"FName after correction: {fname}")
from .pcr_results_manager import PCR from .pcr_results_manager import PCRManager

View File

@@ -5,11 +5,11 @@ import logging
from pathlib import Path from pathlib import Path
from backend.db.models import Procedure from backend.db.models import Procedure
from backend.excel.parsers.results_parsers.pcr_results_parser import PCRSampleParser, PCRInfoParser from backend.excel.parsers.results_parsers.pcr_results_parser import PCRSampleParser, PCRInfoParser
from . import DefaultResults from . import DefaultResultsManager
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
class PCR(DefaultResults): class PCRManager(DefaultResultsManager):
def __init__(self, procedure: Procedure, parent, fname:Path|str|None=None): def __init__(self, procedure: Procedure, parent, fname:Path|str|None=None):
super().__init__(procedure=procedure, parent=parent, fname=fname) super().__init__(procedure=procedure, parent=parent, fname=fname)

View File

@@ -92,10 +92,10 @@ class RSLNamer(object):
self.submission_type = submission_type self.submission_type = submission_type
if not self.submission_type: if not self.submission_type:
self.submission_type = self.retrieve_submission_type(filename=filename) self.submission_type = self.retrieve_submission_type(filename=filename)
logger.info(f"got submission type: {self.submission_type}") # logger.info(f"got submission type: {self.submission_type}")
if self.submission_type: if self.submission_type:
# self.sub_object = BasicRun.find_polymorphic_subclass(polymorphic_identity=self.submission_type) # self.sub_object = BasicRun.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
self.sub_object = SubmissionType.query(name=submission_type, limit=1) self.sub_object = SubmissionType.query(name=self.submission_type['name'], limit=1)
self.parsed_name = self.retrieve_rsl_number(filename=filename, regex=self.sub_object.get_regex( self.parsed_name = self.retrieve_rsl_number(filename=filename, regex=self.sub_object.get_regex(
submission_type=self.submission_type)) submission_type=self.submission_type))
# if not data: # if not data:
@@ -275,5 +275,5 @@ class RSLNamer(object):
return "" return ""
from .pydant import PydSubmission, PydKitType, PydContact, PydOrganization, PydSample, PydReagent, PydReagentRole, \ from .pydant import PydRun, PydKitType, PydContact, PydOrganization, PydSample, PydReagent, PydReagentRole, \
PydEquipment, PydEquipmentRole, PydTips, PydProcess, PydElastic, PydClientSubmission, PydProcedure, PydResults PydEquipment, PydEquipmentRole, PydTips, PydProcess, PydElastic, PydClientSubmission, PydProcedure, PydResults

View File

@@ -22,11 +22,12 @@ from sqlalchemy.orm.relationships import _RelationshipDeclared
from sqlalchemy.orm.attributes import InstrumentedAttribute from sqlalchemy.orm.attributes import InstrumentedAttribute
from PyQt6.QtWidgets import QWidget from PyQt6.QtWidgets import QWidget
logger = logging.getLogger(f"procedure.{__name__}") logger = logging.getLogger(f"submission.{__name__}")
class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
_sql_object: ClassVar = None _sql_object: ClassVar = None
# _misc_info: dict|None = None # _misc_info: dict|None = None
@model_validator(mode="before") @model_validator(mode="before")
@@ -113,7 +114,8 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
if hasattr(self, "misc_info") and "info_placement" in self.misc_info: if hasattr(self, "misc_info") and "info_placement" in self.misc_info:
for k, v in output.items(): for k, v in output.items():
try: try:
output[k]['location'] = [item['location'] for item in self.misc_info['info_placement'] if item['name'] == k] output[k]['location'] = [item['location'] for item in self.misc_info['info_placement'] if
item['name'] == k]
except (TypeError, KeyError): except (TypeError, KeyError):
continue continue
return output return output
@@ -128,9 +130,6 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
return sql return sql
class PydReagent(PydBaseClass): class PydReagent(PydBaseClass):
lot: str | None lot: str | None
reagentrole: str | None reagentrole: str | None
@@ -299,9 +298,19 @@ class PydSample(PydBaseClass):
@field_validator("row", mode="before") @field_validator("row", mode="before")
@classmethod @classmethod
def str_to_int(cls, value): def row_str_to_int(cls, value):
if isinstance(value, str): if isinstance(value, str):
try:
value = row_keys[value] value = row_keys[value]
except KeyError:
value = 0
return value
@field_validator("column", mode="before")
@classmethod
def column_str_to_int(cls, value):
if isinstance(value, str):
value = 0
return value return value
def improved_dict(self, dictionaries: bool = True) -> dict: def improved_dict(self, dictionaries: bool = True) -> dict:
@@ -316,7 +325,7 @@ class PydSample(PydBaseClass):
return sql return sql
class PydTips(BaseModel): class PydTips(PydBaseClass):
name: str name: str
lot: str | None = Field(default=None) lot: str | None = Field(default=None)
tiprole: str tiprole: str
@@ -354,7 +363,7 @@ class PydTips(BaseModel):
return assoc, report return assoc, report
class PydEquipment(BaseModel, extra='ignore'): class PydEquipment(PydBaseClass, extra='ignore'):
asset_number: str asset_number: str
name: str name: str
nickname: str | None nickname: str | None
@@ -473,66 +482,74 @@ class PydEquipment(BaseModel, extra='ignore'):
return {k: getattr(self, k) for k in fields} return {k: getattr(self, k) for k in fields}
class PydSubmission(BaseModel, extra='allow'): class PydRun(PydBaseClass, extra='allow'):
filepath: Path
submissiontype: dict | None clientsubmission: PydClientSubmission | None = Field(default=None)
submitter_plate_id: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
rsl_plate_number: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) rsl_plate_number: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
submitted_date: dict | None = Field(default=dict(value=date.today(), missing=True), validate_default=True) started_date: dict | None = Field(default=dict(value=date.today(), missing=True), validate_default=True)
clientlab: dict | None completed_date: dict | None = Field(default=dict(value=date.today(), missing=True), validate_default=True)
sample_count: dict | None sample_count: dict | None
kittype: dict | None
technician: dict | None
submission_category: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
comment: dict | None = Field(default=dict(value="", missing=True), validate_default=True) comment: dict | None = Field(default=dict(value="", missing=True), validate_default=True)
reagent: List[dict] | List[PydReagent] = [] sample: List[PydSample] | Generator = Field(default=[])
sample: List[PydSample] | Generator run_cost: float | dict = Field(default=dict(value=0.0, missing=True))
equipment: List[PydEquipment] | None = [] signed_by: str | dict = Field(default="", validate_default=True)
cost_centre: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) procedure: List[PydProcedure] | Generator = Field(default=[])
contact: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
tips: List[PydTips] | None = []
@field_validator("tips", mode="before") @field_validator("signed_by")
@classmethod @classmethod
def expand_tips(cls, value): def rescue_signed_by(cls, value):
if isinstance(value, dict): if isinstance(value, str):
value = value['value'] value = dict(value=value, missing=True)
if isinstance(value, Generator):
return [PydTips(**tips) for tips in value]
if not value:
return []
return value return value
@field_validator('equipment', mode='before') # @field_validator("tips", mode="before")
# @classmethod
# def expand_tips(cls, value):
# if isinstance(value, dict):
# value = value['value']
# if isinstance(value, Generator):
# return [PydTips(**tips) for tips in value]
# if not value:
# return []
# return value
#
# @field_validator('equipment', mode='before')
# @classmethod
# def convert_equipment_dict(cls, value):
# if isinstance(value, dict):
# return value['value']
# if isinstance(value, Generator):
# return [PydEquipment(**equipment) for equipment in value]
# if not value:
# return []
# return value
# @field_validator('comment', mode='before')
# @classmethod
# def create_comment(cls, value):
# if value is None:
# return ""
# return value
#
# @field_validator("submitter_plate_id")
# @classmethod
# def enforce_with_uuid(cls, value):
# if value['value'] in [None, "None"]:
# return dict(value=uuid.uuid4().hex.upper(), missing=True)
# else:
# value['value'] = value['value'].strip()
# return value
@field_validator("run_cost")
@classmethod @classmethod
def convert_equipment_dict(cls, value): def rescue_run_cost(cls, value):
if isinstance(value, dict): if isinstance(value, float):
return value['value'] value = dict(value=value, missing=False)
if isinstance(value, Generator):
return [PydEquipment(**equipment) for equipment in value]
if not value:
return []
return value return value
@field_validator('comment', mode='before') @field_validator("started_date", mode="before")
@classmethod @classmethod
def create_comment(cls, value): def rescue_start_date(cls, value):
if value is None:
return ""
return value
@field_validator("submitter_plate_id")
@classmethod
def enforce_with_uuid(cls, value):
if value['value'] in [None, "None"]:
return dict(value=uuid.uuid4().hex.upper(), missing=True)
else:
value['value'] = value['value'].strip()
return value
@field_validator("submitted_date", mode="before")
@classmethod
def rescue_date(cls, value):
try: try:
check = value['value'] is None check = value['value'] is None
except TypeError: except TypeError:
@@ -541,9 +558,20 @@ class PydSubmission(BaseModel, extra='allow'):
return dict(value=date.today(), missing=True) return dict(value=date.today(), missing=True)
return value return value
@field_validator("submitted_date") @field_validator("completed_date", mode="before")
@classmethod @classmethod
def strip_datetime_string(cls, value): def rescue_completed_date(cls, value):
try:
check = value['value'] is None
except TypeError:
check = True
if check:
return dict(value=date.today(), missing=True)
return value
@field_validator("started_date")
@classmethod
def strip_started_datetime_string(cls, value):
match value['value']: match value['value']:
case date(): case date():
output = datetime.combine(value['value'], datetime.min.time()) output = datetime.combine(value['value'], datetime.min.time())
@@ -567,35 +595,61 @@ class PydSubmission(BaseModel, extra='allow'):
value['value'] = output.replace(tzinfo=timezone) value['value'] = output.replace(tzinfo=timezone)
return value return value
@field_validator("clientlab", mode="before") @field_validator("completed_date")
@classmethod @classmethod
def rescue_submitting_lab(cls, value): def strip_completed_datetime_string(cls, value):
if value is None: match value['value']:
return dict(value=None, missing=True) case date():
output = datetime.combine(value['value'], datetime.min.time())
case datetime():
pass
case int():
output = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value['value'] - 2)
case str():
string = re.sub(r"(_|-)\d(R\d)?$", "", value['value'])
try:
output = dict(value=parse(string).date(), missing=True)
except ParserError as e:
logger.error(f"Problem parsing date: {e}")
try:
output = parse(string.replace("-", "")).date()
except Exception as e:
logger.error(f"Problem with parse fallback: {e}")
return value
case _:
raise ValueError(f"Could not get datetime from {value['value']}")
value['value'] = output.replace(tzinfo=timezone)
return value return value
@field_validator("clientlab") # @field_validator("clientlab", mode="before")
@classmethod # @classmethod
def lookup_submitting_lab(cls, value): # def rescue_submitting_lab(cls, value):
if isinstance(value['value'], str): # if value is None:
try: # return dict(value=None, missing=True)
value['value'] = ClientLab.query(name=value['value']).name # return value
except AttributeError: #
value['value'] = None # @field_validator("clientlab")
if value['value'] is None: # @classmethod
value['missing'] = True # def lookup_submitting_lab(cls, value):
if "pytest" in sys.modules: # if isinstance(value['value'], str):
value['value'] = "Nosocomial" # try:
return value # value['value'] = ClientLab.query(name=value['value']).name
from frontend.widgets.pop_ups import ObjectSelector # except AttributeError:
dlg = ObjectSelector(title="Missing Submitting Lab", # value['value'] = None
message="We need a submitting lab. Please select from the list.", # if value['value'] is None:
obj_type=ClientLab) # value['missing'] = True
if dlg.exec(): # if "pytest" in sys.modules:
value['value'] = dlg.parse_form() # value['value'] = "Nosocomial"
else: # return value
value['value'] = None # from frontend.widgets.pop_ups import ObjectSelector
return value # dlg = ObjectSelector(title="Missing Submitting Lab",
# message="We need a submitting lab. Please select from the list.",
# obj_type=ClientLab)
# if dlg.exec():
# value['value'] = dlg.parse_form()
# else:
# value['value'] = None
# return value
@field_validator("rsl_plate_number", mode='before') @field_validator("rsl_plate_number", mode='before')
@classmethod @classmethod
@@ -607,7 +661,7 @@ class PydSubmission(BaseModel, extra='allow'):
@field_validator("rsl_plate_number") @field_validator("rsl_plate_number")
@classmethod @classmethod
def rsl_from_file(cls, value, values): def rsl_from_file(cls, value, values):
sub_type = values.data['proceduretype']['value'] sub_type = values.data['clientsubmission']
if check_not_nan(value['value']): if check_not_nan(value['value']):
value['value'] = value['value'].strip() value['value'] = value['value'].strip()
return value return value
@@ -615,25 +669,28 @@ class PydSubmission(BaseModel, extra='allow'):
if "pytest" in sys.modules and sub_type.replace(" ", "") == "BasicRun": if "pytest" in sys.modules and sub_type.replace(" ", "") == "BasicRun":
output = "RSL-BS-Test001" output = "RSL-BS-Test001"
else: else:
output = RSLNamer(filename=values.data['filepath'].__str__(), submission_type=sub_type, # try:
output = RSLNamer(filename=sub_type.filepath.__str__(), submission_type=sub_type.submissiontype,
data=values.data).parsed_name data=values.data).parsed_name
return dict(value=output, missing=True) return dict(value=output, missing=True)
@field_validator("technician", mode="before") # @field_validator("technician", mode="before")
@classmethod # @classmethod
def rescue_tech(cls, value): # def rescue_tech(cls, value):
if value is None: # if value is None:
return dict(value=None, missing=True) # return dict(value=None, missing=True)
return value # return value
#
@field_validator("technician") # @field_validator("technician")
@classmethod # @classmethod
def enforce_tech(cls, value): # def enforce_tech(cls, value):
if check_not_nan(value['value']): # if check_not_nan(value['value']):
value['value'] = re.sub(r"\: \d", "", value['value']) # value['value'] = re.sub(r"\: \d", "", value['value'])
return value # return value
else: # else:
return dict(value=convert_nans_to_nones(value['value']), missing=True) # return dict(value=convert_nans_to_nones(value['value']), missing=True)
@field_validator("sample_count", mode='before') @field_validator("sample_count", mode='before')
@classmethod @classmethod
@@ -642,54 +699,54 @@ class PydSubmission(BaseModel, extra='allow'):
return dict(value=None, missing=True) return dict(value=None, missing=True)
return value return value
@field_validator("kittype", mode='before') # @field_validator("kittype", mode='before')
@classmethod # @classmethod
def rescue_kit(cls, value): # def rescue_kit(cls, value):
if check_not_nan(value): # if check_not_nan(value):
if isinstance(value, str): # if isinstance(value, str):
return dict(value=value, missing=False) # return dict(value=value, missing=False)
elif isinstance(value, dict): # elif isinstance(value, dict):
return value # return value
else: # else:
raise ValueError(f"No extraction kittype found.") # raise ValueError(f"No extraction kittype found.")
if value is None: # if value is None:
# NOTE: Kit selection is done in the clientsubmissionparser, so should not be necessary here. # # NOTE: Kit selection is done in the clientsubmissionparser, so should not be necessary here.
return dict(value=None, missing=True) # return dict(value=None, missing=True)
return value # return value
#
# @field_validator("submissiontype", mode='before')
# @classmethod
# def make_submission_type(cls, value, values):
# if not isinstance(value, dict):
# value = dict(value=value)
# if check_not_nan(value['value']):
# value = value['value'].title()
# return dict(value=value, missing=False)
# else:
# return dict(value=RSLNamer.retrieve_submission_type(filename=values.data['filepath']).title(), missing=True)
#
# @field_validator("submission_category", mode="before")
# @classmethod
# def create_category(cls, value):
# if not isinstance(value, dict):
# return dict(value=value, missing=True)
# return value
#
# @field_validator("submission_category")
# @classmethod
# def rescue_category(cls, value, values):
# if isinstance(value['value'], str):
# value['value'] = value['value'].title()
# if value['value'] not in ["Research", "Diagnostic", "Surveillance", "Validation"]:
# value['value'] = values.data['proceduretype']['value']
# return value
@field_validator("submissiontype", mode='before') # @field_validator("reagent", mode="before")
@classmethod # @classmethod
def make_submission_type(cls, value, values): # def expand_reagents(cls, value):
if not isinstance(value, dict): # if isinstance(value, Generator):
value = dict(value=value) # return [PydReagent(**reagent) for reagent in value]
if check_not_nan(value['value']): # return value
value = value['value'].title()
return dict(value=value, missing=False)
else:
return dict(value=RSLNamer.retrieve_submission_type(filename=values.data['filepath']).title(), missing=True)
@field_validator("submission_category", mode="before")
@classmethod
def create_category(cls, value):
if not isinstance(value, dict):
return dict(value=value, missing=True)
return value
@field_validator("submission_category")
@classmethod
def rescue_category(cls, value, values):
if isinstance(value['value'], str):
value['value'] = value['value'].title()
if value['value'] not in ["Research", "Diagnostic", "Surveillance", "Validation"]:
value['value'] = values.data['proceduretype']['value']
return value
@field_validator("reagent", mode="before")
@classmethod
def expand_reagents(cls, value):
if isinstance(value, Generator):
return [PydReagent(**reagent) for reagent in value]
return value
@field_validator("sample", mode="before") @field_validator("sample", mode="before")
@classmethod @classmethod
@@ -698,80 +755,82 @@ class PydSubmission(BaseModel, extra='allow'):
return [PydSample(**sample) for sample in value] return [PydSample(**sample) for sample in value]
return value return value
@field_validator("sample") # @field_validator("sample")
@classmethod # @classmethod
def assign_ids(cls, value): # def assign_ids(cls, value):
starting_id = ClientSubmissionSampleAssociation.autoincrement_id() # starting_id = ClientSubmissionSampleAssociation.autoincrement_id()
for iii, sample in enumerate(value, start=starting_id): # for iii, sample in enumerate(value, start=starting_id):
# NOTE: Why is this a list? Answer: to zip with the lists of rows and columns in case of multiple of the same sample. # # NOTE: Why is this a list? Answer: to zip with the lists of rows and columns in case of multiple of the same sample.
sample.assoc_id = [iii] # sample.assoc_id = [iii]
return value # return value
@field_validator("cost_centre", mode="before") # @field_validator("cost_centre", mode="before")
@classmethod # @classmethod
def rescue_cost_centre(cls, value): # def rescue_cost_centre(cls, value):
match value: # match value:
case dict(): # case dict():
return value # return value
case _: # case _:
return dict(value=value, missing=True) # return dict(value=value, missing=True)
#
@field_validator("cost_centre") # @field_validator("cost_centre")
@classmethod # @classmethod
def get_cost_centre(cls, value, values): # def get_cost_centre(cls, value, values):
match value['value']: # match value['value']:
case None: # case None:
from backend.db.models import Organization # from backend.db.models import Organization
org = Organization.query(name=values.data['clientlab']['value']) # org = Organization.query(name=values.data['clientlab']['value'])
try: # try:
return dict(value=org.cost_centre, missing=True) # return dict(value=org.cost_centre, missing=True)
except AttributeError: # except AttributeError:
return dict(value="xxx", missing=True) # return dict(value="xxx", missing=True)
case _: # case _:
return value # return value
#
@field_validator("contact") # @field_validator("contact")
@classmethod # @classmethod
def get_contact_from_org(cls, value, values): # def get_contact_from_org(cls, value, values):
# logger.debug(f"Value coming in: {value}") # # logger.debug(f"Value coming in: {value}")
match value: # match value:
case dict(): # case dict():
if isinstance(value['value'], tuple): # if isinstance(value['value'], tuple):
value['value'] = value['value'][0] # value['value'] = value['value'][0]
case tuple(): # case tuple():
value = dict(value=value[0], missing=False) # value = dict(value=value[0], missing=False)
case _: # case _:
value = dict(value=value, missing=False) # value = dict(value=value, missing=False)
# logger.debug(f"Value after match: {value}") # # logger.debug(f"Value after match: {value}")
check = Contact.query(name=value['value']) # check = Contact.query(name=value['value'])
# logger.debug(f"Check came back with {check}") # # logger.debug(f"Check came back with {check}")
if not isinstance(check, Contact): # if not isinstance(check, Contact):
org = values.data['clientlab']['value'] # org = values.data['clientlab']['value']
# logger.debug(f"Checking organization: {org}") # # logger.debug(f"Checking organization: {org}")
if isinstance(org, str): # if isinstance(org, str):
org = ClientLab.query(name=values.data['clientlab']['value'], limit=1) # org = ClientLab.query(name=values.data['clientlab']['value'], limit=1)
if isinstance(org, ClientLab): # if isinstance(org, ClientLab):
contact = org.contact[0].name # contact = org.contact[0].name
else: # else:
logger.warning(f"All attempts at defaulting Contact failed, returning: {value}") # logger.warning(f"All attempts at defaulting Contact failed, returning: {value}")
return value # return value
if isinstance(contact, tuple): # if isinstance(contact, tuple):
contact = contact[0] # contact = contact[0]
value = dict(value=f"Defaulted to: {contact}", missing=False) # value = dict(value=f"Defaulted to: {contact}", missing=False)
# logger.debug(f"Value after query: {value}") # # logger.debug(f"Value after query: {value}")
return value # return value
else: # else:
# logger.debug(f"Value after bypass check: {value}") # # logger.debug(f"Value after bypass check: {value}")
return value # return value
def __init__(self, run_custom: bool = False, **data): def __init__(self, run_custom: bool = False, **data):
super().__init__(**data) super().__init__(**data)
# NOTE: this could also be done with default_factory # NOTE: this could also be done with default_factory
self.submission_object = Run.find_polymorphic_subclass( # self.submission_object = Run.find_polymorphic_subclass(
polymorphic_identity=self.submission_type['value']) # polymorphic_identity=self.submission_type['value'])
self.namer = RSLNamer(self.rsl_plate_number['value'], submission_type=self.submission_type['value']) submission_type = self.clientsubmission.submissiontype
if run_custom: # logger.debug(submission_type)
self.submission_object.custom_validation(pyd=self) self.namer = RSLNamer(self.rsl_plate_number['value'], submission_type=submission_type)
# if run_custom:
# self.submission_object.custom_validation(pyd=self)
def set_attribute(self, key: str, value): def set_attribute(self, key: str, value):
""" """
@@ -1020,7 +1079,7 @@ class PydSubmission(BaseModel, extra='allow'):
Returns: Returns:
str: Output filename str: Output filename
""" """
template = self.submission_object.filename_template() template = self.clientsubmission.filename_template
render = self.namer.construct_export_name(template=template, **self.improved_dict(dictionaries=False)).replace( render = self.namer.construct_export_name(template=template, **self.improved_dict(dictionaries=False)).replace(
"/", "") "/", "")
return render return render
@@ -1537,7 +1596,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
self.kittype['value'] = kittype self.kittype['value'] = kittype
self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype))) self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype)))
def reorder_reagents(self, reagentrole: str, options:list): def reorder_reagents(self, reagentrole: str, options: list):
reagent_used = next((reagent for reagent in self.reagent if reagent.reagentrole == reagentrole), None) reagent_used = next((reagent for reagent in self.reagent if reagent.reagentrole == reagentrole), None)
if not reagent_used: if not reagent_used:
return options return options
@@ -1547,8 +1606,6 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
options.insert(0, options.pop(options.index(roi))) options.insert(0, options.pop(options.index(roi)))
return options return options
def update_kittype_equipmentroles(self, kittype: str | KitType): def update_kittype_equipmentroles(self, kittype: str | KitType):
if kittype == self.__class__.model_fields['kittype'].default['value']: if kittype == self.__class__.model_fields['kittype'].default['value']:
return return
@@ -1808,6 +1865,23 @@ class PydClientSubmission(PydBaseClass):
sql._misc_info['info_placement'] = info_placement sql._misc_info['info_placement'] = info_placement
return sql return sql
def pad_samples_to_length(self, row_count, column_names):
output_samples = []
for iii in range(1, row_count + 1):
try:
sample = next((item for item in self.model_extra['samples'] if item.submission_rank == iii))
except StopIteration:
sample = PydSample(sample_id="")
for column in column_names:
setattr(sample, column[0], "")
sample.submission_rank = iii
output_samples.append(sample)
return sorted(output_samples, key=lambda x: x.submission_rank)
@property
def filename_template(self):
submissiontype = SubmissionType.query(name=self.submissiontype['value'])
return submissiontype.defaults['filename_template']
class PydResults(PydBaseClass, arbitrary_types_allowed=True): class PydResults(PydBaseClass, arbitrary_types_allowed=True):

View File

@@ -30,7 +30,7 @@ from .turnaround import TurnaroundTime
from .concentrations import Concentrations from .concentrations import Concentrations
from .omni_search import SearchBox from .omni_search import SearchBox
logger = logging.getLogger(f'procedure.{__name__}') logger = logging.getLogger(f'submissions.{__name__}')
class App(QMainWindow): class App(QMainWindow):

View File

@@ -18,7 +18,7 @@ from pprint import pformat
from typing import List from typing import List
logger = logging.getLogger(f"procedure.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
class SubmissionDetails(QDialog): class SubmissionDetails(QDialog):

View File

@@ -16,7 +16,7 @@ from backend.db.models import Run, ClientSubmission, Procedure
from tools import Report, Result, report_result, get_application_from_parent from tools import Report, Result, report_result, get_application_from_parent
from .functions import select_open_file from .functions import select_open_file
logger = logging.getLogger(f"procedure.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
class pandasModel(QAbstractTableModel): class pandasModel(QAbstractTableModel):

View File

@@ -7,13 +7,13 @@ from PyQt6.QtWidgets import (
) )
from PyQt6.QtCore import pyqtSignal, Qt, QSignalBlocker from PyQt6.QtCore import pyqtSignal, Qt, QSignalBlocker
from backend.managers import DefaultClientSubmission
from .functions import select_open_file, select_save_file from .functions import select_open_file, select_save_file
import logging import logging
from pathlib import Path from pathlib import Path
from tools import Report, Result, check_not_nan, main_form_style, report_result, get_application_from_parent from tools import Report, Result, check_not_nan, main_form_style, report_result, get_application_from_parent
from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser, ClientSubmissionSampleParser from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser, ClientSubmissionSampleParser
from backend.validators import PydSubmission, PydReagent, PydClientSubmission, PydSample from backend.validators import PydRun, PydReagent, PydClientSubmission, PydSample
from backend.db import ( from backend.db import (
ClientLab, SubmissionType, Reagent, ClientLab, SubmissionType, Reagent,
ReagentRole, KitTypeReagentRoleAssociation, Run ReagentRole, KitTypeReagentRoleAssociation, Run
@@ -103,6 +103,7 @@ class SubmissionFormContainer(QWidget):
Returns: Returns:
Report: Object to give results of import. Report: Object to give results of import.
""" """
from backend.managers import DefaultClientSubmissionManager
self.app.raise_() self.app.raise_()
self.app.activateWindow() self.app.activateWindow()
logger.info(f"\n\nStarting Import...\n\n") logger.info(f"\n\nStarting Import...\n\n")
@@ -141,7 +142,7 @@ class SubmissionFormContainer(QWidget):
# self.pydclientsubmission = self.clientsubmissionparser.to_pydantic() # self.pydclientsubmission = self.clientsubmissionparser.to_pydantic()
# self.pydsamples = self.sampleparser.to_pydantic() # self.pydsamples = self.sampleparser.to_pydantic()
# logger.debug(f"Samples: {pformat(self.pydclientsubmission.sample)}") # logger.debug(f"Samples: {pformat(self.pydclientsubmission.sample)}")
self.clientsubmission_manager = DefaultClientSubmission(parent=self, fname=fname) self.clientsubmission_manager = DefaultClientSubmissionManager(parent=self, fname=fname)
self.pydclientsubmission = self.clientsubmission_manager.parse() self.pydclientsubmission = self.clientsubmission_manager.parse()
checker = SampleChecker(self, "Sample Checker", self.pydclientsubmission.samples) checker = SampleChecker(self, "Sample Checker", self.pydclientsubmission.samples)
if checker.exec(): if checker.exec():
@@ -188,7 +189,7 @@ class SubmissionFormContainer(QWidget):
class SubmissionFormWidget(QWidget): class SubmissionFormWidget(QWidget):
update_reagent_fields = ['kittype'] update_reagent_fields = ['kittype']
def __init__(self, parent: QWidget, pyd: PydSubmission, disable: list | None = None) -> None: def __init__(self, parent: QWidget, pyd: PydRun, disable: list | None = None) -> None:
super().__init__(parent) super().__init__(parent)
if disable is None: if disable is None:
disable = [] disable = []

View File

@@ -503,7 +503,8 @@ def setup_lookup(func):
try: try:
sanitized_kwargs[k] = v['value'] sanitized_kwargs[k] = v['value']
except KeyError: except KeyError:
raise ValueError("Could not sanitize dictionary in query. Make sure you parse it first.")
raise ValueError(f"Could not sanitize dictionary {v} in query. Make sure you parse it first.")
elif v is not None: elif v is not None:
sanitized_kwargs[k] = v sanitized_kwargs[k] = v
return func(*args, **sanitized_kwargs) return func(*args, **sanitized_kwargs)