Code cleanup for validators complete.

This commit is contained in:
lwark
2025-09-04 15:21:50 -05:00
parent c8b4762747
commit 610859d84f
3 changed files with 121 additions and 327 deletions

View File

@@ -1,6 +1,6 @@
'''
"""
Contains database, validators and excel operations.
'''
"""
from .db import *
from .excel import *
from .validators import *

View File

@@ -48,8 +48,6 @@ class ClientSubmissionNamer(DefaultNamer):
if not sub_type:
logger.warning(f"Getting submissiontype from regex failed, using default submissiontype.")
sub_type = SubmissionType.query(name="Default")
logger.debug(f"Submission Type: {sub_type}")
# sys.exit()
return sub_type
def get_subtype_from_regex(self) -> SubmissionType:
@@ -84,9 +82,6 @@ class ClientSubmissionNamer(DefaultNamer):
return sub_type
class RSLNamer(object):
"""
Object that will enforce proper formatting on RSL plate names.
@@ -98,17 +93,10 @@ class RSLNamer(object):
self.submission_type = submission_type
if not self.submission_type:
self.submission_type = self.retrieve_submission_type(filename=filename)
# logger.info(f"got submission type: {self.submission_type}")
if self.submission_type:
# self.sub_object = BasicRun.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
self.sub_object = SubmissionType.query(name=self.submission_type['name'], limit=1)
self.parsed_name = self.retrieve_rsl_number(filename=filename, regex=self.sub_object.get_regex(
submission_type=self.submission_type))
# if not data:
# data = dict(submission_type=self.submission_type)
# if "proceduretype" not in data.keys():
# data['proceduretype'] = self.submission_type
# self.parsed_name = self.sub_object.enforce_name(instr=self.parsed_name, data=data)
logger.info(f"Parsed name: {self.parsed_name}")
@classmethod
@@ -227,7 +215,6 @@ class RSLNamer(object):
Returns:
str: Output filename
"""
logger.debug(data)
if "submitted_date" in data.keys():
if isinstance(data['submitted_date'], dict):
if data['submitted_date']['value'] is not None:
@@ -244,13 +231,8 @@ class RSLNamer(object):
today = datetime.now()
if isinstance(today, str):
today = datetime.strptime(today, "%Y-%m-%d")
# if "name" in data.keys():
# logger.debug(f"Found name: {data['name']}")
# plate_number = data['name'].split("-")[-1][0]
# else:
previous = Run.query(start_date=today, end_date=today, submissiontype=data['submissiontype'])
plate_number = len(previous) + 1
logger.debug(f"Using plate number: {plate_number}")
return f"RSL-{data['abbreviation']}-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}-{plate_number}"
@classmethod
@@ -283,5 +265,5 @@ class RSLNamer(object):
return ""
from .pydant import PydRun, PydKitType, PydContact, PydOrganization, PydSample, PydReagent, PydReagentRole, \
from .pydant import PydRun, PydKitType, PydContact, PydClientLab, PydSample, PydReagent, PydReagentRole, \
PydEquipment, PydEquipmentRole, PydTips, PydProcess, PydElastic, PydClientSubmission, PydProcedure, PydResults

View File

@@ -11,7 +11,7 @@ from typing import List, Tuple, Literal
from types import GeneratorType
from . import RSLNamer
from pathlib import Path
from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone, sort_dict_by_list
from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone, sort_dict_by_list, row_keys
from backend.db import models
from backend.db.models import *
from sqlalchemy.exc import StatementError
@@ -35,7 +35,6 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
@model_validator(mode="before")
@classmethod
def prevalidate(cls, data):
# logger.debug(f"Initial data for {cls.__name__}: {pformat(data)}")
sql_fields = [k for k, v in cls._sql_object.__dict__.items() if isinstance(v, InstrumentedAttribute)]
output = {}
try:
@@ -54,9 +53,6 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
@model_validator(mode='after')
@classmethod
def validate_model(cls, data):
# _sql_object = getattr(models, cls.__name__.replace("Pyd", ""))
# total_dict = data.model_fields.update(data.model_extra)
for key, value in data.model_extra.items():
if key in cls._sql_object.timestamps:
if isinstance(value, str):
@@ -70,11 +66,11 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
data.__setattr__(key, value)
return data
def __init__(self, **data):
# NOTE: Grab the sql model for validation purposes.
# self.__class__._sql_object = getattr(models, self.__class__.__name__.replace("Pyd", ""))
super().__init__(**data)
# def __init__(self, **data):
# # NOTE: Grab the sql model for validation purposes.
# # self.__class__._sql_object = getattr(models, self.__class__.__name__.replace("Pyd", ""))
#
# super().__init__(**data)
def filter_field(self, key: str) -> Any:
"""
@@ -121,8 +117,6 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
def to_sql(self):
dicto = self.improved_dict(dictionaries=False)
logger.debug(f"Dicto: {dicto}")
# sql, new = self._sql_object().query_or_create(**dicto)
sql, new = self._sql_object.query_or_create(**dicto)
if new:
logger.warning(f"Creating new {self._sql_object} with values:\n{pformat(dicto)}")
@@ -226,11 +220,6 @@ class PydReagent(PydBaseClass):
else:
return values.data['reagentrole'].strip()
# @field_validator("reagentrole", mode="before")
# @classmethod
# def rescue_reagentrole(cls, value):
# if
def improved_dict(self) -> dict:
"""
Constructs a dictionary consisting of model.fields and model.extras
@@ -261,31 +250,11 @@ class PydReagent(PydBaseClass):
reagentrole = ReagentRole.query(name=self.reagentrole)
reagent.reagentrole = reagentrole
reagent.expiry = self.expiry
# logger.debug(f"Reagent: {reagent}")
# if reagent is None:
# reagent = Reagent()
# for key, value in self.__dict__.items():
# if isinstance(value, dict):
# if key == "misc_info":
# value = value
# else:
# value = value['value']
# # NOTE: reagent method sets fields based on keys in dictionary
# reagent.set_attribute(key, value)
# if procedure is not None and reagent not in procedure.reagents:
# assoc = ProcedureReagentAssociation(reagent=reagent, procedure=procedure)
# assoc.comments = self.comment
# else:
# assoc = None
# else:
# if submission is not None and reagent not in submission.reagents:
# submission.update_reagentassoc(reagent=reagent, role=self.role)
return reagent, report
class PydSample(PydBaseClass):
sample_id: str
# sampletype: str | None = Field(default=None)
submission_rank: int | List[int] | None = Field(default=0, validate_default=True)
enabled: bool = Field(default=True)
row: int = Field(default=0)
@@ -328,7 +297,6 @@ class PydSample(PydBaseClass):
def improved_dict(self, dictionaries: bool = True) -> dict:
output = super().improved_dict(dictionaries=dictionaries)
output['name'] = self.sample_id
# del output['sampletype']
return output
def to_sql(self):
@@ -340,21 +308,6 @@ class PydSample(PydBaseClass):
class PydTips(PydBaseClass):
name: str
lot: str | None = Field(default=None)
# tips: str
# @field_validator('tips', mode='before')
# @classmethod
# def get_role_name(cls, value):
# if isinstance(value, list):
# output = []
# for tips in value:
# if isinstance(tips, Tips):
# tips = tips.name
# output.append(tips)
# value = output
# if isinstance(value, Tips):
# value = value.name
# return value
@report_result
def to_sql(self) -> Tuple[Tips, Report]:
@@ -369,9 +322,6 @@ class PydTips(PydBaseClass):
"""
report = Report()
tips = TipsLot.query(name=self.name, limit=1)
# logger.debug(f"Tips query has yielded: {tips}")
# assoc = ProcedureTipsAssociation.query_or_create(tips=tips, procedure=procedure, tiprole=self.tiprole, limit=1)
# logger.debug(f"Got association: {assoc}")
return tips, report
@@ -379,7 +329,6 @@ class PydEquipment(PydBaseClass):
asset_number: str
name: str
nickname: str | None
# process: List[dict] | None
process: List[PydProcess] | PydProcess | None
equipmentrole: str | PydEquipmentRole | None
tips: List[PydTips] | PydTips | None = Field(default=[])
@@ -401,8 +350,6 @@ class PydEquipment(PydBaseClass):
@field_validator('process', mode='before')
@classmethod
def process_to_pydantic(cls, value, values):
# if isinstance(value, dict):
# value = value['processes']
if isinstance(value, GeneratorType):
value = [item for item in value]
value = convert_nans_to_nones(value)
@@ -412,13 +359,9 @@ class PydEquipment(PydBaseClass):
value = value.to_pydantic(pyd_model_name="PydProcess")
else:
try:
# value = [item.strip() for item in value]
d: Process = next((process for process in value if values.data['name'] in [item.name for item in process.equipment]), None)
print(f"Next process: {d.details_dict()}")
if d:
# value = PydProcess(**d.details_dict())
value = d.to_pydantic()
# value = next((process.to_pydantic() for process in value))
except AttributeError as e:
logger.error(f"Process Validation error due to {e}")
pass
@@ -427,8 +370,6 @@ class PydEquipment(PydBaseClass):
@field_validator('tips', mode='before')
@classmethod
def tips_to_pydantic(cls, value, values):
# if isinstance(value, dict):
# value = value['processes']
if isinstance(value, GeneratorType):
value = [item for item in value]
value = convert_nans_to_nones(value)
@@ -438,44 +379,16 @@ class PydEquipment(PydBaseClass):
value = value.to_pydantic(pyd_model_name="PydTips")
else:
try:
# value = [item.strip() for item in value]
d: Tips = next(
(tips for tips in value if values.data['name'] in [item.name for item in tips.equipment]),
None)
print(f"Next process: {d.details_dict()}")
if d:
# value = PydProcess(**d.details_dict())
value = d.to_pydantic()
# value = next((process.to_pydantic() for process in value))
except AttributeError as e:
logger.error(f"Process Validation error due to {e}")
pass
return value
# @field_validator('tips', mode='before')
# @classmethod
# def tips_to_pydantic(cls, value):
# match value:
# case list():
# output = []
# for tips in value:
# match tips:
# case Tips():
# tips = tips.to_pydantic()
# case dict():
# tips = PydTips(**tips)
# case _:
# continue
# output.append(tips)
# case _:
# output = value
# return output
#
# @field_validator('tips')
# @classmethod
# def single_out_tips(cls, value, values):
# return value
@report_result
def to_sql(self, procedure: Procedure | str = None, proceduretype: ProcedureType | str = None) -> Tuple[
Equipment, ProcedureEquipmentAssociation]:
@@ -493,7 +406,6 @@ class PydEquipment(PydBaseClass):
procedure = Procedure.query(name=procedure)
if isinstance(proceduretype, str):
proceduretype = ProcedureType.query(name=proceduretype)
logger.debug(f"Querying equipment: {self.asset_number}")
equipment = Equipment.query(asset_number=self.asset_number)
if equipment is None:
logger.error("No equipment found. Returning None.")
@@ -507,7 +419,6 @@ class PydEquipment(PydBaseClass):
logger.error(f"Couldn't get association due to {e}, returning...")
return None, None
if new:
# assoc = ProcedureEquipmentAssociation(procedure=procedure, equipment=equipment)
# TODO: This seems precarious. What if there is more than one process?
# NOTE: It looks like the way fetching the process is done in the SQL model, this shouldn't be a problem, but I'll include a failsafe.
# NOTE: I need to find a way to filter this by the kittype involved.
@@ -517,7 +428,6 @@ class PydEquipment(PydBaseClass):
process = Process.query(name=self.processes[0], limit=1)
if process is None:
logger.error(f"Found unknown process: {process}.")
logger.debug(f"Using process: {process}")
assoc.process = process
assoc.equipmentrole = self.equipmentrole
else:
@@ -662,7 +572,6 @@ class PydRun(PydBaseClass): #, extra='allow'):
if "pytest" in sys.modules and sub_type.replace(" ", "") == "BasicRun":
output = "RSL-BS-Test001"
else:
# try:
output = RSLNamer(filename=sub_type.filepath.__str__(), submission_type=sub_type.submissiontype,
data=values.data).parsed_name
return dict(value=output, missing=True)
@@ -685,10 +594,7 @@ class PydRun(PydBaseClass): #, extra='allow'):
super().__init__(**data)
# NOTE: this could also be done with default_factory
submission_type = self.clientsubmission.submissiontype
# logger.debug(submission_type)
self.namer = RSLNamer(self.rsl_plate_number['value'], submission_type=submission_type)
# if run_custom:
# self.submission_object.custom_validation(pyd=self)
def set_attribute(self, key: str, value):
"""
@@ -790,18 +696,14 @@ class PydRun(PydBaseClass): #, extra='allow'):
"""
report = Report()
dicto = self.improved_dict()
# logger.debug(f"Pydantic procedure type: {self.proceduretype['value']}")
# logger.debug(f"Pydantic improved_dict: {pformat(dicto)}")
instance, result = Run.query_or_create(submissiontype=self.submission_type['value'],
rsl_plate_number=self.rsl_plate_number['value'])
# logger.debug(f"Created or queried instance: {instance}")
if instance is None:
report.add_result(Result(msg="Overwrite Cancelled."))
return None, report
report.add_result(result)
self.handle_duplicate_samples()
for key, value in dicto.items():
# logger.debug(f"Checking key {key}, value {value}")
if isinstance(value, dict):
try:
value = value['value']
@@ -857,7 +759,6 @@ class PydRun(PydBaseClass): #, extra='allow'):
value = value
instance.set_attribute(key=key, value=value)
case item if item in instance.jsons:
# logger.debug(f"Validating json value: {item} to value:{pformat(value)}")
try:
ii = value.items()
except AttributeError:
@@ -867,7 +768,6 @@ class PydRun(PydBaseClass): #, extra='allow'):
value[k] = v.strftime("%Y-%m-%d %H:%M:%S")
else:
pass
# logger.debug(f"Setting json value: {item} to value:{pformat(value)}")
instance.set_attribute(key=key, value=value)
case _:
try:
@@ -914,22 +814,8 @@ class PydRun(PydBaseClass): #, extra='allow'):
SubmissionFormWidget: Submission form widget
"""
from frontend.widgets.submission_widget import SubmissionFormWidget
try:
logger.debug(f"PCR info: {self.pcr_info}")
except AttributeError:
pass
return SubmissionFormWidget(parent=parent, pyd=self, disable=disable)
# def to_writer(self) -> "SheetWriter":
# """
# Sends data here to the sheet writer.
#
# Returns:
# SheetWriter: Sheetwriter object that will perform writing.
# """
# from backend.excel.writer import SheetWriter
# return SheetWriter(self)
def construct_filename(self) -> str:
"""
Creates filename for this instance
@@ -942,45 +828,43 @@ class PydRun(PydBaseClass): #, extra='allow'):
"/", "")
return render
def check_kit_integrity(self, extraction_kit: str | dict | None = None, exempt: List[PydReagent] = []) -> Tuple[
List[PydReagent], Report, List[PydReagent]]:
"""
Ensures all reagents expected in kittype are listed in Submission
Args:
extraction_kit (str | dict | None, optional): kittype to be checked. Defaults to None.
exempt (List[PydReagent], optional): List of reagents that don't need to be checked. Defaults to []
Returns:
Tuple[List[PydReagent], Report]: List of reagents and Result object containing a message and any missing components.
"""
report = Report()
# logger.debug(f"The following reagents are exempt from the kittype integrity check:\n{exempt}")
if isinstance(extraction_kit, str):
extraction_kit = dict(value=extraction_kit)
if extraction_kit is not None and extraction_kit != self.extraction_kit['value']:
self.extraction_kit['value'] = extraction_kit['value']
ext_kit = KitType.query(name=self.extraction_kit['value'])
ext_kit_rtypes = [item.to_pydantic() for item in
ext_kit.get_reagents(required_only=True, proceduretype=self.submission_type['value'])]
# NOTE: Exclude any reagenttype found in this pydclientsubmission not expected in kittype.
expected_check = [item.equipmentrole for item in ext_kit_rtypes]
output_reagents = [rt for rt in self.reagents if rt.role in expected_check]
missing_check = [item.role for item in output_reagents]
missing_reagents = [rt for rt in ext_kit_rtypes if
rt.equipmentrole not in missing_check and rt.equipmentrole not in exempt]
# logger.debug(f"Missing reagents: {missing_reagents}")
missing_reagents += [rt for rt in output_reagents if rt.missing]
output_reagents += [rt for rt in missing_reagents if rt not in output_reagents]
# NOTE: if lists are equal return no problem
if len(missing_reagents) == 0:
result = None
else:
result = Result(
msg=f"The excel sheet you are importing is missing some reagents expected by the kittype.\n\nIt looks like you are missing: {[item.equipmentrole.upper() for item in missing_reagents]}\n\nAlternatively, you may have set the wrong extraction kittype.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!",
status="Warning")
report.add_result(result)
return output_reagents, report, missing_reagents
# def check_kit_integrity(self, extraction_kit: str | dict | None = None, exempt: List[PydReagent] = []) -> Tuple[
# List[PydReagent], Report, List[PydReagent]]:
# """
# Ensures all reagents expected in kittype are listed in Submission
#
# Args:
# extraction_kit (str | dict | None, optional): kittype to be checked. Defaults to None.
# exempt (List[PydReagent], optional): List of reagents that don't need to be checked. Defaults to []
#
# Returns:
# Tuple[List[PydReagent], Report]: List of reagents and Result object containing a message and any missing components.
# """
# report = Report()
# if isinstance(extraction_kit, str):
# extraction_kit = dict(value=extraction_kit)
# if extraction_kit is not None and extraction_kit != self.extraction_kit['value']:
# self.extraction_kit['value'] = extraction_kit['value']
# ext_kit = KitType.query(name=self.extraction_kit['value'])
# ext_kit_rtypes = [item.to_pydantic() for item in
# ext_kit.get_reagents(required_only=True, proceduretype=self.submission_type['value'])]
# # NOTE: Exclude any reagenttype found in this pydclientsubmission not expected in kittype.
# expected_check = [item.equipmentrole for item in ext_kit_rtypes]
# output_reagents = [rt for rt in self.reagents if rt.role in expected_check]
# missing_check = [item.role for item in output_reagents]
# missing_reagents = [rt for rt in ext_kit_rtypes if
# rt.equipmentrole not in missing_check and rt.equipmentrole not in exempt]
# missing_reagents += [rt for rt in output_reagents if rt.missing]
# output_reagents += [rt for rt in missing_reagents if rt not in output_reagents]
# # NOTE: if lists are equal return no problem
# if len(missing_reagents) == 0:
# result = None
# else:
# result = Result(
# msg=f"The excel sheet you are importing is missing some reagents expected by the kittype.\n\nIt looks like you are missing: {[item.equipmentrole.upper() for item in missing_reagents]}\n\nAlternatively, you may have set the wrong extraction kittype.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!",
# status="Warning")
# report.add_result(result)
# return output_reagents, report, missing_reagents
def check_reagent_expiries(self, exempt: List[PydReagent] = []):
report = Report()
@@ -1039,9 +923,7 @@ class PydContact(BaseModel):
area_regex = re.compile(r"^\(?(\d{3})\)?(-| )?")
if len(value) > 8:
match = area_regex.match(value)
# logger.debug(f"Match: {match.group(1)}")
value = area_regex.sub(f"({match.group(1).strip()}) ", value)
# logger.debug(f"Output phone: {value}")
return value
@report_result
@@ -1065,7 +947,7 @@ class PydContact(BaseModel):
value = getattr(self, field)
match field:
case "organization":
value = [Organization.query(name=value)]
value = [ClientLab.query(name=value)]
case _:
pass
try:
@@ -1075,7 +957,7 @@ class PydContact(BaseModel):
return instance, report
class PydOrganization(BaseModel):
class PydClientLab(BaseModel):
name: str
cost_centre: str
contact: List[PydContact] | None
@@ -1109,7 +991,6 @@ class PydOrganization(BaseModel):
value = [item.to_sql() for item in value if item]
case _:
value = getattr(self, field)
logger.debug(f"Setting {field} to {value}")
if value:
setattr(instance, field, value)
return instance, report
@@ -1128,50 +1009,50 @@ class PydReagentRole(BaseModel):
return timedelta(days=value)
return value
@report_result
def to_sql(self, kit: KitType) -> ReagentRole:
"""
Converts this instance into a backend.db.models.ReagentType instance
Args:
kit (KitType): KitType joined to the reagentrole
Returns:
ReagentRole: ReagentType instance
"""
report = Report()
instance: ReagentRole = ReagentRole.query(name=self.name)
if instance is None:
instance = ReagentRole(name=self.name, eol_ext=self.eol_ext)
try:
assoc = KitTypeReagentRoleAssociation.query(reagentrole=instance, kittype=kit)
except StatementError:
assoc = None
if assoc is None:
assoc = KitTypeReagentRoleAssociation(kittype=kit, reagentrole=instance, uses=self.uses,
required=self.required)
return instance, report
# @report_result
# def to_sql(self, kit: KitType) -> ReagentRole:
# """
# Converts this instance into a backend.db.models.ReagentType instance
#
# Args:
# kit (KitType): KitType joined to the reagentrole
#
# Returns:
# ReagentRole: ReagentType instance
# """
# report = Report()
# instance: ReagentRole = ReagentRole.query(name=self.name)
# if instance is None:
# instance = ReagentRole(name=self.name, eol_ext=self.eol_ext)
# try:
# assoc = KitTypeReagentRoleAssociation.query(reagentrole=instance, kittype=kit)
# except StatementError:
# assoc = None
# if assoc is None:
# assoc = KitTypeReagentRoleAssociation(kittype=kit, reagentrole=instance, uses=self.uses,
# required=self.required)
# return instance, report
class PydKitType(BaseModel):
name: str
reagent_roles: List[PydReagent] = []
@report_result
def to_sql(self) -> Tuple[KitType, Report]:
"""
Converts this instance into a backend.db.models.kits.KitType instance
Returns:
Tuple[KitType, Report]: KitType instance and report of results.
"""
report = Report()
instance = KitType.query(name=self.name)
if instance is None:
instance = KitType(name=self.name)
for role in self.reagent_roles:
role.to_sql(instance)
return instance, report
# class PydKitType(BaseModel):
# name: str
# reagent_roles: List[PydReagent] = []
#
# @report_result
# def to_sql(self) -> Tuple[KitType, Report]:
# """
# Converts this instance into a backend.db.models.kits.KitType instance
#
# Returns:
# Tuple[KitType, Report]: KitType instance and report of results.
# """
# report = Report()
# instance = KitType.query(name=self.name)
# if instance is None:
# instance = KitType(name=self.name)
# for role in self.reagent_roles:
# role.to_sql(instance)
# return instance, report
class PydEquipmentRole(BaseModel):
@@ -1210,7 +1091,6 @@ class PydProcess(PydBaseClass, extra="allow"):
@field_validator("tips", mode="before")
@classmethod
def enforce_list(cls, value):
# logger.debug(f"Validating field: {value}")
if not isinstance(value, list):
value = [value]
output = []
@@ -1240,10 +1120,8 @@ class PydProcess(PydBaseClass, extra="allow"):
def to_sql(self):
report = Report()
name = self.name.split("-")[0]
logger.debug(f"Query process: {self.name}, version = {self.version}")
# NOTE: can't use query_or_create due to name not being part of ProcessVersion
instance = ProcessVersion.query(name=name, version=self.version, limit=1)
logger.debug(f"Got instance: {instance}")
if not instance:
instance = ProcessVersion()
return instance, report
@@ -1255,7 +1133,6 @@ class PydElastic(BaseModel, extra="allow", arbitrary_types_allowed=True):
@report_result
def to_sql(self):
# print(self.instance)
fields = [item for item in self.model_extra]
for field in fields:
try:
@@ -1265,11 +1142,8 @@ class PydElastic(BaseModel, extra="allow", arbitrary_types_allowed=True):
continue
match field_type:
case _RelationshipDeclared():
# logger.debug(f"{field} is a relationship with {field_type.entity.class_}")
field_value = field_type.entity.class_.argument.query(name=getattr(self, field))
# logger.debug(f"{field} query result: {field_value}")
case ColumnProperty():
# logger.debug(f"{field} is a property.")
field_value = getattr(self, field)
self.instance.__setattr__(field, field_value)
return self.instance
@@ -1343,26 +1217,11 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
@field_validator("reagentrole")
@classmethod
def rescue_reagentrole(cls, value, values):
# if not value:
# match values.data['kittype']:
# case dict():
# if "value" in values.data['kittype'].keys():
# roi = values.data['kittype']['value']
# elif "name" in values.data['kittype'].keys():
# roi = values.data['kittype']['name']
# else:
# raise KeyError(f"Couldn't find kittype name in the dictionary: {values.data['kittype']}")
# case str():
# roi = values.data['kittype']
# if roi != cls.model_fields['kittype'].default['value']:
# kittype = KitType.query(name=roi)
# value = {item.name: item.reagent for item in kittype.reagentrole}
if not value:
value = {}
for reagentrole in values.data['proceduretype'].reagentrole:
reagents = [reagent.lot_dicts for reagent in reagentrole.reagent]
value[reagentrole.name] = flatten_list(reagents)
# value = {item.name: item.reagent for item in values.data['proceduretype'].reagentrole}
return value
@field_validator("run")
@@ -1400,25 +1259,25 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
except TypeError:
return len(self.sample)
def update_kittype_reagentroles(self, kittype: str | KitType):
if kittype == self.__class__.model_fields['kittype'].default['value']:
return
if isinstance(kittype, str):
kittype_obj = KitType.query(name=kittype)
try:
self.reagentrole = {
item.name: item.get_reagents(kittype=kittype_obj) + [PydReagent(name="--New--", lot="", reagentrole="")]
for item in
kittype_obj.get_reagents(proceduretype=self.proceduretype)}
except AttributeError:
self.reagentrole = {}
reordered_options = {}
if self.reagentrole:
for k, v in self.reagentrole.items():
reordered_options[k] = self.reorder_reagents(reagentrole=k, options=v)
self.reagentrole = reordered_options
self.kittype['value'] = kittype
self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype)))
# def update_kittype_reagentroles(self, kittype: str | KitType):
# if kittype == self.__class__.model_fields['kittype'].default['value']:
# return
# if isinstance(kittype, str):
# kittype_obj = KitType.query(name=kittype)
# try:
# self.reagentrole = {
# item.name: item.get_reagents(kittype=kittype_obj) + [PydReagent(name="--New--", lot="", reagentrole="")]
# for item in
# kittype_obj.get_reagents(proceduretype=self.proceduretype)}
# except AttributeError:
# self.reagentrole = {}
# reordered_options = {}
# if self.reagentrole:
# for k, v in self.reagentrole.items():
# reordered_options[k] = self.reorder_reagents(reagentrole=k, options=v)
# self.reagentrole = reordered_options
# self.kittype['value'] = kittype
# self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype)))
def reorder_reagents(self, reagentrole: str, options: list):
reagent_used = next((reagent for reagent in self.reagent if reagent.reagentrole == reagentrole), None)
@@ -1430,26 +1289,24 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
options.insert(0, options.pop(options.index(roi)))
return options
def update_kittype_equipmentroles(self, kittype: str | KitType):
if kittype == self.__class__.model_fields['kittype'].default['value']:
return
if isinstance(kittype, str):
kittype_obj = KitType.query(name=kittype)
try:
self.equipment = {item.name: item.get_reagents(kittype=kittype_obj) for item in
kittype_obj.get_reagents(proceduretype=self.proceduretype)}
except AttributeError:
self.reagentrole = {}
self.kittype['value'] = kittype
self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype)))
# def update_kittype_equipmentroles(self, kittype: str | KitType):
# if kittype == self.__class__.model_fields['kittype'].default['value']:
# return
# if isinstance(kittype, str):
# kittype_obj = KitType.query(name=kittype)
# try:
# self.equipment = {item.name: item.get_reagents(kittype=kittype_obj) for item in
# kittype_obj.get_reagents(proceduretype=self.proceduretype)}
# except AttributeError:
# self.reagentrole = {}
# self.kittype['value'] = kittype
# self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype)))
def update_samples(self, sample_list: List[dict]):
# logger.debug(f"Incoming sample_list:\n{pformat(sample_list)}")
for iii, sample_dict in enumerate(sample_list, start=1):
if sample_dict['sample_id'].startswith("blank_"):
sample_dict['sample_id'] = ""
row, column = self.proceduretype.ranked_plate[sample_dict['index']]
# logger.debug(f"Row: {row}, Column: {column}")
try:
sample = next(
(item for item in self.sample if item.sample_id.upper() == sample_dict['sample_id'].upper()))
@@ -1468,8 +1325,6 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
sample.row = row
sample.column = column
sample.procedure_rank = sample_dict['index']
# logger.debug(f"Sample of interest: {sample.improved_dict()}")
# logger.debug(f"Updated samples:\n{pformat(self.sample)}")
def update_reagents(self, reagentrole: str, name: str, lot: str, expiry: str):
try:
@@ -1484,7 +1339,6 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
idx = 0
insertable = PydReagent(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry)
self.reagent.insert(idx, insertable)
# logger.debug(self.reagent)
@classmethod
def update_new_reagents(cls, reagent: PydReagent):
@@ -1505,7 +1359,6 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
sql.technician = self.technician['value']
else:
sql.technician = self.technician
# sql.repeat = int(self.repeat)
if sql.repeat:
regex = re.compile(r".*\dR\d$")
repeats = [item for item in self.run.procedure if
@@ -1529,14 +1382,12 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
removable = ProcedureReagentLotAssociation.query(procedure=sql, reagentrole=reagentrole)
else:
removable = []
logger.debug(f"Removable: {removable}")
if removable:
if isinstance(removable, list):
for r in removable:
r.delete()
else:
removable.delete()
logger.debug(f"Adding {reagent} to {sql}")
reagent_assoc = ProcedureReagentLotAssociation(reagentlot=reagent, procedure=sql, reagentrole=reagentrole)
try:
start_index = max([item.id for item in ProcedureSampleAssociation.query()]) + 1
@@ -1603,13 +1454,6 @@ class PydClientSubmission(PydBaseClass):
submitter_plate_id: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
sample: List[PydSample] | None = Field(default=[])
# @field_validator("submissiontype", mode="before")
# @classmethod
# def enforce_submissiontype(cls, value):
# if isinstance(value, str):
# value = dict(value=value, missing=False)
# return value
@field_validator("submissiontype", "clientlab", "contact", mode="before")
@classmethod
def enforce_value(cls, value):
@@ -1728,7 +1572,6 @@ class PydClientSubmission(PydBaseClass):
"""
from frontend.widgets.submission_widget import ClientSubmissionFormWidget
if not samples:
# samples = [sample for sample in self.sample if sample.sample_id.lower() not in ["", "blank"]]
samples = self.sample
return ClientSubmissionFormWidget(parent=parent, clientsubmission=self, samples=samples, disable=disable)
@@ -1736,11 +1579,6 @@ class PydClientSubmission(PydBaseClass):
sql = super().to_sql()
assert not any([isinstance(item, PydSample) for item in sql.sample])
sql.sample = []
# if "info_placement" not in sql._misc_info:
# sql._misc_info['info_placement'] = []
# info_placement = []
logger.debug(f"PYD Submission type: {self.submissiontype}")
logger.debug(f"SQL Submission Type: {sql.submissiontype}")
if not sql.submissiontype:
sql.submissiontype = SubmissionType.query(name=self.submissiontype['value'])
match sql.submissiontype:
@@ -1749,7 +1587,6 @@ class PydClientSubmission(PydBaseClass):
case _:
sql.submissiontype = SubmissionType.query(name="Default")
for k in list(self.model_fields.keys()) + list(self.model_extra.keys()):
logger.debug(f"Running {k}")
attribute = getattr(self, k)
match k:
case "filepath":
@@ -1757,14 +1594,6 @@ class PydClientSubmission(PydBaseClass):
continue
case _:
pass
# logger.debug(f"Setting {k} to {attribute}")
# if isinstance(attribute, dict):
# if "location" in attribute:
# info_placement.append(dict(name=k, location=attribute['location']))
# else:
# info_placement.append(dict(name=k, location=None))
# # max_row = max([value['location']['row'] for value in info_placement if value])
# sql._misc_info['info_placement'] = info_placement
return sql
@property
@@ -1775,19 +1604,6 @@ class PydClientSubmission(PydBaseClass):
else:
return max([item.submission_rank for item in self.sample])
# def pad_samples_to_length(self, row_count, column_names):
# output_samples = []
# for iii in range(1, row_count + 1):
# try:
# sample = next((item for item in self.samples if item.submission_rank == iii))
# except StopIteration:
# sample = PydSample(sample_id="")
# for column in column_names:
# setattr(sample, column[0], "")
# sample.submission_rank = iii
# output_samples.append(sample)
# return sorted(output_samples, key=lambda x: x.submission_rank)
def improved_dict(self, dictionaries: bool = True) -> dict:
output = super().improved_dict(dictionaries=dictionaries)
output['sample'] = self.sample
@@ -1798,10 +1614,6 @@ class PydClientSubmission(PydBaseClass):
pass
return sort_dict_by_list(output, self.key_value_order)
# @property
# def writable_dict(self):
# output = self.improved_dict()
@property
def filename_template(self):
submissiontype = SubmissionType.query(name=self.submissiontype['value'])