From 610859d84f9567e2ba3d9ced5dc9abc7102d99fb Mon Sep 17 00:00:00 2001 From: lwark Date: Thu, 4 Sep 2025 15:21:50 -0500 Subject: [PATCH] Code cleanup for validators complete. --- src/submissions/backend/__init__.py | 4 +- .../backend/validators/__init__.py | 20 +- src/submissions/backend/validators/pydant.py | 424 +++++------------- 3 files changed, 121 insertions(+), 327 deletions(-) diff --git a/src/submissions/backend/__init__.py b/src/submissions/backend/__init__.py index ee800e3..a34cd2b 100644 --- a/src/submissions/backend/__init__.py +++ b/src/submissions/backend/__init__.py @@ -1,6 +1,6 @@ -''' +""" Contains database, validators and excel operations. -''' +""" from .db import * from .excel import * from .validators import * \ No newline at end of file diff --git a/src/submissions/backend/validators/__init__.py b/src/submissions/backend/validators/__init__.py index eba5dc6..a32bdca 100644 --- a/src/submissions/backend/validators/__init__.py +++ b/src/submissions/backend/validators/__init__.py @@ -48,8 +48,6 @@ class ClientSubmissionNamer(DefaultNamer): if not sub_type: logger.warning(f"Getting submissiontype from regex failed, using default submissiontype.") sub_type = SubmissionType.query(name="Default") - logger.debug(f"Submission Type: {sub_type}") - # sys.exit() return sub_type def get_subtype_from_regex(self) -> SubmissionType: @@ -84,9 +82,6 @@ class ClientSubmissionNamer(DefaultNamer): return sub_type - - - class RSLNamer(object): """ Object that will enforce proper formatting on RSL plate names. @@ -98,17 +93,10 @@ class RSLNamer(object): self.submission_type = submission_type if not self.submission_type: self.submission_type = self.retrieve_submission_type(filename=filename) - # logger.info(f"got submission type: {self.submission_type}") if self.submission_type: - # self.sub_object = BasicRun.find_polymorphic_subclass(polymorphic_identity=self.submission_type) self.sub_object = SubmissionType.query(name=self.submission_type['name'], limit=1) self.parsed_name = self.retrieve_rsl_number(filename=filename, regex=self.sub_object.get_regex( submission_type=self.submission_type)) - # if not data: - # data = dict(submission_type=self.submission_type) - # if "proceduretype" not in data.keys(): - # data['proceduretype'] = self.submission_type - # self.parsed_name = self.sub_object.enforce_name(instr=self.parsed_name, data=data) logger.info(f"Parsed name: {self.parsed_name}") @classmethod @@ -227,7 +215,6 @@ class RSLNamer(object): Returns: str: Output filename """ - logger.debug(data) if "submitted_date" in data.keys(): if isinstance(data['submitted_date'], dict): if data['submitted_date']['value'] is not None: @@ -244,13 +231,8 @@ class RSLNamer(object): today = datetime.now() if isinstance(today, str): today = datetime.strptime(today, "%Y-%m-%d") - # if "name" in data.keys(): - # logger.debug(f"Found name: {data['name']}") - # plate_number = data['name'].split("-")[-1][0] - # else: previous = Run.query(start_date=today, end_date=today, submissiontype=data['submissiontype']) plate_number = len(previous) + 1 - logger.debug(f"Using plate number: {plate_number}") return f"RSL-{data['abbreviation']}-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}-{plate_number}" @classmethod @@ -283,5 +265,5 @@ class RSLNamer(object): return "" -from .pydant import PydRun, PydKitType, PydContact, PydOrganization, PydSample, PydReagent, PydReagentRole, \ +from .pydant import PydRun, PydKitType, PydContact, PydClientLab, PydSample, PydReagent, PydReagentRole, \ PydEquipment, PydEquipmentRole, PydTips, PydProcess, PydElastic, PydClientSubmission, PydProcedure, PydResults diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py index 9c45232..5b75d7b 100644 --- a/src/submissions/backend/validators/pydant.py +++ b/src/submissions/backend/validators/pydant.py @@ -11,7 +11,7 @@ from typing import List, Tuple, Literal from types import GeneratorType from . import RSLNamer from pathlib import Path -from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone, sort_dict_by_list +from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone, sort_dict_by_list, row_keys from backend.db import models from backend.db.models import * from sqlalchemy.exc import StatementError @@ -35,7 +35,6 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): @model_validator(mode="before") @classmethod def prevalidate(cls, data): - # logger.debug(f"Initial data for {cls.__name__}: {pformat(data)}") sql_fields = [k for k, v in cls._sql_object.__dict__.items() if isinstance(v, InstrumentedAttribute)] output = {} try: @@ -54,9 +53,6 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): @model_validator(mode='after') @classmethod def validate_model(cls, data): - # _sql_object = getattr(models, cls.__name__.replace("Pyd", "")) - - # total_dict = data.model_fields.update(data.model_extra) for key, value in data.model_extra.items(): if key in cls._sql_object.timestamps: if isinstance(value, str): @@ -70,11 +66,11 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): data.__setattr__(key, value) return data - def __init__(self, **data): - # NOTE: Grab the sql model for validation purposes. - # self.__class__._sql_object = getattr(models, self.__class__.__name__.replace("Pyd", "")) - - super().__init__(**data) + # def __init__(self, **data): + # # NOTE: Grab the sql model for validation purposes. + # # self.__class__._sql_object = getattr(models, self.__class__.__name__.replace("Pyd", "")) + # + # super().__init__(**data) def filter_field(self, key: str) -> Any: """ @@ -121,8 +117,6 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): def to_sql(self): dicto = self.improved_dict(dictionaries=False) - logger.debug(f"Dicto: {dicto}") - # sql, new = self._sql_object().query_or_create(**dicto) sql, new = self._sql_object.query_or_create(**dicto) if new: logger.warning(f"Creating new {self._sql_object} with values:\n{pformat(dicto)}") @@ -226,11 +220,6 @@ class PydReagent(PydBaseClass): else: return values.data['reagentrole'].strip() - # @field_validator("reagentrole", mode="before") - # @classmethod - # def rescue_reagentrole(cls, value): - # if - def improved_dict(self) -> dict: """ Constructs a dictionary consisting of model.fields and model.extras @@ -261,31 +250,11 @@ class PydReagent(PydBaseClass): reagentrole = ReagentRole.query(name=self.reagentrole) reagent.reagentrole = reagentrole reagent.expiry = self.expiry - # logger.debug(f"Reagent: {reagent}") - # if reagent is None: - # reagent = Reagent() - # for key, value in self.__dict__.items(): - # if isinstance(value, dict): - # if key == "misc_info": - # value = value - # else: - # value = value['value'] - # # NOTE: reagent method sets fields based on keys in dictionary - # reagent.set_attribute(key, value) - # if procedure is not None and reagent not in procedure.reagents: - # assoc = ProcedureReagentAssociation(reagent=reagent, procedure=procedure) - # assoc.comments = self.comment - # else: - # assoc = None - # else: - # if submission is not None and reagent not in submission.reagents: - # submission.update_reagentassoc(reagent=reagent, role=self.role) return reagent, report class PydSample(PydBaseClass): sample_id: str - # sampletype: str | None = Field(default=None) submission_rank: int | List[int] | None = Field(default=0, validate_default=True) enabled: bool = Field(default=True) row: int = Field(default=0) @@ -328,7 +297,6 @@ class PydSample(PydBaseClass): def improved_dict(self, dictionaries: bool = True) -> dict: output = super().improved_dict(dictionaries=dictionaries) output['name'] = self.sample_id - # del output['sampletype'] return output def to_sql(self): @@ -340,21 +308,6 @@ class PydSample(PydBaseClass): class PydTips(PydBaseClass): name: str lot: str | None = Field(default=None) - # tips: str - - # @field_validator('tips', mode='before') - # @classmethod - # def get_role_name(cls, value): - # if isinstance(value, list): - # output = [] - # for tips in value: - # if isinstance(tips, Tips): - # tips = tips.name - # output.append(tips) - # value = output - # if isinstance(value, Tips): - # value = value.name - # return value @report_result def to_sql(self) -> Tuple[Tips, Report]: @@ -369,9 +322,6 @@ class PydTips(PydBaseClass): """ report = Report() tips = TipsLot.query(name=self.name, limit=1) - # logger.debug(f"Tips query has yielded: {tips}") - # assoc = ProcedureTipsAssociation.query_or_create(tips=tips, procedure=procedure, tiprole=self.tiprole, limit=1) - # logger.debug(f"Got association: {assoc}") return tips, report @@ -379,7 +329,6 @@ class PydEquipment(PydBaseClass): asset_number: str name: str nickname: str | None - # process: List[dict] | None process: List[PydProcess] | PydProcess | None equipmentrole: str | PydEquipmentRole | None tips: List[PydTips] | PydTips | None = Field(default=[]) @@ -401,8 +350,6 @@ class PydEquipment(PydBaseClass): @field_validator('process', mode='before') @classmethod def process_to_pydantic(cls, value, values): - # if isinstance(value, dict): - # value = value['processes'] if isinstance(value, GeneratorType): value = [item for item in value] value = convert_nans_to_nones(value) @@ -412,13 +359,9 @@ class PydEquipment(PydBaseClass): value = value.to_pydantic(pyd_model_name="PydProcess") else: try: - # value = [item.strip() for item in value] d: Process = next((process for process in value if values.data['name'] in [item.name for item in process.equipment]), None) - print(f"Next process: {d.details_dict()}") if d: - # value = PydProcess(**d.details_dict()) value = d.to_pydantic() - # value = next((process.to_pydantic() for process in value)) except AttributeError as e: logger.error(f"Process Validation error due to {e}") pass @@ -427,8 +370,6 @@ class PydEquipment(PydBaseClass): @field_validator('tips', mode='before') @classmethod def tips_to_pydantic(cls, value, values): - # if isinstance(value, dict): - # value = value['processes'] if isinstance(value, GeneratorType): value = [item for item in value] value = convert_nans_to_nones(value) @@ -438,44 +379,16 @@ class PydEquipment(PydBaseClass): value = value.to_pydantic(pyd_model_name="PydTips") else: try: - # value = [item.strip() for item in value] d: Tips = next( (tips for tips in value if values.data['name'] in [item.name for item in tips.equipment]), None) - print(f"Next process: {d.details_dict()}") if d: - # value = PydProcess(**d.details_dict()) value = d.to_pydantic() - # value = next((process.to_pydantic() for process in value)) except AttributeError as e: logger.error(f"Process Validation error due to {e}") pass return value - # @field_validator('tips', mode='before') - # @classmethod - # def tips_to_pydantic(cls, value): - # match value: - # case list(): - # output = [] - # for tips in value: - # match tips: - # case Tips(): - # tips = tips.to_pydantic() - # case dict(): - # tips = PydTips(**tips) - # case _: - # continue - # output.append(tips) - # case _: - # output = value - # return output - # - # @field_validator('tips') - # @classmethod - # def single_out_tips(cls, value, values): - # return value - @report_result def to_sql(self, procedure: Procedure | str = None, proceduretype: ProcedureType | str = None) -> Tuple[ Equipment, ProcedureEquipmentAssociation]: @@ -493,7 +406,6 @@ class PydEquipment(PydBaseClass): procedure = Procedure.query(name=procedure) if isinstance(proceduretype, str): proceduretype = ProcedureType.query(name=proceduretype) - logger.debug(f"Querying equipment: {self.asset_number}") equipment = Equipment.query(asset_number=self.asset_number) if equipment is None: logger.error("No equipment found. Returning None.") @@ -507,7 +419,6 @@ class PydEquipment(PydBaseClass): logger.error(f"Couldn't get association due to {e}, returning...") return None, None if new: - # assoc = ProcedureEquipmentAssociation(procedure=procedure, equipment=equipment) # TODO: This seems precarious. What if there is more than one process? # NOTE: It looks like the way fetching the process is done in the SQL model, this shouldn't be a problem, but I'll include a failsafe. # NOTE: I need to find a way to filter this by the kittype involved. @@ -517,7 +428,6 @@ class PydEquipment(PydBaseClass): process = Process.query(name=self.processes[0], limit=1) if process is None: logger.error(f"Found unknown process: {process}.") - logger.debug(f"Using process: {process}") assoc.process = process assoc.equipmentrole = self.equipmentrole else: @@ -662,7 +572,6 @@ class PydRun(PydBaseClass): #, extra='allow'): if "pytest" in sys.modules and sub_type.replace(" ", "") == "BasicRun": output = "RSL-BS-Test001" else: - # try: output = RSLNamer(filename=sub_type.filepath.__str__(), submission_type=sub_type.submissiontype, data=values.data).parsed_name return dict(value=output, missing=True) @@ -685,10 +594,7 @@ class PydRun(PydBaseClass): #, extra='allow'): super().__init__(**data) # NOTE: this could also be done with default_factory submission_type = self.clientsubmission.submissiontype - # logger.debug(submission_type) self.namer = RSLNamer(self.rsl_plate_number['value'], submission_type=submission_type) - # if run_custom: - # self.submission_object.custom_validation(pyd=self) def set_attribute(self, key: str, value): """ @@ -790,18 +696,14 @@ class PydRun(PydBaseClass): #, extra='allow'): """ report = Report() dicto = self.improved_dict() - # logger.debug(f"Pydantic procedure type: {self.proceduretype['value']}") - # logger.debug(f"Pydantic improved_dict: {pformat(dicto)}") instance, result = Run.query_or_create(submissiontype=self.submission_type['value'], rsl_plate_number=self.rsl_plate_number['value']) - # logger.debug(f"Created or queried instance: {instance}") if instance is None: report.add_result(Result(msg="Overwrite Cancelled.")) return None, report report.add_result(result) self.handle_duplicate_samples() for key, value in dicto.items(): - # logger.debug(f"Checking key {key}, value {value}") if isinstance(value, dict): try: value = value['value'] @@ -857,7 +759,6 @@ class PydRun(PydBaseClass): #, extra='allow'): value = value instance.set_attribute(key=key, value=value) case item if item in instance.jsons: - # logger.debug(f"Validating json value: {item} to value:{pformat(value)}") try: ii = value.items() except AttributeError: @@ -867,7 +768,6 @@ class PydRun(PydBaseClass): #, extra='allow'): value[k] = v.strftime("%Y-%m-%d %H:%M:%S") else: pass - # logger.debug(f"Setting json value: {item} to value:{pformat(value)}") instance.set_attribute(key=key, value=value) case _: try: @@ -914,22 +814,8 @@ class PydRun(PydBaseClass): #, extra='allow'): SubmissionFormWidget: Submission form widget """ from frontend.widgets.submission_widget import SubmissionFormWidget - try: - logger.debug(f"PCR info: {self.pcr_info}") - except AttributeError: - pass return SubmissionFormWidget(parent=parent, pyd=self, disable=disable) - # def to_writer(self) -> "SheetWriter": - # """ - # Sends data here to the sheet writer. - # - # Returns: - # SheetWriter: Sheetwriter object that will perform writing. - # """ - # from backend.excel.writer import SheetWriter - # return SheetWriter(self) - def construct_filename(self) -> str: """ Creates filename for this instance @@ -942,45 +828,43 @@ class PydRun(PydBaseClass): #, extra='allow'): "/", "") return render - def check_kit_integrity(self, extraction_kit: str | dict | None = None, exempt: List[PydReagent] = []) -> Tuple[ - List[PydReagent], Report, List[PydReagent]]: - """ - Ensures all reagents expected in kittype are listed in Submission - - Args: - extraction_kit (str | dict | None, optional): kittype to be checked. Defaults to None. - exempt (List[PydReagent], optional): List of reagents that don't need to be checked. Defaults to [] - - Returns: - Tuple[List[PydReagent], Report]: List of reagents and Result object containing a message and any missing components. - """ - report = Report() - # logger.debug(f"The following reagents are exempt from the kittype integrity check:\n{exempt}") - if isinstance(extraction_kit, str): - extraction_kit = dict(value=extraction_kit) - if extraction_kit is not None and extraction_kit != self.extraction_kit['value']: - self.extraction_kit['value'] = extraction_kit['value'] - ext_kit = KitType.query(name=self.extraction_kit['value']) - ext_kit_rtypes = [item.to_pydantic() for item in - ext_kit.get_reagents(required_only=True, proceduretype=self.submission_type['value'])] - # NOTE: Exclude any reagenttype found in this pydclientsubmission not expected in kittype. - expected_check = [item.equipmentrole for item in ext_kit_rtypes] - output_reagents = [rt for rt in self.reagents if rt.role in expected_check] - missing_check = [item.role for item in output_reagents] - missing_reagents = [rt for rt in ext_kit_rtypes if - rt.equipmentrole not in missing_check and rt.equipmentrole not in exempt] - # logger.debug(f"Missing reagents: {missing_reagents}") - missing_reagents += [rt for rt in output_reagents if rt.missing] - output_reagents += [rt for rt in missing_reagents if rt not in output_reagents] - # NOTE: if lists are equal return no problem - if len(missing_reagents) == 0: - result = None - else: - result = Result( - msg=f"The excel sheet you are importing is missing some reagents expected by the kittype.\n\nIt looks like you are missing: {[item.equipmentrole.upper() for item in missing_reagents]}\n\nAlternatively, you may have set the wrong extraction kittype.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", - status="Warning") - report.add_result(result) - return output_reagents, report, missing_reagents + # def check_kit_integrity(self, extraction_kit: str | dict | None = None, exempt: List[PydReagent] = []) -> Tuple[ + # List[PydReagent], Report, List[PydReagent]]: + # """ + # Ensures all reagents expected in kittype are listed in Submission + # + # Args: + # extraction_kit (str | dict | None, optional): kittype to be checked. Defaults to None. + # exempt (List[PydReagent], optional): List of reagents that don't need to be checked. Defaults to [] + # + # Returns: + # Tuple[List[PydReagent], Report]: List of reagents and Result object containing a message and any missing components. + # """ + # report = Report() + # if isinstance(extraction_kit, str): + # extraction_kit = dict(value=extraction_kit) + # if extraction_kit is not None and extraction_kit != self.extraction_kit['value']: + # self.extraction_kit['value'] = extraction_kit['value'] + # ext_kit = KitType.query(name=self.extraction_kit['value']) + # ext_kit_rtypes = [item.to_pydantic() for item in + # ext_kit.get_reagents(required_only=True, proceduretype=self.submission_type['value'])] + # # NOTE: Exclude any reagenttype found in this pydclientsubmission not expected in kittype. + # expected_check = [item.equipmentrole for item in ext_kit_rtypes] + # output_reagents = [rt for rt in self.reagents if rt.role in expected_check] + # missing_check = [item.role for item in output_reagents] + # missing_reagents = [rt for rt in ext_kit_rtypes if + # rt.equipmentrole not in missing_check and rt.equipmentrole not in exempt] + # missing_reagents += [rt for rt in output_reagents if rt.missing] + # output_reagents += [rt for rt in missing_reagents if rt not in output_reagents] + # # NOTE: if lists are equal return no problem + # if len(missing_reagents) == 0: + # result = None + # else: + # result = Result( + # msg=f"The excel sheet you are importing is missing some reagents expected by the kittype.\n\nIt looks like you are missing: {[item.equipmentrole.upper() for item in missing_reagents]}\n\nAlternatively, you may have set the wrong extraction kittype.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", + # status="Warning") + # report.add_result(result) + # return output_reagents, report, missing_reagents def check_reagent_expiries(self, exempt: List[PydReagent] = []): report = Report() @@ -1039,9 +923,7 @@ class PydContact(BaseModel): area_regex = re.compile(r"^\(?(\d{3})\)?(-| )?") if len(value) > 8: match = area_regex.match(value) - # logger.debug(f"Match: {match.group(1)}") value = area_regex.sub(f"({match.group(1).strip()}) ", value) - # logger.debug(f"Output phone: {value}") return value @report_result @@ -1065,7 +947,7 @@ class PydContact(BaseModel): value = getattr(self, field) match field: case "organization": - value = [Organization.query(name=value)] + value = [ClientLab.query(name=value)] case _: pass try: @@ -1075,7 +957,7 @@ class PydContact(BaseModel): return instance, report -class PydOrganization(BaseModel): +class PydClientLab(BaseModel): name: str cost_centre: str contact: List[PydContact] | None @@ -1109,7 +991,6 @@ class PydOrganization(BaseModel): value = [item.to_sql() for item in value if item] case _: value = getattr(self, field) - logger.debug(f"Setting {field} to {value}") if value: setattr(instance, field, value) return instance, report @@ -1128,50 +1009,50 @@ class PydReagentRole(BaseModel): return timedelta(days=value) return value - @report_result - def to_sql(self, kit: KitType) -> ReagentRole: - """ - Converts this instance into a backend.db.models.ReagentType instance - - Args: - kit (KitType): KitType joined to the reagentrole - - Returns: - ReagentRole: ReagentType instance - """ - report = Report() - instance: ReagentRole = ReagentRole.query(name=self.name) - if instance is None: - instance = ReagentRole(name=self.name, eol_ext=self.eol_ext) - try: - assoc = KitTypeReagentRoleAssociation.query(reagentrole=instance, kittype=kit) - except StatementError: - assoc = None - if assoc is None: - assoc = KitTypeReagentRoleAssociation(kittype=kit, reagentrole=instance, uses=self.uses, - required=self.required) - return instance, report + # @report_result + # def to_sql(self, kit: KitType) -> ReagentRole: + # """ + # Converts this instance into a backend.db.models.ReagentType instance + # + # Args: + # kit (KitType): KitType joined to the reagentrole + # + # Returns: + # ReagentRole: ReagentType instance + # """ + # report = Report() + # instance: ReagentRole = ReagentRole.query(name=self.name) + # if instance is None: + # instance = ReagentRole(name=self.name, eol_ext=self.eol_ext) + # try: + # assoc = KitTypeReagentRoleAssociation.query(reagentrole=instance, kittype=kit) + # except StatementError: + # assoc = None + # if assoc is None: + # assoc = KitTypeReagentRoleAssociation(kittype=kit, reagentrole=instance, uses=self.uses, + # required=self.required) + # return instance, report -class PydKitType(BaseModel): - name: str - reagent_roles: List[PydReagent] = [] - - @report_result - def to_sql(self) -> Tuple[KitType, Report]: - """ - Converts this instance into a backend.db.models.kits.KitType instance - - Returns: - Tuple[KitType, Report]: KitType instance and report of results. - """ - report = Report() - instance = KitType.query(name=self.name) - if instance is None: - instance = KitType(name=self.name) - for role in self.reagent_roles: - role.to_sql(instance) - return instance, report +# class PydKitType(BaseModel): +# name: str +# reagent_roles: List[PydReagent] = [] +# +# @report_result +# def to_sql(self) -> Tuple[KitType, Report]: +# """ +# Converts this instance into a backend.db.models.kits.KitType instance +# +# Returns: +# Tuple[KitType, Report]: KitType instance and report of results. +# """ +# report = Report() +# instance = KitType.query(name=self.name) +# if instance is None: +# instance = KitType(name=self.name) +# for role in self.reagent_roles: +# role.to_sql(instance) +# return instance, report class PydEquipmentRole(BaseModel): @@ -1210,7 +1091,6 @@ class PydProcess(PydBaseClass, extra="allow"): @field_validator("tips", mode="before") @classmethod def enforce_list(cls, value): - # logger.debug(f"Validating field: {value}") if not isinstance(value, list): value = [value] output = [] @@ -1240,10 +1120,8 @@ class PydProcess(PydBaseClass, extra="allow"): def to_sql(self): report = Report() name = self.name.split("-")[0] - logger.debug(f"Query process: {self.name}, version = {self.version}") # NOTE: can't use query_or_create due to name not being part of ProcessVersion instance = ProcessVersion.query(name=name, version=self.version, limit=1) - logger.debug(f"Got instance: {instance}") if not instance: instance = ProcessVersion() return instance, report @@ -1255,7 +1133,6 @@ class PydElastic(BaseModel, extra="allow", arbitrary_types_allowed=True): @report_result def to_sql(self): - # print(self.instance) fields = [item for item in self.model_extra] for field in fields: try: @@ -1265,11 +1142,8 @@ class PydElastic(BaseModel, extra="allow", arbitrary_types_allowed=True): continue match field_type: case _RelationshipDeclared(): - # logger.debug(f"{field} is a relationship with {field_type.entity.class_}") field_value = field_type.entity.class_.argument.query(name=getattr(self, field)) - # logger.debug(f"{field} query result: {field_value}") case ColumnProperty(): - # logger.debug(f"{field} is a property.") field_value = getattr(self, field) self.instance.__setattr__(field, field_value) return self.instance @@ -1343,26 +1217,11 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): @field_validator("reagentrole") @classmethod def rescue_reagentrole(cls, value, values): - # if not value: - # match values.data['kittype']: - # case dict(): - # if "value" in values.data['kittype'].keys(): - # roi = values.data['kittype']['value'] - # elif "name" in values.data['kittype'].keys(): - # roi = values.data['kittype']['name'] - # else: - # raise KeyError(f"Couldn't find kittype name in the dictionary: {values.data['kittype']}") - # case str(): - # roi = values.data['kittype'] - # if roi != cls.model_fields['kittype'].default['value']: - # kittype = KitType.query(name=roi) - # value = {item.name: item.reagent for item in kittype.reagentrole} if not value: value = {} for reagentrole in values.data['proceduretype'].reagentrole: reagents = [reagent.lot_dicts for reagent in reagentrole.reagent] value[reagentrole.name] = flatten_list(reagents) - # value = {item.name: item.reagent for item in values.data['proceduretype'].reagentrole} return value @field_validator("run") @@ -1400,25 +1259,25 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): except TypeError: return len(self.sample) - def update_kittype_reagentroles(self, kittype: str | KitType): - if kittype == self.__class__.model_fields['kittype'].default['value']: - return - if isinstance(kittype, str): - kittype_obj = KitType.query(name=kittype) - try: - self.reagentrole = { - item.name: item.get_reagents(kittype=kittype_obj) + [PydReagent(name="--New--", lot="", reagentrole="")] - for item in - kittype_obj.get_reagents(proceduretype=self.proceduretype)} - except AttributeError: - self.reagentrole = {} - reordered_options = {} - if self.reagentrole: - for k, v in self.reagentrole.items(): - reordered_options[k] = self.reorder_reagents(reagentrole=k, options=v) - self.reagentrole = reordered_options - self.kittype['value'] = kittype - self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype))) + # def update_kittype_reagentroles(self, kittype: str | KitType): + # if kittype == self.__class__.model_fields['kittype'].default['value']: + # return + # if isinstance(kittype, str): + # kittype_obj = KitType.query(name=kittype) + # try: + # self.reagentrole = { + # item.name: item.get_reagents(kittype=kittype_obj) + [PydReagent(name="--New--", lot="", reagentrole="")] + # for item in + # kittype_obj.get_reagents(proceduretype=self.proceduretype)} + # except AttributeError: + # self.reagentrole = {} + # reordered_options = {} + # if self.reagentrole: + # for k, v in self.reagentrole.items(): + # reordered_options[k] = self.reorder_reagents(reagentrole=k, options=v) + # self.reagentrole = reordered_options + # self.kittype['value'] = kittype + # self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype))) def reorder_reagents(self, reagentrole: str, options: list): reagent_used = next((reagent for reagent in self.reagent if reagent.reagentrole == reagentrole), None) @@ -1430,26 +1289,24 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): options.insert(0, options.pop(options.index(roi))) return options - def update_kittype_equipmentroles(self, kittype: str | KitType): - if kittype == self.__class__.model_fields['kittype'].default['value']: - return - if isinstance(kittype, str): - kittype_obj = KitType.query(name=kittype) - try: - self.equipment = {item.name: item.get_reagents(kittype=kittype_obj) for item in - kittype_obj.get_reagents(proceduretype=self.proceduretype)} - except AttributeError: - self.reagentrole = {} - self.kittype['value'] = kittype - self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype))) + # def update_kittype_equipmentroles(self, kittype: str | KitType): + # if kittype == self.__class__.model_fields['kittype'].default['value']: + # return + # if isinstance(kittype, str): + # kittype_obj = KitType.query(name=kittype) + # try: + # self.equipment = {item.name: item.get_reagents(kittype=kittype_obj) for item in + # kittype_obj.get_reagents(proceduretype=self.proceduretype)} + # except AttributeError: + # self.reagentrole = {} + # self.kittype['value'] = kittype + # self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype))) def update_samples(self, sample_list: List[dict]): - # logger.debug(f"Incoming sample_list:\n{pformat(sample_list)}") for iii, sample_dict in enumerate(sample_list, start=1): if sample_dict['sample_id'].startswith("blank_"): sample_dict['sample_id'] = "" row, column = self.proceduretype.ranked_plate[sample_dict['index']] - # logger.debug(f"Row: {row}, Column: {column}") try: sample = next( (item for item in self.sample if item.sample_id.upper() == sample_dict['sample_id'].upper())) @@ -1468,8 +1325,6 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): sample.row = row sample.column = column sample.procedure_rank = sample_dict['index'] - # logger.debug(f"Sample of interest: {sample.improved_dict()}") - # logger.debug(f"Updated samples:\n{pformat(self.sample)}") def update_reagents(self, reagentrole: str, name: str, lot: str, expiry: str): try: @@ -1484,7 +1339,6 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): idx = 0 insertable = PydReagent(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry) self.reagent.insert(idx, insertable) - # logger.debug(self.reagent) @classmethod def update_new_reagents(cls, reagent: PydReagent): @@ -1505,7 +1359,6 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): sql.technician = self.technician['value'] else: sql.technician = self.technician - # sql.repeat = int(self.repeat) if sql.repeat: regex = re.compile(r".*\dR\d$") repeats = [item for item in self.run.procedure if @@ -1529,14 +1382,12 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): removable = ProcedureReagentLotAssociation.query(procedure=sql, reagentrole=reagentrole) else: removable = [] - logger.debug(f"Removable: {removable}") if removable: if isinstance(removable, list): for r in removable: r.delete() else: removable.delete() - logger.debug(f"Adding {reagent} to {sql}") reagent_assoc = ProcedureReagentLotAssociation(reagentlot=reagent, procedure=sql, reagentrole=reagentrole) try: start_index = max([item.id for item in ProcedureSampleAssociation.query()]) + 1 @@ -1603,13 +1454,6 @@ class PydClientSubmission(PydBaseClass): submitter_plate_id: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) sample: List[PydSample] | None = Field(default=[]) - # @field_validator("submissiontype", mode="before") - # @classmethod - # def enforce_submissiontype(cls, value): - # if isinstance(value, str): - # value = dict(value=value, missing=False) - # return value - @field_validator("submissiontype", "clientlab", "contact", mode="before") @classmethod def enforce_value(cls, value): @@ -1728,7 +1572,6 @@ class PydClientSubmission(PydBaseClass): """ from frontend.widgets.submission_widget import ClientSubmissionFormWidget if not samples: - # samples = [sample for sample in self.sample if sample.sample_id.lower() not in ["", "blank"]] samples = self.sample return ClientSubmissionFormWidget(parent=parent, clientsubmission=self, samples=samples, disable=disable) @@ -1736,11 +1579,6 @@ class PydClientSubmission(PydBaseClass): sql = super().to_sql() assert not any([isinstance(item, PydSample) for item in sql.sample]) sql.sample = [] - # if "info_placement" not in sql._misc_info: - # sql._misc_info['info_placement'] = [] - # info_placement = [] - logger.debug(f"PYD Submission type: {self.submissiontype}") - logger.debug(f"SQL Submission Type: {sql.submissiontype}") if not sql.submissiontype: sql.submissiontype = SubmissionType.query(name=self.submissiontype['value']) match sql.submissiontype: @@ -1749,7 +1587,6 @@ class PydClientSubmission(PydBaseClass): case _: sql.submissiontype = SubmissionType.query(name="Default") for k in list(self.model_fields.keys()) + list(self.model_extra.keys()): - logger.debug(f"Running {k}") attribute = getattr(self, k) match k: case "filepath": @@ -1757,14 +1594,6 @@ class PydClientSubmission(PydBaseClass): continue case _: pass - # logger.debug(f"Setting {k} to {attribute}") - # if isinstance(attribute, dict): - # if "location" in attribute: - # info_placement.append(dict(name=k, location=attribute['location'])) - # else: - # info_placement.append(dict(name=k, location=None)) - # # max_row = max([value['location']['row'] for value in info_placement if value]) - # sql._misc_info['info_placement'] = info_placement return sql @property @@ -1775,19 +1604,6 @@ class PydClientSubmission(PydBaseClass): else: return max([item.submission_rank for item in self.sample]) - # def pad_samples_to_length(self, row_count, column_names): - # output_samples = [] - # for iii in range(1, row_count + 1): - # try: - # sample = next((item for item in self.samples if item.submission_rank == iii)) - # except StopIteration: - # sample = PydSample(sample_id="") - # for column in column_names: - # setattr(sample, column[0], "") - # sample.submission_rank = iii - # output_samples.append(sample) - # return sorted(output_samples, key=lambda x: x.submission_rank) - def improved_dict(self, dictionaries: bool = True) -> dict: output = super().improved_dict(dictionaries=dictionaries) output['sample'] = self.sample @@ -1798,10 +1614,6 @@ class PydClientSubmission(PydBaseClass): pass return sort_dict_by_list(output, self.key_value_order) - # @property - # def writable_dict(self): - # output = self.improved_dict() - @property def filename_template(self): submissiontype = SubmissionType.query(name=self.submissiontype['value'])