""" Contains pydantic models and accompanying validators """ from __future__ import annotations import re, logging, csv, sys, string from pydantic import BaseModel, field_validator, Field, model_validator from datetime import date, datetime, timedelta from dateutil.parser import parse from dateutil.parser import ParserError from typing import List, Tuple, Literal from types import GeneratorType from . import RSLNamer from pathlib import Path from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone, sort_dict_by_list from backend.db import models from backend.db.models import * from sqlalchemy.exc import StatementError from sqlalchemy.orm.properties import ColumnProperty from sqlalchemy.orm.relationships import _RelationshipDeclared from sqlalchemy.orm.attributes import InstrumentedAttribute from PyQt6.QtWidgets import QWidget logger = logging.getLogger(f"submission.{__name__}") class PydBaseClass(BaseModel, extra='allow', validate_assignment=True): # _sql_object: ClassVar = None key_value_order: ClassVar = [] @classproperty def _sql_object(cls): return getattr(models, cls.__name__.replace("Pyd", "")) @model_validator(mode="before") @classmethod def prevalidate(cls, data): sql_fields = [k for k, v in cls._sql_object.__dict__.items() if isinstance(v, InstrumentedAttribute)] output = {} try: items = data.items() except AttributeError as e: logger.error(f"Could not prevalidate {cls.__name__} due to {e} for {pformat(data)}") return data for key, value in items: new_key = key.replace("_", "") if new_key in sql_fields: output[new_key] = value else: output[key] = value return output @model_validator(mode='after') @classmethod def validate_model(cls, data): # _sql_object = getattr(models, cls.__name__.replace("Pyd", "")) # total_dict = data.model_fields.update(data.model_extra) for key, value in data.model_extra.items(): if key in cls._sql_object.timestamps: if isinstance(value, str): data.__setattr__(key, datetime.strptime(value, "%Y-%m-%d")) if key == "row" and isinstance(value, str): if value.lower() in string.ascii_lowercase[0:8]: try: value = row_keys[value] except KeyError: value = value data.__setattr__(key, value) return data def __init__(self, **data): # NOTE: Grab the sql model for validation purposes. # self.__class__._sql_object = getattr(models, self.__class__.__name__.replace("Pyd", "")) logger.debug(f"Initial data: {data}") super().__init__(**data) def filter_field(self, key: str) -> Any: """ Attempts to get value from field dictionary Args: key (str): name of the field of interest Returns: Any (): Value found. """ item = getattr(self, key) match item: case dict(): try: item = item['value'] except KeyError: logger.error(f"Couldn't get dict value: {item}") case _: pass return item def improved_dict(self, dictionaries: bool = True) -> dict: """ Adds model_extra to fields. Args: dictionaries (bool, optional): Are dictionaries expected as input? i.e. Should key['value'] be retrieved. Defaults to True. Returns: dict: This instance as a dictionary """ fields = list(self.model_fields.keys()) + list(self.model_extra.keys()) if dictionaries: output = {k: getattr(self, k) for k in fields} else: output = {k: self.filter_field(k) for k in fields} if "misc_info" in output.keys(): for k, v in output['misc_info'].items(): if k not in output.keys(): output[k] = v del output['misc_info'] return output def to_sql(self): dicto = self.improved_dict(dictionaries=False) logger.debug(f"Dicto: {dicto}") # sql, new = self._sql_object().query_or_create(**dicto) sql, new = self._sql_object.query_or_create(**dicto) if new: logger.warning(f"Creating new {self._sql_object} with values:\n{pformat(dicto)}") return sql @property def fields(self): output = [] for k, v in self.improved_dict().items(): match v: case str() | int() | float() | datetime() | date(): output.append(k) case x if issubclass(v.__class__, PydBaseClass): output.append(k) case _: continue return list(set(output)) class PydReagent(PydBaseClass): lot: str | None reagentrole: str | None expiry: date | datetime | Literal['NA'] | None = Field(default=None, validate_default=True) name: str | None = Field(default=None, validate_default=True) missing: bool = Field(default=True) comment: str | None = Field(default="", validate_default=True) @field_validator('comment', mode='before') @classmethod def create_comment(cls, value): if value is None: return "" return value @field_validator("reagentrole", mode='before') @classmethod def remove_undesired_types(cls, value): match value: case "atcc": return None case _: return value @field_validator("reagentrole") @classmethod def rescue_type_with_lookup(cls, value, values): if value is None and values.data['lot'] is not None: try: return Reagent.query(lot=values.data['lot']).name except AttributeError: return value return value @field_validator("lot", mode='before') @classmethod def rescue_lot_string(cls, value): if value is not None: return convert_nans_to_nones(str(value).strip()) return value @field_validator("lot") @classmethod def enforce_lot_string(cls, value): if value is not None: return value.upper().strip() return value @field_validator("expiry", mode="before") @classmethod def enforce_date(cls, value): if value is not None: match value: case int(): return datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value - 2) case 'NA': return value case str(): return parse(value) case date(): return datetime.combine(value, datetime.max.time()) case datetime(): return value case _: return convert_nans_to_nones(str(value)) if value is None: value = datetime.combine(date.today(), datetime.max.time()) return value @field_validator("expiry") @classmethod def date_na(cls, value): if isinstance(value, date) and value.year == 1970: value = "NA" return value @field_validator("name", mode="before") @classmethod def enforce_name(cls, value, values): if value is not None: return convert_nans_to_nones(str(value).strip()) else: return values.data['reagentrole'].strip() # @field_validator("reagentrole", mode="before") # @classmethod # def rescue_reagentrole(cls, value): # if def improved_dict(self) -> dict: """ Constructs a dictionary consisting of model.fields and model.extras Returns: dict: Information dictionary """ try: extras = list(self.model_extra.keys()) except AttributeError: extras = [] fields = list(self.model_fields.keys()) + extras return {k: getattr(self, k) for k in fields} @report_result def to_sql(self, procedure: Procedure | str = None) -> Tuple[Reagent, Report]: """ Converts this instance into a backend.db.models.kittype.Reagent instance Returns: Tuple[Reagent, Report]: Reagent instance and result of function """ report = Report() if self.model_extra is not None: self.__dict__.update(self.model_extra) reagent, new = ReagentLot.query_or_create(lot=self.lot, name=self.name) if new: reagentrole = ReagentRole.query(name=self.reagentrole) reagent.reagentrole = reagentrole reagent.expiry = self.expiry # logger.debug(f"Reagent: {reagent}") # if reagent is None: # reagent = Reagent() # for key, value in self.__dict__.items(): # if isinstance(value, dict): # if key == "misc_info": # value = value # else: # value = value['value'] # # NOTE: reagent method sets fields based on keys in dictionary # reagent.set_attribute(key, value) # if procedure is not None and reagent not in procedure.reagents: # assoc = ProcedureReagentAssociation(reagent=reagent, procedure=procedure) # assoc.comments = self.comment # else: # assoc = None # else: # if submission is not None and reagent not in submission.reagents: # submission.update_reagentassoc(reagent=reagent, role=self.role) return reagent, report class PydSample(PydBaseClass): sample_id: str sampletype: str | None = Field(default=None) submission_rank: int | List[int] | None = Field(default=0, validate_default=True) enabled: bool = Field(default=True) row: int = Field(default=0) column: int = Field(default=0) @field_validator("sample_id", mode="before") @classmethod def int_to_str(cls, value): return str(value) @field_validator("sample_id") @classmethod def strip_sub_id(cls, value): match value: case dict(): value['value'] = value['value'].strip().upper() case str(): value = value.strip().upper() case _: pass return value @field_validator("row", mode="before") @classmethod def row_str_to_int(cls, value): if isinstance(value, str): try: value = row_keys[value] except KeyError: value = 0 return value @field_validator("column", mode="before") @classmethod def column_str_to_int(cls, value): if isinstance(value, str): value = 0 return value def improved_dict(self, dictionaries: bool = True) -> dict: output = super().improved_dict(dictionaries=dictionaries) output['name'] = self.sample_id del output['sampletype'] return output def to_sql(self): sql = super().to_sql() sql._misc_info["submission_rank"] = self.submission_rank return sql class PydTips(PydBaseClass): name: str lot: str | None = Field(default=None) tiprole: str @field_validator('tiprole', mode='before') @classmethod def get_role_name(cls, value): if isinstance(value, list): output = [] for tiprole in value: if isinstance(tiprole, TipRole): tiprole = tiprole.name return tiprole value = output if isinstance(value, TipRole): value = value.name return value @report_result def to_sql(self, procedure: Run) -> ProcedureTipsAssociation: """ Convert this object to the SQL version for database storage. Args: procedure (BasicRun): A procedure object to associate tips represented here. Returns: SubmissionTipsAssociation: Association between queried tips and procedure """ report = Report() tips = Tips.query(name=self.name, limit=1) # logger.debug(f"Tips query has yielded: {tips}") assoc = ProcedureTipsAssociation.query_or_create(tips=tips, procedure=procedure, tiprole=self.tiprole, limit=1) logger.debug(f"Got association: {assoc}") return assoc, report class PydEquipment(PydBaseClass): asset_number: str name: str nickname: str | None # process: List[dict] | None process: List[PydProcess] | PydProcess | None equipmentrole: str | PydEquipmentRole | None tips: List[PydTips] | PydTips | None = Field(default=[]) @field_validator('equipmentrole', mode='before') @classmethod def get_role_name(cls, value): match value: case list(): value = value[0] case GeneratorType(): value = next(value) case _: pass if isinstance(value, EquipmentRole): value = value.name return value @field_validator('process', mode='before') @classmethod def make_empty_list(cls, value): # if isinstance(value, dict): # value = value['processes'] if isinstance(value, GeneratorType): value = [item for item in value] value = convert_nans_to_nones(value) if not value: value = [''] # logger.debug(value) try: # value = [item.strip() for item in value] d = next((process for process in value), None) logger.debug(f"Next process: {d.detail_dict()}") value = PydProcess(d.details_dict()) # value = next((process.to_pydantic() for process in value)) except AttributeError: pass return value @field_validator('tips', mode='before') @classmethod def tips_to_pydantic(cls, value): match value: case list(): output = [] for tips in value: match tips: case Tips(): tips = tips.to_pydantic() case dict(): tips = PydTips(**tips) case _: continue output.append(tips) case _: output = value return output @field_validator('tips') @classmethod def single_out_tips(cls, value, values): return value @report_result def to_sql(self, procedure: Procedure | str = None, proceduretype: ProcedureType | str = None) -> Tuple[ Equipment, ProcedureEquipmentAssociation]: """ Creates Equipment and SubmssionEquipmentAssociations for this PydEquipment Args: procedure ( BasicRun | str ): BasicRun of interest Returns: Tuple[Equipment, RunEquipmentAssociation]: SQL objects """ report = Report() if isinstance(procedure, str): procedure = Procedure.query(name=procedure) if isinstance(proceduretype, str): proceduretype = ProcedureType.query(name=proceduretype) logger.debug(f"Querying equipment: {self.asset_number}") equipment = Equipment.query(asset_number=self.asset_number) if equipment is None: logger.error("No equipment found. Returning None.") return None, None if procedure is not None: # NOTE: Need to make sure the same association is not added to the procedure try: assoc, new = ProcedureEquipmentAssociation.query_or_create(equipment=equipment, procedure=procedure, equipmentrole=self.equipmentrole, limit=1) except TypeError as e: logger.error(f"Couldn't get association due to {e}, returning...") return None, None if new: # assoc = ProcedureEquipmentAssociation(procedure=procedure, equipment=equipment) # TODO: This seems precarious. What if there is more than one process? # NOTE: It looks like the way fetching the process is done in the SQL model, this shouldn't be a problem, but I'll include a failsafe. # NOTE: I need to find a way to filter this by the kittype involved. if len(self.processes) > 1: process = Process.query(proceduretype=procedure.get_submission_type(), equipmentrole=self.role) else: process = Process.query(name=self.processes[0]) if process is None: logger.error(f"Found unknown process: {process}.") # logger.debug(f"Using process: {process}") assoc.process = process assoc.equipmentrole = self.equipmentrole else: logger.warning(f"Found already existing association: {assoc}") assoc = None else: logger.warning(f"No procedure found") assoc = None return equipment, assoc, report def improved_dict(self) -> dict: """ Constructs a dictionary consisting of model.fields and model.extras Returns: dict: Information dictionary """ try: extras = list(self.model_extra.keys()) except AttributeError: extras = [] fields = list(self.model_fields.keys()) + extras return {k: getattr(self, k) for k in fields} class PydRun(PydBaseClass): #, extra='allow'): clientsubmission: PydClientSubmission | None = Field(default=None) rsl_plate_number: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) started_date: dict | None = Field(default=dict(value=date.today(), missing=True), validate_default=True) completed_date: dict | None = Field(default=dict(value=date.today(), missing=True), validate_default=True) sample_count: dict | None comment: dict | None = Field(default=dict(value="", missing=True), validate_default=True) sample: List[PydSample] | Generator = Field(default=[]) run_cost: float | dict = Field(default=dict(value=0.0, missing=True)) signed_by: str | dict = Field(default="", validate_default=True) procedure: List[PydProcedure] | Generator = Field(default=[]) @field_validator("signed_by") @classmethod def rescue_signed_by(cls, value): if isinstance(value, str): value = dict(value=value, missing=True) return value @field_validator("run_cost") @classmethod def rescue_run_cost(cls, value): if isinstance(value, float): value = dict(value=value, missing=False) return value @field_validator("started_date", mode="before") @classmethod def rescue_start_date(cls, value): try: check = value['value'] is None except TypeError: check = True if check: return dict(value=date.today(), missing=True) return value @field_validator("completed_date", mode="before") @classmethod def rescue_completed_date(cls, value): try: check = value['value'] is None except TypeError: check = True if check: return dict(value=date.today(), missing=True) return value @field_validator("started_date") @classmethod def strip_started_datetime_string(cls, value): match value['value']: case date(): output = datetime.combine(value['value'], datetime.min.time()) case datetime(): pass case int(): output = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value['value'] - 2) case str(): string = re.sub(r"(_|-)\d(R\d)?$", "", value['value']) try: output = dict(value=parse(string).date(), missing=True) except ParserError as e: logger.error(f"Problem parsing date: {e}") try: output = parse(string.replace("-", "")).date() except Exception as e: logger.error(f"Problem with parse fallback: {e}") return value case _: raise ValueError(f"Could not get datetime from {value['value']}") value['value'] = output.replace(tzinfo=timezone) return value @field_validator("completed_date") @classmethod def strip_completed_datetime_string(cls, value): match value['value']: case date(): output = datetime.combine(value['value'], datetime.min.time()) case datetime(): pass case int(): output = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value['value'] - 2) case str(): string = re.sub(r"(_|-)\d(R\d)?$", "", value['value']) try: output = dict(value=parse(string).date(), missing=True) except ParserError as e: logger.error(f"Problem parsing date: {e}") try: output = parse(string.replace("-", "")).date() except Exception as e: logger.error(f"Problem with parse fallback: {e}") return value case _: raise ValueError(f"Could not get datetime from {value['value']}") value['value'] = output.replace(tzinfo=timezone) return value @field_validator("rsl_plate_number", mode='before') @classmethod def rescue_rsl_number(cls, value): if value is None: return dict(value=None, missing=True) return value @field_validator("rsl_plate_number") @classmethod def rsl_from_file(cls, value, values): sub_type = values.data['clientsubmission'] if check_not_nan(value['value']): value['value'] = value['value'].strip() return value else: if "pytest" in sys.modules and sub_type.replace(" ", "") == "BasicRun": output = "RSL-BS-Test001" else: # try: output = RSLNamer(filename=sub_type.filepath.__str__(), submission_type=sub_type.submissiontype, data=values.data).parsed_name return dict(value=output, missing=True) @field_validator("sample_count", mode='before') @classmethod def rescue_sample_count(cls, value): if value is None: return dict(value=None, missing=True) return value @field_validator("sample", mode="before") @classmethod def expand_samples(cls, value): if isinstance(value, Generator): return [PydSample(**sample) for sample in value] return value def __init__(self, run_custom: bool = False, **data): super().__init__(**data) # NOTE: this could also be done with default_factory submission_type = self.clientsubmission.submissiontype # logger.debug(submission_type) self.namer = RSLNamer(self.rsl_plate_number['value'], submission_type=submission_type) # if run_custom: # self.submission_object.custom_validation(pyd=self) def set_attribute(self, key: str, value): """ Better handling of attribute setting. Args: key (str): Name of field to set value (_type_): Value to set field to. """ self.__setattr__(name=key, value=value) def handle_duplicate_samples(self): """ Collapses multiple sample with same submitter id into one with lists for rows, columns. Necessary to prevent trying to create duplicate sample in SQL creation. """ submitter_ids = list(set([sample.sample_id for sample in self.samples])) output = [] for id in submitter_ids: relevants = [item for item in self.samples if item.sample_id == id] if len(relevants) <= 1: output += relevants else: rows = [item.row[0] for item in relevants] columns = [item.column[0] for item in relevants] ids = [item.assoc_id[0] for item in relevants] ranks = [item.submission_rank[0] for item in relevants] dummy = relevants[0] dummy.assoc_id = ids dummy.row = rows dummy.column = columns dummy.submission_rank = ranks output.append(dummy) self.samples = output def improved_dict(self, dictionaries: bool = True) -> dict: """ Adds model_extra to fields. Args: dictionaries (bool, optional): Are dictionaries expected as input? i.e. Should key['value'] be retrieved. Defaults to True. Returns: dict: This instance as a dictionary """ fields = list(self.model_fields.keys()) + list(self.model_extra.keys()) if dictionaries: output = {k: getattr(self, k) for k in fields} output['reagents'] = [item.improved_dict() for item in self.reagents] output['sample'] = [item.improved_dict() for item in self.samples] try: output['equipment'] = [item.improved_dict() for item in self.equipment] except TypeError: pass else: output = {k: self.filter_field(k) for k in fields} return output def filter_field(self, key: str) -> Any: """ Attempts to get value from field dictionary Args: key (str): name of the field of interest Returns: Any (): Value found. """ item = getattr(self, key) match item: case dict(): try: item = item['value'] except KeyError: logger.error(f"Couldn't get dict value: {item}") case _: pass return item def find_missing(self) -> Tuple[dict, dict]: """ Retrieves info and reagents marked as missing. Returns: Tuple[dict, dict]: Dict for missing info, dict for missing reagents. """ info = {k: v for k, v in self.improved_dict().items() if isinstance(v, dict)} missing_info = {k: v for k, v in info.items() if v['missing']} missing_reagents = [reagent for reagent in self.reagents if reagent.missing] return missing_info, missing_reagents @report_result def to_sql(self) -> Tuple[Run | None, Report]: """ Converts this instance into a backend.db.models.procedure.BasicRun instance Returns: Tuple[BasicRun, Result]: BasicRun instance, result object """ report = Report() dicto = self.improved_dict() # logger.debug(f"Pydantic procedure type: {self.proceduretype['value']}") # logger.debug(f"Pydantic improved_dict: {pformat(dicto)}") instance, result = Run.query_or_create(submissiontype=self.submission_type['value'], rsl_plate_number=self.rsl_plate_number['value']) # logger.debug(f"Created or queried instance: {instance}") if instance is None: report.add_result(Result(msg="Overwrite Cancelled.")) return None, report report.add_result(result) self.handle_duplicate_samples() for key, value in dicto.items(): # logger.debug(f"Checking key {key}, value {value}") if isinstance(value, dict): try: value = value['value'] except KeyError: if key == "custom": pass else: continue if value is None: continue match key: case "reagents": for reagent in self.reagents: reagent = reagent.to_sql(submission=instance) case "sample": for sample in self.samples: sample, associations, _ = sample.to_sql(run=instance) for assoc in associations: if assoc is not None: if assoc not in instance.clientsubmissionsampleassociation: instance.clientsubmissionsampleassociation.append(assoc) else: logger.warning(f"Sample association {assoc} is already present in {instance}") case "equipment": for equip in self.equipment: if equip is None: continue equip, association = equip.to_sql(procedure=instance, kittype=self.extraction_kit) if association is not None: instance.submission_equipment_associations.append(association) case "tips": for tips in self.tips: if tips is None: continue try: association = tips.to_sql(procedure=instance) except AttributeError: continue if association is not None: if association not in instance.submission_tips_associations: instance.submission_tips_associations.append(association) else: logger.warning(f"Tips association {association} is already present in {instance}") case item if item in instance.timestamps: logger.warning(f"Incoming timestamp key: {item}, with value: {value}") if isinstance(value, date): value = datetime.combine(value, datetime.now().time()) value = value.replace(tzinfo=timezone) elif isinstance(value, str): value: datetime = datetime.strptime(value, "%Y-%m-%d") value = value.replace(tzinfo=timezone) else: value = value instance.set_attribute(key=key, value=value) case item if item in instance.jsons: # logger.debug(f"Validating json value: {item} to value:{pformat(value)}") try: ii = value.items() except AttributeError: ii = {} for k, v in ii: if isinstance(v, datetime): value[k] = v.strftime("%Y-%m-%d %H:%M:%S") else: pass # logger.debug(f"Setting json value: {item} to value:{pformat(value)}") instance.set_attribute(key=key, value=value) case _: try: check = instance.__getattribute__(key) != value except AttributeError: continue if check: try: instance.set_attribute(key=key, value=value) except AttributeError as e: logger.error(f"Could not set attribute: {key} to {value} due to: \n\n {e}") continue except KeyError: continue else: logger.warning(f"{key} already == {value} so no updating.") try: instance.calculate_base_cost() except (TypeError, AttributeError) as e: logger.error(f"Looks like that kittype doesn't have cost breakdown yet due to: {e}, using 0.") try: instance.run_cost = instance.extraction_kit.cost_per_run except AttributeError: instance.run_cost = 0 # NOTE: Apply any discounts that are applicable for client and kittype. try: discounts = [item.amount for item in Discount.query(kittype=instance.extraction_kit, organization=instance.clientlab)] if len(discounts) > 0: instance.run_cost = instance.run_cost - sum(discounts) except Exception as e: logger.error(f"An unknown exception occurred when calculating discounts: {e}") return instance, report def to_form(self, parent: QWidget, disable: list | None = None): """ Converts this instance into a frontend.widgets.submission_widget.SubmissionFormWidget Args: disable (list, optional): a list of widgets to be disabled in the form. Defaults to None. parent (QWidget): parent widget of the constructed object Returns: SubmissionFormWidget: Submission form widget """ from frontend.widgets.submission_widget import SubmissionFormWidget try: logger.debug(f"PCR info: {self.pcr_info}") except AttributeError: pass return SubmissionFormWidget(parent=parent, pyd=self, disable=disable) # def to_writer(self) -> "SheetWriter": # """ # Sends data here to the sheet writer. # # Returns: # SheetWriter: Sheetwriter object that will perform writing. # """ # from backend.excel.writer import SheetWriter # return SheetWriter(self) def construct_filename(self) -> str: """ Creates filename for this instance Returns: str: Output filename """ template = self.clientsubmission.filename_template render = self.namer.construct_export_name(template=template, **self.improved_dict(dictionaries=False)).replace( "/", "") return render def check_kit_integrity(self, extraction_kit: str | dict | None = None, exempt: List[PydReagent] = []) -> Tuple[ List[PydReagent], Report, List[PydReagent]]: """ Ensures all reagents expected in kittype are listed in Submission Args: extraction_kit (str | dict | None, optional): kittype to be checked. Defaults to None. exempt (List[PydReagent], optional): List of reagents that don't need to be checked. Defaults to [] Returns: Tuple[List[PydReagent], Report]: List of reagents and Result object containing a message and any missing components. """ report = Report() # logger.debug(f"The following reagents are exempt from the kittype integrity check:\n{exempt}") if isinstance(extraction_kit, str): extraction_kit = dict(value=extraction_kit) if extraction_kit is not None and extraction_kit != self.extraction_kit['value']: self.extraction_kit['value'] = extraction_kit['value'] ext_kit = KitType.query(name=self.extraction_kit['value']) ext_kit_rtypes = [item.to_pydantic() for item in ext_kit.get_reagents(required_only=True, proceduretype=self.submission_type['value'])] # NOTE: Exclude any reagenttype found in this pydclientsubmission not expected in kittype. expected_check = [item.equipmentrole for item in ext_kit_rtypes] output_reagents = [rt for rt in self.reagents if rt.role in expected_check] missing_check = [item.role for item in output_reagents] missing_reagents = [rt for rt in ext_kit_rtypes if rt.equipmentrole not in missing_check and rt.equipmentrole not in exempt] # logger.debug(f"Missing reagents: {missing_reagents}") missing_reagents += [rt for rt in output_reagents if rt.missing] output_reagents += [rt for rt in missing_reagents if rt not in output_reagents] # NOTE: if lists are equal return no problem if len(missing_reagents) == 0: result = None else: result = Result( msg=f"The excel sheet you are importing is missing some reagents expected by the kittype.\n\nIt looks like you are missing: {[item.equipmentrole.upper() for item in missing_reagents]}\n\nAlternatively, you may have set the wrong extraction kittype.\n\nThe program will populate lists using existing reagents.\n\nPlease make sure you check the lots carefully!", status="Warning") report.add_result(result) return output_reagents, report, missing_reagents def check_reagent_expiries(self, exempt: List[PydReagent] = []): report = Report() expired = [] for reagent in self.reagents: if reagent not in exempt: role_eol = ReagentRole.query(name=reagent.role).eol_ext try: dt = datetime.combine(reagent.expiry, datetime.max.time()) except TypeError: continue if datetime.now() > dt + role_eol: expired.append(f"{reagent.role}, {reagent.lot}: {reagent.expiry.date()} + {role_eol.days}") if expired: output = '\n'.join(expired) result = Result(status="Warning", msg=f"The following reagents are expired:\n\n{output}" ) report.add_result(result) return report def export_csv(self, filename: Path | str): try: worksheet = self.csv except AttributeError: logger.error("No csv found.") return if isinstance(filename, str): filename = Path(filename) with open(filename, 'w', newline="") as f: c = csv.writer(f) for r in worksheet.rows: c.writerow([cell.value for cell in r]) @property def sample_list(self) -> List[dict]: samples = [] for sample in self.samples: sample = sample.improved_dict() sample['row'] = sample['row'][0] sample['column'] = sample['column'][0] sample['submission_rank'] = sample['submission_rank'][0] samples.append(sample) samples = sorted(samples, key=itemgetter("submission_rank")) return samples class PydContact(BaseModel): name: str phone: str | None email: str | None @field_validator("phone") @classmethod def enforce_phone_number(cls, value): area_regex = re.compile(r"^\(?(\d{3})\)?(-| )?") if len(value) > 8: match = area_regex.match(value) # logger.debug(f"Match: {match.group(1)}") value = area_regex.sub(f"({match.group(1).strip()}) ", value) # logger.debug(f"Output phone: {value}") return value @report_result def to_sql(self) -> Tuple[Contact, Report]: """ Converts this instance into a backend.db.models.organization. Contact instance. Does not query for existing contact. Returns: Contact: Contact instance """ report = Report() instance = Contact.query(name=self.name, phone=self.phone, email=self.email) if not instance or isinstance(instance, list): instance = Contact() try: all_fields = self.model_fields + self.model_extra except TypeError: all_fields = self.model_fields for field in all_fields: value = getattr(self, field) match field: case "organization": value = [Organization.query(name=value)] case _: pass try: instance.__setattr__(field, value) except AttributeError as e: logger.error(f"Could not set {instance} {field} to {value} due to {e}") return instance, report class PydOrganization(BaseModel): name: str cost_centre: str contact: List[PydContact] | None @field_validator("contact", mode="before") @classmethod def string_to_list(cls, value): if isinstance(value, str): value = Contact.query(name=value) try: value = [value.to_pydantic()] except AttributeError: return None return value @report_result def to_sql(self) -> ClientLab: """ Converts this instance into a backend.db.models.organization.Organization instance. Returns: Organization: Organization instance """ report = Report() instance = ClientLab() for field in self.model_fields: match field: case "contact": value = getattr(self, field) if value: value = [item.to_sql() for item in value if item] case _: value = getattr(self, field) logger.debug(f"Setting {field} to {value}") if value: setattr(instance, field, value) return instance, report class PydReagentRole(BaseModel): name: str eol_ext: timedelta | int | None uses: dict | None required: int | None = Field(default=1) @field_validator("eol_ext") @classmethod def int_to_timedelta(cls, value): if isinstance(value, int): return timedelta(days=value) return value @report_result def to_sql(self, kit: KitType) -> ReagentRole: """ Converts this instance into a backend.db.models.ReagentType instance Args: kit (KitType): KitType joined to the reagentrole Returns: ReagentRole: ReagentType instance """ report = Report() instance: ReagentRole = ReagentRole.query(name=self.name) if instance is None: instance = ReagentRole(name=self.name, eol_ext=self.eol_ext) try: assoc = KitTypeReagentRoleAssociation.query(reagentrole=instance, kittype=kit) except StatementError: assoc = None if assoc is None: assoc = KitTypeReagentRoleAssociation(kittype=kit, reagentrole=instance, uses=self.uses, required=self.required) return instance, report class PydKitType(BaseModel): name: str reagent_roles: List[PydReagent] = [] @report_result def to_sql(self) -> Tuple[KitType, Report]: """ Converts this instance into a backend.db.models.kits.KitType instance Returns: Tuple[KitType, Report]: KitType instance and report of results. """ report = Report() instance = KitType.query(name=self.name) if instance is None: instance = KitType(name=self.name) for role in self.reagent_roles: role.to_sql(instance) return instance, report class PydEquipmentRole(BaseModel): name: str equipment: List[PydEquipment] process: List[str] | None @field_validator("process", mode="before") @classmethod def expand_processes(cls, value): if isinstance(value, GeneratorType): value = [item for item in value] return value def to_form(self, parent, used: list) -> "RoleComboBox": """ Creates a widget for user input into this class. Args: parent (_type_): parent widget used (list): list of equipment already added to procedure Returns: RoleComboBox: widget """ from frontend.widgets.equipment_usage import RoleComboBox return RoleComboBox(parent=parent, role=self, used=used) class PydProcess(PydBaseClass, extra="allow"): name: str version: str = Field(default="1") proceduretype: List[str] equipment: List[str] equipmentrole: List[str] # kittype: List[str] tiprole: List[str] @field_validator("proceduretype", "equipment", "equipmentrole", "tiprole", mode="before") @classmethod def enforce_list(cls, value): if not isinstance(value, list): value = [value] output = [] for v in value: if issubclass(v.__class__, BaseClass): output.append(v.name) else: output.append(v) return output @report_result def to_sql(self): report = Report() instance = Process.query(name=self.name) if not instance: instance = Process() fields = [item for item in self.model_fields] for field in fields: logger.debug(f"Field: {field}") try: field_type = getattr(instance.__class__, field).property except AttributeError: logger.error(f"No attribute: {field} in {instance.__class__}") continue match field_type: case _RelationshipDeclared(): logger.debug(f"{field} is a relationship with {field_type.entity.class_}") query_str = getattr(self, field) if isinstance(query_str, list): query_str = query_str[0] if query_str in ["", " ", None]: continue logger.debug(f"Querying {field_type.entity.class_} with name {query_str}") field_value = field_type.entity.class_.query(name=query_str) logger.debug(f"{field} query result: {field_value}") case ColumnProperty(): logger.debug(f"{field} is a property.") field_value = getattr(self, field) instance.set_attribute(key=field, value=field_value) return instance, report class PydElastic(BaseModel, extra="allow", arbitrary_types_allowed=True): """Allows for creation of arbitrary pydantic models""" instance: BaseClass @report_result def to_sql(self): # print(self.instance) fields = [item for item in self.model_extra] for field in fields: try: field_type = getattr(self.instance.__class__, field).property except AttributeError: logger.error(f"No attribute: {field} in {self.instance.__class__}") continue match field_type: case _RelationshipDeclared(): # logger.debug(f"{field} is a relationship with {field_type.entity.class_}") field_value = field_type.entity.class_.argument.query(name=getattr(self, field)) # logger.debug(f"{field} query result: {field_value}") case ColumnProperty(): # logger.debug(f"{field} is a property.") field_value = getattr(self, field) self.instance.__setattr__(field, field_value) return self.instance # NOTE: Generified objects below: class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): proceduretype: ProcedureType | None = Field(default=None) run: Run | str | None = Field(default=None) name: dict = Field(default=dict(value="NA", missing=True), validate_default=True) technician: dict = Field(default=dict(value="NA", missing=True)) repeat: bool = Field(default=False) repeat_of: str | None = Field(default=None) # kittype: dict = Field(default=dict(value="NA", missing=True)) # possible_kits: list | None = Field(default=[], validate_default=True) plate_map: str | None = Field(default=None) reagent: list | None = Field(default=[]) reagentrole: dict | None = Field(default={}, validate_default=True) sample: List[PydSample] = Field(default=[]) equipment: List[PydEquipment] = Field(default=[]) result: List[PydResults] | List[dict] = Field(default=[]) @field_validator("name", "technician", mode="before")#"kittype", mode="before") @classmethod def convert_to_dict(cls, value): if not value: value = "NA" if isinstance(value, str): value = dict(value=value, missing=False) return value @field_validator("proceduretype", mode="before") @classmethod def lookup_proceduretype(cls, value): match value: case dict(): value = ProcedureType.query(name=value['name']) case str(): value = ProcedureType.query(name=value) case _: pass return value @field_validator("name") @classmethod def rescue_name(cls, value, values): if value['value'] == cls.model_fields['name'].default['value']: if values.data['proceduretype']: procedure_type = values.data['proceduretype'].name else: procedure_type = None if values.data['run']: run = values.data['run'].rsl_plate_number else: run = None value['value'] = f"{run}-{procedure_type}" value['missing'] = True return value # @field_validator("possible_kits") # @classmethod # def rescue_possible_kits(cls, value, values): # if not value: # try: # if values.data['proceduretype']: # value = [kittype.__dict__['name'] for kittype in values.data['proceduretype'].kittype] # except KeyError: # pass # return value @field_validator("name", "technician")#, "kittype") @classmethod def set_colour(cls, value): try: if value["missing"]: value["colour"] = "FE441D" else: value["colour"] = "6ffe1d" except KeyError: pass return value @field_validator("reagentrole") @classmethod def rescue_reagentrole(cls, value, values): # if not value: # match values.data['kittype']: # case dict(): # if "value" in values.data['kittype'].keys(): # roi = values.data['kittype']['value'] # elif "name" in values.data['kittype'].keys(): # roi = values.data['kittype']['name'] # else: # raise KeyError(f"Couldn't find kittype name in the dictionary: {values.data['kittype']}") # case str(): # roi = values.data['kittype'] # if roi != cls.model_fields['kittype'].default['value']: # kittype = KitType.query(name=roi) # value = {item.name: item.reagent for item in kittype.reagentrole} if not value: value = {} for reagentrole in values.data['proceduretype'].reagentrole: reagents = [reagent.lot_dicts for reagent in reagentrole.reagent] value[reagentrole.name] = flatten_list(reagents) # value = {item.name: item.reagent for item in values.data['proceduretype'].reagentrole} return value @field_validator("run") @classmethod def lookup_run(cls, value): if isinstance(value, str): value = Run.query(name=value) return value @field_validator("repeat_of") @classmethod def drop_empty_string(cls, value): if value == "": value = None return value @property def rows_columns_count(self) -> tuple[int, int]: try: proc: ProcedureType = Procedure.query(name=self.name).proceduretype except AttributeError as e: logger.error(f"Can't get rows, columns due to {e}") return 0, 0 return proc.plate_rows, proc.plate_columns @property def max_sample_rank(self) -> int: rows, columns = self.rows_columns_count output = rows * columns if output > 0: return output else: try: return max([item.procedure_rank for item in self.sample]) except TypeError: return len(self.sample) def update_kittype_reagentroles(self, kittype: str | KitType): if kittype == self.__class__.model_fields['kittype'].default['value']: return if isinstance(kittype, str): kittype_obj = KitType.query(name=kittype) try: self.reagentrole = { item.name: item.get_reagents(kittype=kittype_obj) + [PydReagent(name="--New--", lot="", reagentrole="")] for item in kittype_obj.get_reagents(proceduretype=self.proceduretype)} except AttributeError: self.reagentrole = {} reordered_options = {} if self.reagentrole: for k, v in self.reagentrole.items(): reordered_options[k] = self.reorder_reagents(reagentrole=k, options=v) self.reagentrole = reordered_options self.kittype['value'] = kittype self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype))) def reorder_reagents(self, reagentrole: str, options: list): reagent_used = next((reagent for reagent in self.reagent if reagent.reagentrole == reagentrole), None) if not reagent_used: return options roi = next((item for item in options if item.lot == reagent_used.lot and item.name == reagent_used.name), None) if not roi: return options options.insert(0, options.pop(options.index(roi))) return options def update_kittype_equipmentroles(self, kittype: str | KitType): if kittype == self.__class__.model_fields['kittype'].default['value']: return if isinstance(kittype, str): kittype_obj = KitType.query(name=kittype) try: self.equipment = {item.name: item.get_reagents(kittype=kittype_obj) for item in kittype_obj.get_reagents(proceduretype=self.proceduretype)} except AttributeError: self.reagentrole = {} self.kittype['value'] = kittype self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype))) def update_samples(self, sample_list: List[dict]): # logger.debug(f"Incoming sample_list:\n{pformat(sample_list)}") for iii, sample_dict in enumerate(sample_list, start=1): if sample_dict['sample_id'].startswith("blank_"): sample_dict['sample_id'] = "" row, column = self.proceduretype.ranked_plate[sample_dict['index']] # logger.debug(f"Row: {row}, Column: {column}") try: sample = next( (item for item in self.sample if item.sample_id.upper() == sample_dict['sample_id'].upper())) except StopIteration: # NOTE: Code to check for added controls. logger.debug( f"Sample not found by name: {sample_dict['sample_id']}, checking row {row} column {column}") try: sample = next( (item for item in self.sample if item.row == row and item.column == column)) except StopIteration: logger.error(f"Couldn't find sample: {pformat(sample_dict)}") continue sample.sample_id = sample_dict['sample_id'] sample.well_id = sample_dict['sample_id'] sample.row = row sample.column = column sample.procedure_rank = sample_dict['index'] # logger.debug(f"Sample of interest: {sample.improved_dict()}") # logger.debug(f"Updated samples:\n{pformat(self.sample)}") def update_reagents(self, reagentrole: str, name: str, lot: str, expiry: str): try: removable = next((item for item in self.reagent if item.reagentrole == reagentrole), None) except AttributeError as e: logger.error(self.reagent) raise e if removable: idx = self.reagent.index(removable) self.reagent.remove(removable) else: idx = 0 insertable = PydReagent(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry) self.reagent.insert(idx, insertable) logger.debug(self.reagent) @classmethod def update_new_reagents(cls, reagent: PydReagent): reg = reagent.to_sql() reg.save() def to_sql(self, new: bool = False): from backend.db.models import RunSampleAssociation, ProcedureSampleAssociation if new: sql = Procedure() else: sql = super().to_sql() logger.debug(f"Initial PYD: {pformat(self.__dict__)}") # sql.results = [result.to_sql() for result in self.results] if isinstance(self.name, dict): sql.name = self.name['value'] else: sql.name = self.name if isinstance(self.technician, dict): sql.technician = self.technician['value'] else: sql.technician = self.technician sql.repeat = self.repeat if sql.repeat: regex = re.compile(r".*\dR\d$") repeats = [item for item in self.run.procedure if self.repeat_of in item.name and bool(regex.match(item.name))] sql.name = f"{self.repeat_of}R{str(len(repeats) + 1)}" sql.repeat_of = self.repeat_of sql.started_date = datetime.now() if self.run: sql.run = self.run if self.proceduretype: sql.proceduretype = self.proceduretype # Note: convert any new reagents to sql and save # for reagentrole, reagents in self.reagentrole.items(): for reagent in self.reagent: if not reagent.lot or reagent.name == "--New--": continue # self.update_new_reagents(reagent) # NOTE: reset reagent associations. # sql.procedurereagentassociation = [] for reagent in self.reagent: if isinstance(reagent, dict): reagent = PydReagent(**reagent) logger.debug(reagent) reagentrole = reagent.reagentrole reagent = reagent.to_sql() # logger.debug(reagentrole) if reagent not in sql.reagentlot: # NOTE: Remove any previous association for this role. if sql.id: removable = ProcedureReagentLotAssociation.query(procedure=sql, reagentrole=reagentrole) else: removable = [] logger.debug(f"Removable: {removable}") if removable: if isinstance(removable, list): for r in removable: r.delete() else: removable.delete() # logger.debug(f"Adding {reagent} to {sql}") reagent_assoc = ProcedureReagentLotAssociation(reagentlot=reagent, procedure=sql, reagentrole=reagentrole) try: start_index = max([item.id for item in ProcedureSampleAssociation.query()]) + 1 except ValueError: start_index = 1 relevant_samples = [sample for sample in self.sample if not sample.sample_id.startswith("blank_") and not sample.sample_id == ""] # logger.debug(f"start index: {start_index}") assoc_id_range = range(start_index, start_index + len(relevant_samples) + 1) # logger.debug(f"Association id range: {assoc_id_range}") for iii, sample in enumerate(relevant_samples): sample_sql = sample.to_sql() if sql.run: if sample_sql not in sql.run.sample: logger.debug(f"sample {sample_sql} not found in {sql.run.sample}") run_assoc = RunSampleAssociation(sample=sample_sql, run=self.run, row=sample.row, column=sample.column) # else: # logger.debug(f"sample {sample_sql} found in {sql.run.sample}") if sample_sql not in sql.sample: proc_assoc = ProcedureSampleAssociation(new_id=assoc_id_range[iii], procedure=sql, sample=sample_sql, row=sample.row, column=sample.column, procedure_rank=sample.procedure_rank) sys.exit(pformat(self.equipment)) for equipment in self.equipment: equip = Equipment.query(name=equipment.name) if equip not in sql.equipment: equip_assoc = ProcedureEquipmentAssociation(equipment=equip, procedure=sql, equipmentrole=equip.equipmentrole[0]) process = equipment.process.to_sql() equip_assoc.process = process return sql, None class PydClientSubmission(PydBaseClass): # sql_object: ClassVar = ClientSubmission key_value_order = ["submitter_plate_id", "submitted_date", "client_lab", "contact", "contact_email", "cost_centre", "submission_type", "sample_count", "submission_category"] filepath: Path | None = Field(default=None) submissiontype: dict | None submitted_date: dict | None = Field(default=dict(value=date.today(), missing=True), validate_default=True) clientlab: dict | None sample_count: dict | None full_batch_size: int | dict = Field(default=0) submission_category: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) comment: dict | None = Field(default=dict(value="", missing=True), validate_default=True) cost_centre: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) contact: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) submitter_plate_id: dict | None = Field(default=dict(value=None, missing=True), validate_default=True) sample: List[PydSample] | None = Field(default=[]) # @field_validator("submissiontype", mode="before") # @classmethod # def enforce_submissiontype(cls, value): # if isinstance(value, str): # value = dict(value=value, missing=False) # return value @field_validator("submissiontype", "clientlab", "contact", mode="before") @classmethod def enforce_value(cls, value): if isinstance(value, str): value = dict(value=value, missing=False) return value @field_validator("submitted_date", mode="before") @classmethod def enforce_submitted_date(cls, value): match value: case str(): value = dict(value=datetime.strptime(value, "%Y-%m-%d %H:%M:%S"), missing=False) case date() | datetime(): value = dict(value=value, missing=False) case _: pass return value @field_validator("submitter_plate_id", mode="before") @classmethod def enforce_submitter_plate_id(cls, value): if isinstance(value, str): value = dict(value=value, missing=False) return value @field_validator("submission_category", mode="before") @classmethod def enforce_submission_category_id(cls, value): if isinstance(value, str): value = dict(value=value, missing=False) return value @field_validator("sample_count", mode="before") @classmethod def enforce_sample_count(cls, value): if isinstance(value, str) or isinstance(value, int): value = dict(value=value, missing=False) return value @field_validator("sample_count") @classmethod def enforce_integer(cls, value): try: value['value'] = int(value['value']) except ValueError: raise f"sample count value must be an integer" return value @field_validator("submitter_plate_id") @classmethod def create_submitter_plate_num(cls, value, values): if value['value'] in [None, "None"]: val = f"{values.data['submissiontype']['value']}-{values.data['submission_category']['value']}-{values.data['submitted_date']['value']}" return dict(value=val, missing=True) else: value['value'] = value['value'].strip() return value @field_validator("submitted_date") @classmethod def rescue_date(cls, value): if not value: value = dict(value=None) try: check = value['value'] is None except TypeError: check = True if check: value.update(dict(value=date.today(), missing=True)) else: match value['value']: case str(): value['value'] = datetime.strptime(value['value'], "%Y-%m-%d") value['value'] = datetime.combine(value['value'], datetime.now().time()) case _: pass return value @field_validator("submission_category") @classmethod def enforce_typing(cls, value, values): if not value['value'] in ["Research", "Diagnostic", "Surveillance", "Validation"]: try: value['value'] = values.data['submissiontype']['value'] except (AttributeError, KeyError): value['value'] = "NA" return value @field_validator("comment", mode="before") @classmethod def convert_comment_string(cls, value): if isinstance(value, str): value = dict(value=value, missing=True) return value @field_validator("full_batch_size") @classmethod def dict_to_int(cls, value): if isinstance(value, dict): value = value['value'] value = int(value) return value def to_form(self, parent: QWidget, samples: List = [], disable: list | None = None): """ Converts this instance into a frontend.widgets.submission_widget.SubmissionFormWidget Args: samples (): disable (list, optional): a list of widgets to be disabled in the form. Defaults to None. parent (QWidget): parent widget of the constructed object Returns: SubmissionFormWidget: Submission form widget """ from frontend.widgets.submission_widget import ClientSubmissionFormWidget if not samples: # samples = [sample for sample in self.sample if sample.sample_id.lower() not in ["", "blank"]] samples = self.sample return ClientSubmissionFormWidget(parent=parent, clientsubmission=self, samples=samples, disable=disable) def to_sql(self): sql = super().to_sql() assert not any([isinstance(item, PydSample) for item in sql.sample]) sql.sample = [] # if "info_placement" not in sql._misc_info: # sql._misc_info['info_placement'] = [] # info_placement = [] logger.debug(f"PYD Submission type: {self.submissiontype}") logger.debug(f"SQL Submission Type: {sql.submissiontype}") if not sql.submissiontype: sql.submissiontype = SubmissionType.query(name=self.submissiontype['value']) match sql.submissiontype: case SubmissionType(): pass case _: sql.submissiontype = SubmissionType.query(name="Default") for k in list(self.model_fields.keys()) + list(self.model_extra.keys()): logger.debug(f"Running {k}") attribute = getattr(self, k) match k: case "filepath": sql._misc_info[k] = attribute.__str__() continue case _: pass # logger.debug(f"Setting {k} to {attribute}") # if isinstance(attribute, dict): # if "location" in attribute: # info_placement.append(dict(name=k, location=attribute['location'])) # else: # info_placement.append(dict(name=k, location=None)) # # max_row = max([value['location']['row'] for value in info_placement if value]) # sql._misc_info['info_placement'] = info_placement return sql @property def max_sample_rank(self) -> int: output = self.full_batch_size if output > 0: return output else: return max([item.submission_rank for item in self.sample]) # def pad_samples_to_length(self, row_count, column_names): # output_samples = [] # for iii in range(1, row_count + 1): # try: # sample = next((item for item in self.samples if item.submission_rank == iii)) # except StopIteration: # sample = PydSample(sample_id="") # for column in column_names: # setattr(sample, column[0], "") # sample.submission_rank = iii # output_samples.append(sample) # return sorted(output_samples, key=lambda x: x.submission_rank) def improved_dict(self, dictionaries: bool = True) -> dict: output = super().improved_dict(dictionaries=dictionaries) output['sample'] = self.sample output['client_lab'] = output['clientlab'] try: output['contact_email'] = output['contact']['email'] except TypeError: pass return sort_dict_by_list(output, self.key_value_order) # @property # def writable_dict(self): # output = self.improved_dict() @property def filename_template(self): submissiontype = SubmissionType.query(name=self.submissiontype['value']) return submissiontype.defaults['filename_template'] class PydResults(PydBaseClass, arbitrary_types_allowed=True): result: dict = Field(default={}) result_type: str = Field(default="NA") img: None | bytes = Field(default=None) parent: Procedure | ProcedureSampleAssociation | None = Field(default=None) date_analyzed: datetime | None = Field(default=None) @field_validator("date_analyzed") @classmethod def set_today(cls, value): match value: case str(): value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S") case datetime(): pass case date(): value = datetime.combine(value, datetime.max.time()) case _: value = datetime.now() return value def to_sql(self): sql, _ = Results.query_or_create(result_type=self.result_type, result=self.results) try: check = sql.image except FileNotFoundError: check = False if not check: sql.image = self.img if not sql.date_analyzed: sql.date_analyzed = self.date_analyzed match self.parent: case ProcedureSampleAssociation(): sql.sampleprocedureassociation = self.parent case Procedure(): sql.procedure = self.parent case _: logger.error("Improper association found.") return sql