diff --git a/CHANGELOG.md b/CHANGELOG.md
index f8a947b..4366326 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+# 202509.02
+
+- First Useable updated version.
+
# 202504.04
- Added html links for equipment/processes/tips.
diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py
index 200b779..fbf5d70 100644
--- a/src/submissions/backend/db/models/__init__.py
+++ b/src/submissions/backend/db/models/__init__.py
@@ -5,7 +5,6 @@ from __future__ import annotations
import sys, logging, json, inspect
from datetime import datetime, date
from pprint import pformat
-
from dateutil.parser import parse
from jinja2 import TemplateNotFound, Template
from pandas import DataFrame
@@ -19,7 +18,7 @@ from sqlalchemy.exc import ArgumentError
from typing import Any, List, ClassVar
from pathlib import Path
from sqlalchemy.orm.relationships import _RelationshipDeclared
-from tools import report_result, list_sort_dict, jinja_template_loading, Report, Result
+from tools import report_result, list_sort_dict, jinja_template_loading, Report, Result, ctx
# NOTE: Load testing environment
if 'pytest' in sys.modules:
@@ -92,10 +91,6 @@ class BaseClass(Base):
Returns:
Session: DB session from ctx settings.
"""
- if 'pytest' not in sys.modules:
- from tools import ctx
- else:
- from test_settings import ctx
return ctx.database_session
@classmethod
@@ -107,10 +102,6 @@ class BaseClass(Base):
Returns:
Path: Location of the Submissions directory in Settings object
"""
- if 'pytest' not in sys.modules:
- from tools import ctx
- else:
- from test_settings import ctx
return ctx.directory_path
@classmethod
@@ -122,10 +113,6 @@ class BaseClass(Base):
Returns:
Path: Location of the Submissions backup directory in Settings object
"""
- if 'pytest' not in sys.modules:
- from tools import ctx
- else:
- from test_settings import ctx
return ctx.backup_path
def __init__(self, *args, **kwargs):
@@ -284,7 +271,7 @@ class BaseClass(Base):
if issubclass(v.__class__, PydBaseClass):
setattr(instance, k, v.to_sql())
instance._misc_info.update(outside_kwargs)
- logger.info(f"Instance from query or create: {instance}, new: {new}")
+ # logger.info(f"Instance from query or create: {instance}, new: {new}")
return instance, new
@classmethod
diff --git a/src/submissions/backend/db/models/procedures.py b/src/submissions/backend/db/models/procedures.py
index 268f87d..43fbf91 100644
--- a/src/submissions/backend/db/models/procedures.py
+++ b/src/submissions/backend/db/models/procedures.py
@@ -87,7 +87,7 @@ class ReagentRole(BaseClass):
new = True
for k, v in sanitized_kwargs.items():
setattr(instance, k, v)
- logger.info(f"Instance from query or create: {instance}")
+ # logger.info(f"Instance from query or create: {instance}")
return instance, new
@classmethod
@@ -785,7 +785,7 @@ class ProcedureType(BaseClass):
)
return PydProcedure(**output)
- def construct_plate_map(self, sample_dicts: List["PydSample"]) -> str:
+ def construct_plate_map(self, sample_dicts: List["PydSample"], creation:bool=True, vw_modifier:float=1.0) -> str:
"""
Constructs an html based plate map for procedure details.
@@ -800,13 +800,13 @@ class ProcedureType(BaseClass):
if self.plate_rows == 0 or self.plate_columns == 0:
return "
"
sample_dicts = self.pad_sample_dicts(sample_dicts=sample_dicts)
- vw = round((-0.07 * len(sample_dicts)) + 12.2, 1)
+ vw = round((-0.07 * len(sample_dicts)) + (12.2 * vw_modifier), 1)
# NOTE: An overly complicated list comprehension create a list of sample locations
# NOTE: next will return a blank cell if no value found for row/column
env = jinja_template_loading()
template = env.get_template("support/plate_map.html")
html = template.render(plate_rows=self.plate_rows, plate_columns=self.plate_columns, samples=sample_dicts,
- vw=vw)
+ vw=vw, creation=creation)
return html + "
"
def pad_sample_dicts(self, sample_dicts: List["PydSample"]):
@@ -985,6 +985,7 @@ class Procedure(BaseClass):
output['sample_count'] = len(active_samples)
output['clientlab'] = self.run.clientsubmission.clientlab.name
output['cost'] = 0.00
+ output['platemap'] = self.make_procedure_platemap()
return output
def to_pydantic(self, **kwargs):
@@ -1038,6 +1039,12 @@ class Procedure(BaseClass):
output = {k: v for k, v in dicto.items()}
return output
+ def make_procedure_platemap(self):
+ dicto = [sample.to_pydantic() for sample in self.proceduresampleassociation]
+ html = self.proceduretype.construct_plate_map(sample_dicts=dicto, creation=False, vw_modifier=1.15)
+ return html
+
+
class ProcedureTypeReagentRoleAssociation(BaseClass):
"""
@@ -1143,7 +1150,7 @@ class ProcedureTypeReagentRoleAssociation(BaseClass):
case _:
pass
setattr(instance, k, v)
- logger.info(f"Instance from query or create: {instance.__dict__}\nis new: {new}")
+ # logger.info(f"Instance from query or create: {instance.__dict__}\nis new: {new}")
return instance, new
@classmethod
@@ -1423,7 +1430,7 @@ class EquipmentRole(BaseClass):
new = True
for k, v in sanitized_kwargs.items():
setattr(instance, k, v)
- logger.info(f"Instance from query or create: {instance}")
+ # logger.info(f"Instance from query or create: {instance}")
return instance, new
@classmethod
@@ -1816,6 +1823,9 @@ class Process(BaseClass):
class ProcessVersion(BaseClass):
+
+ pyd_model_name = "Process"
+
id = Column(INTEGER, primary_key=True) #: Process id, primary key
version = Column(FLOAT(2), default=1.00) #: Version number
date_verified = Column(TIMESTAMP) #: Date this version was deemed worthy
@@ -1867,7 +1877,7 @@ class ProcessVersion(BaseClass):
version: str | float | None = None,
name: str | None = None,
limit: int = 0,
- **kwargs) -> ReagentLot | List[ReagentLot]:
+ **kwargs) -> ProcessVersion | List[ProcessVersion]:
query: Query = cls.__database_session__.query(cls)
match name:
case str():
@@ -1881,6 +1891,9 @@ class ProcessVersion(BaseClass):
pass
return cls.execute_query(query=query, limit=limit)
+ # def to_pydantic(self, pyd_model_name: str | None = None, **kwargs):
+ # output = super().to_pydantic(pyd_model_name=pyd_model_name, **kwargs)
+
class Tips(BaseClass):
"""
diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py
index cddd3f0..fca3713 100644
--- a/src/submissions/backend/db/models/submissions.py
+++ b/src/submissions/backend/db/models/submissions.py
@@ -879,7 +879,7 @@ class Run(BaseClass, LogMixin):
Returns:
PydSubmission: converted object.
"""
- from backend.validators import PydRun
+ from backend.validators import PydClientSubmission, PydRun
dicto = self.details_dict(full_data=True, backup=backup)
new_dict = {}
for key, value in dicto.items():
@@ -1916,6 +1916,8 @@ class ProcedureSampleAssociation(BaseClass):
misc = output['misc_info']
output.update(relevant)
output['misc_info'] = misc
+ output['row'] = self.row
+ output['column'] = self.column
output['results'] = [result.details_dict() for result in output['results']]
return output
diff --git a/src/submissions/backend/excel/__init__.py b/src/submissions/backend/excel/__init__.py
index 3bc33bb..6efd46e 100644
--- a/src/submissions/backend/excel/__init__.py
+++ b/src/submissions/backend/excel/__init__.py
@@ -2,7 +2,6 @@
Contains pandas and openpyxl convenience functions for interacting with excel workbooks
"""
-# from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser, ClientSubmissionSampleParser
from .parsers import (
DefaultParser, DefaultKEYVALUEParser, DefaultTABLEParser,
ProcedureInfoParser, ProcedureSampleParser, ProcedureReagentParser, ProcedureEquipmentParser,
diff --git a/src/submissions/backend/excel/reports.py b/src/submissions/backend/excel/reports.py
index 730db97..72ca84c 100644
--- a/src/submissions/backend/excel/reports.py
+++ b/src/submissions/backend/excel/reports.py
@@ -8,7 +8,7 @@ from pathlib import Path
from datetime import date
from typing import Tuple, List
from backend.db.models import Procedure, Run
-from tools import jinja_template_loading, get_first_blank_df_row, row_map, flatten_list
+from tools import jinja_template_loading, get_first_blank_df_row, row_map, flatten_list, ctx
from PyQt6.QtWidgets import QWidget
from openpyxl.worksheet.worksheet import Worksheet
@@ -173,10 +173,6 @@ class TurnaroundMaker(ReportArchetype):
Returns:
"""
- if 'pytest' not in sys.modules:
- from tools import ctx
- else:
- from test_settings import ctx
days = sub.turnaround_time
try:
tat = sub.get_default_info("turnaround_time")
diff --git a/src/submissions/backend/excel/writers/__init__.py b/src/submissions/backend/excel/writers/__init__.py
index 17fb703..6571c66 100644
--- a/src/submissions/backend/excel/writers/__init__.py
+++ b/src/submissions/backend/excel/writers/__init__.py
@@ -36,7 +36,6 @@ class DefaultWriter(object):
value = value['name']
except (KeyError, ValueError):
return
- # logger.debug(f"Value type: {type(value)}")
match value:
case x if issubclass(value.__class__, BaseClass):
value = value.name
@@ -81,7 +80,6 @@ class DefaultWriter(object):
self.worksheet = self.prewrite(self.worksheet, start_row=start_row)
self.start_row = self.delineate_start_row(start_row=start_row)
self.end_row = self.delineate_end_row(start_row=start_row)
- logger.debug(f"Rows for {self.__class__.__name__}:\tstart: {self.start_row}, end: {self.end_row}")
return workbook
def delineate_start_row(self, start_row: int = 1) -> int:
@@ -93,7 +91,6 @@ class DefaultWriter(object):
Returns:
int
"""
- logger.debug(f"{self.__class__.__name__} will start looking for blank rows at {start_row}")
for iii, row in enumerate(self.worksheet.iter_rows(min_row=start_row), start=start_row):
if all([item.value is None for item in row]):
return iii
@@ -146,7 +143,6 @@ class DefaultKEYVALUEWriter(DefaultWriter):
dictionary = sort_dict_by_list(dictionary=dictionary, order_list=self.key_order)
for ii, (k, v) in enumerate(dictionary.items(), start=self.start_row):
value = self.stringify_value(value=v)
- logger.debug(f"{self.__class__.__name__} attempting to write {value}")
if value is None:
continue
self.worksheet.cell(column=1, row=ii, value=self.prettify_key(k))
@@ -172,7 +168,6 @@ class DefaultTABLEWriter(DefaultWriter):
def delineate_end_row(self, start_row: int = 1) -> int:
end_row = start_row + len(self.pydant_obj) + 1
- logger.debug(f"End row has been delineated as {start_row} + {len(self.pydant_obj)} + 1 = {end_row}")
return end_row
def pad_samples_to_length(self, row_count,
@@ -220,7 +215,6 @@ class DefaultTABLEWriter(DefaultWriter):
value = object.improved_dict()[header.lower().replace(" ", "_")]
except (AttributeError, KeyError):
value = ""
- # logger.debug(f"{self.__class__.__name__} attempting to write {value}")
self.worksheet.cell(row=write_row, column=column, value=self.stringify_value(value))
self.worksheet = self.postwrite(self.worksheet)
return workbook
diff --git a/src/submissions/backend/excel/writers/procedure_writers/__init__.py b/src/submissions/backend/excel/writers/procedure_writers/__init__.py
index 62cec8c..2187c15 100644
--- a/src/submissions/backend/excel/writers/procedure_writers/__init__.py
+++ b/src/submissions/backend/excel/writers/procedure_writers/__init__.py
@@ -1,5 +1,5 @@
"""
-Default
+Default writers for procedures.
"""
from __future__ import annotations
import logging, sys
@@ -18,9 +18,7 @@ class ProcedureInfoWriter(DefaultKEYVALUEWriter):
'reagentrole', 'results', 'sample', 'tips', 'reagentlot']
def __init__(self, pydant_obj, *args, **kwargs):
-
super().__init__(pydant_obj=pydant_obj, *args, **kwargs)
-
self.fill_dictionary = {k: v for k, v in self.fill_dictionary.items() if k not in self.__class__.exclude}
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
diff --git a/src/submissions/backend/excel/writers/results_writers/pcr_results_writer.py b/src/submissions/backend/excel/writers/results_writers/pcr_results_writer.py
index 256178f..439d5d5 100644
--- a/src/submissions/backend/excel/writers/results_writers/pcr_results_writer.py
+++ b/src/submissions/backend/excel/writers/results_writers/pcr_results_writer.py
@@ -1,5 +1,5 @@
"""
-
+Writers for PCR results from Design and Analysis Software
"""
from __future__ import annotations
import logging
diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py
index e3b906f..8dc6d93 100644
--- a/src/submissions/backend/validators/pydant.py
+++ b/src/submissions/backend/validators/pydant.py
@@ -8,15 +8,13 @@ from pydantic import BaseModel, field_validator, Field, model_validator
from datetime import date, datetime, timedelta
from dateutil.parser import parse
from dateutil.parser import ParserError
-from typing import List, Tuple, Literal
+from typing import List, Tuple, Literal, Generator
from types import GeneratorType
from . import RSLNamer
from pathlib import Path
-from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone, sort_dict_by_list, row_keys, \
- flatten_list
+from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone, sort_dict_by_list, row_keys, flatten_list
from backend.db import models
from backend.db.models import *
-from sqlalchemy.exc import StatementError
from sqlalchemy.orm.properties import ColumnProperty
from sqlalchemy.orm.relationships import _RelationshipDeclared
from sqlalchemy.orm.attributes import InstrumentedAttribute
@@ -140,6 +138,7 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
class PydReagentLot(PydBaseClass):
lot: str | None
+ name: str | None = Field(default=None)
expiry: date | datetime | Literal['NA'] | None = Field(default=None, validate_default=True)
missing: bool = Field(default=True)
comment: str | None = Field(default="", validate_default=True)
@@ -339,7 +338,7 @@ class PydEquipment(PydBaseClass):
name: str
nickname: str | None
processes: List[PydProcess] | PydProcess | None
- processversion: PydProcess | None
+ processversion: PydProcessVersion | None = Field(default=None)
equipmentrole: str | PydEquipmentRole | None
tips: List[PydTips] | PydTips | None = Field(default=[])
@@ -472,6 +471,693 @@ class PydEquipment(PydBaseClass):
return {k: getattr(self, k) for k in fields}
+
+class PydContact(BaseModel):
+ name: str
+ phone: str | None
+ email: str | None
+
+ @field_validator("phone")
+ @classmethod
+ def enforce_phone_number(cls, value):
+ area_regex = re.compile(r"^\(?(\d{3})\)?(-| )?")
+ if len(value) > 8:
+ match = area_regex.match(value)
+ value = area_regex.sub(f"({match.group(1).strip()}) ", value)
+ return value
+
+ @report_result
+ def to_sql(self) -> Tuple[Contact, Report]:
+ """
+ Converts this instance into a backend.db.models.organization. Contact instance.
+ Does not query for existing contact.
+
+ Returns:
+ Contact: Contact instance
+ """
+ report = Report()
+ instance = Contact.query(name=self.name, phone=self.phone, email=self.email)
+ if not instance or isinstance(instance, list):
+ instance = Contact()
+ try:
+ all_fields = self.model_fields + self.model_extra
+ except TypeError:
+ all_fields = self.model_fields
+ for field in all_fields:
+ value = getattr(self, field)
+ match field:
+ case "organization":
+ value = [ClientLab.query(name=value)]
+ case _:
+ pass
+ try:
+ instance.__setattr__(field, value)
+ except AttributeError as e:
+ logger.error(f"Could not set {instance} {field} to {value} due to {e}")
+ return instance, report
+
+
+class PydClientLab(BaseModel):
+ name: str
+ cost_centre: str
+ contact: List[PydContact] | None
+
+ @field_validator("contact", mode="before")
+ @classmethod
+ def string_to_list(cls, value):
+ if isinstance(value, str):
+ value = Contact.query(name=value)
+ try:
+ value = [value.to_pydantic()]
+ except AttributeError:
+ return None
+ return value
+
+ @report_result
+ def to_sql(self) -> ClientLab:
+ """
+ Converts this instance into a backend.db.models.organization.Organization instance.
+
+ Returns:
+ Organization: Organization instance
+ """
+ report = Report()
+ instance = ClientLab()
+ for field in self.model_fields:
+ match field:
+ case "contact":
+ value = getattr(self, field)
+ if value:
+ value = [item.to_sql() for item in value if item]
+ case _:
+ value = getattr(self, field)
+ if value:
+ setattr(instance, field, value)
+ return instance, report
+
+
+class PydReagentRole(BaseModel):
+ name: str
+ eol_ext: timedelta | int | None
+ uses: dict | None
+ required: int | None = Field(default=1)
+
+ @field_validator("eol_ext")
+ @classmethod
+ def int_to_timedelta(cls, value):
+ if isinstance(value, int):
+ return timedelta(days=value)
+ return value
+
+
+class PydEquipmentRole(BaseModel):
+ name: str
+ equipment: List[PydEquipment]
+ process: List[str] | None
+
+ @field_validator("process", mode="before")
+ @classmethod
+ def expand_processes(cls, value):
+ if isinstance(value, GeneratorType):
+ value = [item for item in value]
+ return value
+
+ def to_form(self, parent, used: list) -> "RoleComboBox":
+ """
+ Creates a widget for user input into this class.
+
+ Args:
+ parent (_type_): parent widget
+ used (list): list of equipment already added to procedure
+
+ Returns:
+ RoleComboBox: widget
+ """
+ from frontend.widgets.equipment_usage import RoleComboBox
+ return RoleComboBox(parent=parent, role=self, used=used)
+
+
+class PydProcess(PydBaseClass, extra="allow"):
+ name: str
+ version: str = Field(default="1.0")
+ # equipment: List[str]
+ tips: List[PydTips]
+
+ @field_validator("tips", mode="before")
+ @classmethod
+ def enforce_list(cls, value):
+ if not isinstance(value, list):
+ value = [value]
+ output = []
+ for v in value:
+ if issubclass(v.__class__, BaseClass):
+ output.append(v.name)
+ else:
+ output.append(v)
+ return output
+
+ @field_validator("tips", mode="before")
+ @classmethod
+ def validate_tips(cls, value):
+ if not value:
+ return []
+ value = [item for item in value if item]
+ return value
+
+ @field_validator("version", mode="before")
+ @classmethod
+ def enforce_float_string(cls, value):
+ if isinstance(value, float):
+ value = str(value)
+ return value
+
+ @report_result
+ def to_sql(self):
+ report = Report()
+ name = self.name.split("-")[0]
+ # NOTE: can't use query_or_create due to name not being part of ProcessVersion
+ instance = ProcessVersion.query(name=name, version=self.version, limit=1)
+ if not instance:
+ instance = ProcessVersion()
+ return instance, report
+
+
+class PydProcessVersion(BaseModel, extra="allow", arbitrary_types_allowed=True):
+ version: float
+ name: str
+
+ def to_sql(self):
+ instance = ProcessVersion.query(name=self.name, version=self.version, limit=1)
+ if not instance:
+ instance = ProcessVersion()
+ return instance
+
+
+class PydElastic(BaseModel, extra="allow", arbitrary_types_allowed=True):
+ """Allows for creation of arbitrary pydantic models"""
+ instance: BaseClass
+
+ @report_result
+ def to_sql(self):
+ fields = [item for item in self.model_extra]
+ for field in fields:
+ try:
+ field_type = getattr(self.instance.__class__, field).property
+ except AttributeError:
+ logger.error(f"No attribute: {field} in {self.instance.__class__}")
+ continue
+ match field_type:
+ case _RelationshipDeclared():
+ field_value = field_type.entity.class_.argument.query(name=getattr(self, field))
+ case ColumnProperty():
+ field_value = getattr(self, field)
+ self.instance.__setattr__(field, field_value)
+ return self.instance
+
+
+# NOTE: Generified objects below:
+
+class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
+ proceduretype: ProcedureType | None = Field(default=None)
+ run: Run | str | None = Field(default=None)
+ name: dict = Field(default=dict(value="NA", missing=True), validate_default=True)
+ technician: dict = Field(default=dict(value="NA", missing=True))
+ repeat: bool = Field(default=False)
+ repeat_of: Procedure | None = Field(default=None)
+ plate_map: str | None = Field(default=None)
+ reagent: list | None = Field(default=[])
+ reagentrole: dict | None = Field(default={}, validate_default=True)
+ sample: List[PydSample] = Field(default=[])
+ equipment: List[PydEquipment] = Field(default=[])
+ result: List[PydResults] | List[dict] = Field(default=[])
+
+ @field_validator("name", "technician", mode="before")#"kittype", mode="before")
+ @classmethod
+ def convert_to_dict(cls, value):
+ if not value:
+ value = "NA"
+ if isinstance(value, str):
+ value = dict(value=value, missing=False)
+ return value
+
+ @field_validator("proceduretype", mode="before")
+ @classmethod
+ def lookup_proceduretype(cls, value):
+ match value:
+ case dict():
+ value = ProcedureType.query(name=value['name'])
+ case str():
+ value = ProcedureType.query(name=value)
+ case _:
+ pass
+ return value
+
+ @field_validator("name")
+ @classmethod
+ def rescue_name(cls, value, values):
+ if value['value'] == cls.model_fields['name'].default['value']:
+ if values.data['proceduretype']:
+ procedure_type = values.data['proceduretype'].name
+ else:
+ procedure_type = None
+ if values.data['run']:
+ run = values.data['run'].rsl_plate_number
+ else:
+ run = None
+ value['value'] = f"{run}-{procedure_type}"
+ value['missing'] = True
+ return value
+
+ @field_validator("name", "technician")#, "kittype")
+ @classmethod
+ def set_colour(cls, value):
+ try:
+ if value["missing"]:
+ value["colour"] = "FE441D"
+ else:
+ value["colour"] = "6ffe1d"
+ except KeyError:
+ pass
+ return value
+
+ @field_validator("reagentrole")
+ @classmethod
+ def rescue_reagentrole(cls, value, values):
+ if not value:
+ value = {}
+ for reagentrole in values.data['proceduretype'].reagentrole:
+ reagents = [reagent.lot_dicts for reagent in reagentrole.reagent]
+ value[reagentrole.name] = flatten_list(reagents)
+ return value
+
+ @field_validator("run")
+ @classmethod
+ def lookup_run(cls, value):
+ if isinstance(value, str):
+ value = Run.query(name=value)
+ return value
+
+ @field_validator("repeat_of")
+ @classmethod
+ def drop_empty_string(cls, value):
+ if value == "":
+ value = None
+ return value
+
+ @property
+ def rows_columns_count(self) -> tuple[int, int]:
+ try:
+ proc: ProcedureType = Procedure.query(name=self.name).proceduretype
+ except AttributeError as e:
+ logger.error(f"Can't get rows, columns due to {e}")
+ return 0, 0
+ return proc.plate_rows, proc.plate_columns
+
+ @property
+ def max_sample_rank(self) -> int:
+ rows, columns = self.rows_columns_count
+ output = rows * columns
+ if output > 0:
+ return output
+ else:
+ try:
+ return max([item.procedure_rank for item in self.sample])
+ except TypeError:
+ return len(self.sample)
+
+ # def update_kittype_reagentroles(self, kittype: str | KitType):
+ # if kittype == self.__class__.model_fields['kittype'].default['value']:
+ # return
+ # if isinstance(kittype, str):
+ # kittype_obj = KitType.query(name=kittype)
+ # try:
+ # self.reagentrole = {
+ # item.name: item.get_reagents(kittype=kittype_obj) + [PydReagent(name="--New--", lot="", reagentrole="")]
+ # for item in
+ # kittype_obj.get_reagents(proceduretype=self.proceduretype)}
+ # except AttributeError:
+ # self.reagentrole = {}
+ # reordered_options = {}
+ # if self.reagentrole:
+ # for k, v in self.reagentrole.items():
+ # reordered_options[k] = self.reorder_reagents(reagentrole=k, options=v)
+ # self.reagentrole = reordered_options
+ # self.kittype['value'] = kittype
+ # self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype)))
+
+ def reorder_reagents(self, reagentrole: str, options: list):
+ reagent_used = next((reagent for reagent in self.reagent if reagent.reagentrole == reagentrole), None)
+ if not reagent_used:
+ return options
+ roi = next((item for item in options if item.lot == reagent_used.lot and item.name == reagent_used.name), None)
+ if not roi:
+ return options
+ options.insert(0, options.pop(options.index(roi)))
+ return options
+
+ # def update_kittype_equipmentroles(self, kittype: str | KitType):
+ # if kittype == self.__class__.model_fields['kittype'].default['value']:
+ # return
+ # if isinstance(kittype, str):
+ # kittype_obj = KitType.query(name=kittype)
+ # try:
+ # self.equipment = {item.name: item.get_reagents(kittype=kittype_obj) for item in
+ # kittype_obj.get_reagents(proceduretype=self.proceduretype)}
+ # except AttributeError:
+ # self.reagentrole = {}
+ # self.kittype['value'] = kittype
+ # self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype)))
+
+ def update_samples(self, sample_list: List[dict]):
+ for iii, sample_dict in enumerate(sample_list, start=1):
+ if sample_dict['sample_id'].startswith("blank_"):
+ sample_dict['sample_id'] = ""
+ row, column = self.proceduretype.ranked_plate[sample_dict['index']]
+ try:
+ sample = next(
+ (item for item in self.sample if item.sample_id.upper() == sample_dict['sample_id'].upper()))
+ except StopIteration:
+ # NOTE: Code to check for added controls.
+ logger.debug(
+ f"Sample not found by name: {sample_dict['sample_id']}, checking row {row} column {column}")
+ try:
+ sample = next(
+ (item for item in self.sample if item.row == row and item.column == column))
+ except StopIteration:
+ logger.error(f"Couldn't find sample: {pformat(sample_dict)}")
+ continue
+ sample.sample_id = sample_dict['sample_id']
+ sample.well_id = sample_dict['sample_id']
+ sample.row = row
+ sample.column = column
+ sample.procedure_rank = sample_dict['index']
+
+ def update_reagents(self, reagentrole: str, name: str, lot: str, expiry: str):
+ try:
+ removable = next((item for item in self.reagent if item.reagentrole == reagentrole), None)
+ except AttributeError as e:
+ logger.error(self.reagent)
+ raise e
+ if removable:
+ idx = self.reagent.index(removable)
+ self.reagent.remove(removable)
+ else:
+ idx = 0
+ insertable = PydReagent(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry)
+ self.reagent.insert(idx, insertable)
+
+ @classmethod
+ def update_new_reagents(cls, reagent: PydReagent):
+ reg = reagent.to_sql()
+ reg.save()
+
+ def to_sql(self, new: bool = False):
+ from backend.db.models import RunSampleAssociation, ProcedureSampleAssociation
+ logger.debug(f"incoming pyd: {pformat([item.__dict__ for item in self.equipment])}")
+ if new:
+ sql = Procedure()
+ else:
+ sql = super().to_sql()
+ if isinstance(self.name, dict):
+ sql.name = self.name['value']
+ else:
+ sql.name = self.name
+ if isinstance(self.technician, dict):
+ sql.technician = self.technician['value']
+ else:
+ sql.technician = self.technician
+ if sql.repeat:
+ regex = re.compile(r".*\dR\d$")
+ repeats = [item for item in self.run.procedure if
+ self.repeat_of.name in item.name and bool(regex.match(item.name))]
+ sql.name = f"{self.repeat_of.name}-R{str(len(repeats) + 1)}"
+ sql.repeat_of = self.repeat_of
+ sql.started_date = datetime.now()
+ if self.run:
+ sql.run = self.run
+ if self.proceduretype:
+ sql.proceduretype = self.proceduretype
+ # NOTE: reset reagent associations.
+ for reagent in self.reagent:
+ if isinstance(reagent, dict):
+ reagent = PydReagent(**reagent)
+ reagentrole = reagent.reagentrole
+ reagent = reagent.to_sql()
+ if reagent not in sql.reagentlot:
+ # NOTE: Remove any previous association for this role.
+ if sql.id:
+ removable = ProcedureReagentLotAssociation.query(procedure=sql, reagentrole=reagentrole)
+ else:
+ removable = []
+ if removable:
+ if isinstance(removable, list):
+ for r in removable:
+ r.delete()
+ else:
+ removable.delete()
+ reagent_assoc = ProcedureReagentLotAssociation(reagentlot=reagent, procedure=sql, reagentrole=reagentrole)
+ try:
+ start_index = max([item.id for item in ProcedureSampleAssociation.query()]) + 1
+ except ValueError:
+ start_index = 1
+ relevant_samples = [sample for sample in self.sample if
+ not sample.sample_id.startswith("blank_") and not sample.sample_id == ""]
+ assoc_id_range = range(start_index, start_index + len(relevant_samples) + 1)
+ for iii, sample in enumerate(relevant_samples):
+ sample_sql = sample.to_sql()
+ if sql.run:
+ if sample_sql not in sql.run.sample:
+ run_assoc = RunSampleAssociation(sample=sample_sql, run=self.run, row=sample.row,
+ column=sample.column)
+ if sample_sql not in sql.sample:
+ proc_assoc = ProcedureSampleAssociation(new_id=assoc_id_range[iii], procedure=sql, sample=sample_sql,
+ row=sample.row, column=sample.column,
+ procedure_rank=sample.procedure_rank)
+ for equipment in self.equipment:
+ equip, _ = equipment.to_sql()
+ logger.debug(f"Equipment:\n{pformat(equip.__dict__)}")
+ if isinstance(equipment.processes, list):
+ equipment.process = equipment.processes[0]
+ if isinstance(equipment.tips, list):
+ try:
+ equipment.tips = equipment.tips[0]
+ except IndexError:
+ equipment.tips = None
+ if equip not in sql.equipment:
+ equip_assoc = ProcedureEquipmentAssociation(equipment=equip, procedure=sql,
+ equipmentrole=equip.equipmentrole[0])
+ process = equipment.process.to_sql()
+ equip_assoc.processversion = process
+ try:
+ tipslot = equipment.tips.to_sql()
+ logger.debug(f"Tipslot: {tipslot.__dict__}")
+ except AttributeError:
+ tipslot = None
+ equip_assoc.tipslot = tipslot
+ return sql, None
+
+
+class PydClientSubmission(PydBaseClass):
+ # sql_object: ClassVar = ClientSubmission
+
+ key_value_order = ["submitter_plate_id",
+ "submitted_date",
+ "client_lab",
+ "contact",
+ "contact_email",
+ "cost_centre",
+ "submission_type",
+ "sample_count",
+ "submission_category"]
+
+ filepath: Path | None = Field(default=None)
+ submissiontype: dict | None
+ submitted_date: dict | None = Field(default=dict(value=date.today(), missing=True), validate_default=True)
+ clientlab: dict | None
+ sample_count: dict | None
+ full_batch_size: int | dict = Field(default=0)
+ submission_category: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
+ comment: dict | None = Field(default=dict(value="", missing=True), validate_default=True)
+ cost_centre: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
+ contact: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
+ submitter_plate_id: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
+ sample: List[PydSample] | None = Field(default=[])
+
+ @field_validator("submissiontype", "clientlab", "contact", mode="before")
+ @classmethod
+ def enforce_value(cls, value):
+ if isinstance(value, str):
+ value = dict(value=value, missing=False)
+ return value
+
+ @field_validator("submitted_date", mode="before")
+ @classmethod
+ def enforce_submitted_date(cls, value):
+ match value:
+ case str():
+ value = dict(value=datetime.strptime(value, "%Y-%m-%d %H:%M:%S"), missing=False)
+ case date() | datetime():
+ value = dict(value=value, missing=False)
+ case _:
+ pass
+ return value
+
+ @field_validator("submitter_plate_id", mode="before")
+ @classmethod
+ def enforce_submitter_plate_id(cls, value):
+ if isinstance(value, str):
+ value = dict(value=value, missing=False)
+ return value
+
+ @field_validator("submission_category", mode="before")
+ @classmethod
+ def enforce_submission_category_id(cls, value):
+ if isinstance(value, str):
+ value = dict(value=value, missing=False)
+ return value
+
+ @field_validator("sample_count", mode="before")
+ @classmethod
+ def enforce_sample_count(cls, value):
+ if isinstance(value, str) or isinstance(value, int):
+ value = dict(value=value, missing=False)
+ return value
+
+ @field_validator("sample_count")
+ @classmethod
+ def enforce_integer(cls, value):
+ if not value['value']:
+ value['value'] = 0
+ try:
+ value['value'] = int(value['value'])
+ except (ValueError, TypeError):
+ raise f"sample count value must be an integer"
+ return value
+
+ @field_validator("submitter_plate_id")
+ @classmethod
+ def create_submitter_plate_num(cls, value, values):
+ if value['value'] in [None, "None"]:
+ val = f"{values.data['submissiontype']['value']}-{values.data['submission_category']['value']}-{values.data['submitted_date']['value']}"
+ return dict(value=val, missing=True)
+ else:
+ value['value'] = value['value'].strip()
+ return value
+
+ @field_validator("submitted_date")
+ @classmethod
+ def rescue_date(cls, value):
+ if not value:
+ value = dict(value=None)
+ try:
+ check = value['value'] is None
+ except TypeError:
+ check = True
+ if check:
+ value.update(dict(value=date.today(), missing=True))
+ else:
+ match value['value']:
+ case str():
+ value['value'] = datetime.strptime(value['value'], "%Y-%m-%d")
+ value['value'] = datetime.combine(value['value'], datetime.now().time())
+ case _:
+ pass
+ return value
+
+ @field_validator("submission_category")
+ @classmethod
+ def enforce_typing(cls, value, values):
+ if not value['value'] in ["Research", "Diagnostic", "Surveillance", "Validation"]:
+ try:
+ value['value'] = values.data['submissiontype']['value']
+ except (AttributeError, KeyError):
+ value['value'] = "NA"
+ return value
+
+ @field_validator("comment", mode="before")
+ @classmethod
+ def convert_comment_string(cls, value):
+ if isinstance(value, str):
+ value = dict(value=value, missing=True)
+ return value
+
+ @field_validator("full_batch_size")
+ @classmethod
+ def dict_to_int(cls, value):
+ if isinstance(value, dict):
+ value = value['value']
+ value = int(value)
+ return value
+
+ @field_validator("cost_centre", mode="before")
+ @classmethod
+ def str_to_dict(cls, value):
+ if isinstance(value, str):
+ value = dict(value=value)
+ return value
+
+ def to_form(self, parent: QWidget, samples: List = [], disable: list | None = None):
+ """
+ Converts this instance into a frontend.widgets.submission_widget.SubmissionFormWidget
+
+ Args:
+ samples ():
+ disable (list, optional): a list of widgets to be disabled in the form. Defaults to None.
+ parent (QWidget): parent widget of the constructed object
+
+ Returns:
+ SubmissionFormWidget: Submission form widget
+ """
+ from frontend.widgets.submission_widget import ClientSubmissionFormWidget
+ if not samples:
+ samples = self.sample
+ return ClientSubmissionFormWidget(parent=parent, clientsubmission=self, samples=samples, disable=disable)
+
+ def to_sql(self):
+ sql = super().to_sql()
+ assert not any([isinstance(item, PydSample) for item in sql.sample])
+ sql.sample = []
+ if not sql.submissiontype:
+ sql.submissiontype = SubmissionType.query(name=self.submissiontype['value'])
+ match sql.submissiontype:
+ case SubmissionType():
+ pass
+ case _:
+ sql.submissiontype = SubmissionType.query(name="Default")
+ for k in list(self.model_fields.keys()) + list(self.model_extra.keys()):
+ attribute = getattr(self, k)
+ match k:
+ case "filepath":
+ sql._misc_info[k] = attribute.__str__()
+ continue
+ case _:
+ pass
+ return sql
+
+ @property
+ def max_sample_rank(self) -> int:
+ output = self.full_batch_size
+ if output > 0:
+ return output
+ else:
+ return max([item.submission_rank for item in self.sample])
+
+ def improved_dict(self, dictionaries: bool = True) -> dict:
+ output = super().improved_dict(dictionaries=dictionaries)
+ output['sample'] = self.sample
+ output['client_lab'] = output['clientlab']
+ try:
+ output['contact_email'] = output['contact']['email']
+ except TypeError:
+ pass
+ return sort_dict_by_list(output, self.key_value_order)
+
+ @property
+ def filename_template(self):
+ submissiontype = SubmissionType.query(name=self.submissiontype['value'])
+ return submissiontype.defaults['filename_template']
+
+
class PydRun(PydBaseClass): #, extra='allow'):
clientsubmission: PydClientSubmission | None = Field(default=None)
@@ -931,728 +1617,6 @@ class PydRun(PydBaseClass): #, extra='allow'):
return samples
-class PydContact(BaseModel):
- name: str
- phone: str | None
- email: str | None
-
- @field_validator("phone")
- @classmethod
- def enforce_phone_number(cls, value):
- area_regex = re.compile(r"^\(?(\d{3})\)?(-| )?")
- if len(value) > 8:
- match = area_regex.match(value)
- value = area_regex.sub(f"({match.group(1).strip()}) ", value)
- return value
-
- @report_result
- def to_sql(self) -> Tuple[Contact, Report]:
- """
- Converts this instance into a backend.db.models.organization. Contact instance.
- Does not query for existing contact.
-
- Returns:
- Contact: Contact instance
- """
- report = Report()
- instance = Contact.query(name=self.name, phone=self.phone, email=self.email)
- if not instance or isinstance(instance, list):
- instance = Contact()
- try:
- all_fields = self.model_fields + self.model_extra
- except TypeError:
- all_fields = self.model_fields
- for field in all_fields:
- value = getattr(self, field)
- match field:
- case "organization":
- value = [ClientLab.query(name=value)]
- case _:
- pass
- try:
- instance.__setattr__(field, value)
- except AttributeError as e:
- logger.error(f"Could not set {instance} {field} to {value} due to {e}")
- return instance, report
-
-
-class PydClientLab(BaseModel):
- name: str
- cost_centre: str
- contact: List[PydContact] | None
-
- @field_validator("contact", mode="before")
- @classmethod
- def string_to_list(cls, value):
- if isinstance(value, str):
- value = Contact.query(name=value)
- try:
- value = [value.to_pydantic()]
- except AttributeError:
- return None
- return value
-
- @report_result
- def to_sql(self) -> ClientLab:
- """
- Converts this instance into a backend.db.models.organization.Organization instance.
-
- Returns:
- Organization: Organization instance
- """
- report = Report()
- instance = ClientLab()
- for field in self.model_fields:
- match field:
- case "contact":
- value = getattr(self, field)
- if value:
- value = [item.to_sql() for item in value if item]
- case _:
- value = getattr(self, field)
- if value:
- setattr(instance, field, value)
- return instance, report
-
-
-class PydReagentRole(BaseModel):
- name: str
- eol_ext: timedelta | int | None
- uses: dict | None
- required: int | None = Field(default=1)
-
- @field_validator("eol_ext")
- @classmethod
- def int_to_timedelta(cls, value):
- if isinstance(value, int):
- return timedelta(days=value)
- return value
-
- # @report_result
- # def to_sql(self, kit: KitType) -> ReagentRole:
- # """
- # Converts this instance into a backend.db.models.ReagentType instance
- #
- # Args:
- # kit (KitType): KitType joined to the reagentrole
- #
- # Returns:
- # ReagentRole: ReagentType instance
- # """
- # report = Report()
- # instance: ReagentRole = ReagentRole.query(name=self.name)
- # if instance is None:
- # instance = ReagentRole(name=self.name, eol_ext=self.eol_ext)
- # try:
- # assoc = KitTypeReagentRoleAssociation.query(reagentrole=instance, kittype=kit)
- # except StatementError:
- # assoc = None
- # if assoc is None:
- # assoc = KitTypeReagentRoleAssociation(kittype=kit, reagentrole=instance, uses=self.uses,
- # required=self.required)
- # return instance, report
-
-
-# class PydKitType(BaseModel):
-# name: str
-# reagent_roles: List[PydReagent] = []
-#
-# @report_result
-# def to_sql(self) -> Tuple[KitType, Report]:
-# """
-# Converts this instance into a backend.db.models.kits.KitType instance
-#
-# Returns:
-# Tuple[KitType, Report]: KitType instance and report of results.
-# """
-# report = Report()
-# instance = KitType.query(name=self.name)
-# if instance is None:
-# instance = KitType(name=self.name)
-# for role in self.reagent_roles:
-# role.to_sql(instance)
-# return instance, report
-
-
-class PydEquipmentRole(BaseModel):
- name: str
- equipment: List[PydEquipment]
- process: List[str] | None
-
- @field_validator("process", mode="before")
- @classmethod
- def expand_processes(cls, value):
- if isinstance(value, GeneratorType):
- value = [item for item in value]
- return value
-
- def to_form(self, parent, used: list) -> "RoleComboBox":
- """
- Creates a widget for user input into this class.
-
- Args:
- parent (_type_): parent widget
- used (list): list of equipment already added to procedure
-
- Returns:
- RoleComboBox: widget
- """
- from frontend.widgets.equipment_usage import RoleComboBox
- return RoleComboBox(parent=parent, role=self, used=used)
-
-
-class PydProcess(PydBaseClass, extra="allow"):
- name: str
- version: str = Field(default="1.0")
- # equipment: List[str]
- tips: List[PydTips]
-
- @field_validator("tips", mode="before")
- @classmethod
- def enforce_list(cls, value):
- if not isinstance(value, list):
- value = [value]
- output = []
- for v in value:
- if issubclass(v.__class__, BaseClass):
- output.append(v.name)
- else:
- output.append(v)
- return output
-
- @field_validator("tips", mode="before")
- @classmethod
- def validate_tips(cls, value):
- if not value:
- return []
- value = [item for item in value if item]
- return value
-
- @field_validator("version", mode="before")
- @classmethod
- def enforce_float_string(cls, value):
- if isinstance(value, float):
- value = str(value)
- return value
-
- @report_result
- def to_sql(self):
- report = Report()
- name = self.name.split("-")[0]
- # NOTE: can't use query_or_create due to name not being part of ProcessVersion
- instance = ProcessVersion.query(name=name, version=self.version, limit=1)
- if not instance:
- instance = ProcessVersion()
- return instance, report
-
-
-class PydElastic(BaseModel, extra="allow", arbitrary_types_allowed=True):
- """Allows for creation of arbitrary pydantic models"""
- instance: BaseClass
-
- @report_result
- def to_sql(self):
- fields = [item for item in self.model_extra]
- for field in fields:
- try:
- field_type = getattr(self.instance.__class__, field).property
- except AttributeError:
- logger.error(f"No attribute: {field} in {self.instance.__class__}")
- continue
- match field_type:
- case _RelationshipDeclared():
- field_value = field_type.entity.class_.argument.query(name=getattr(self, field))
- case ColumnProperty():
- field_value = getattr(self, field)
- self.instance.__setattr__(field, field_value)
- return self.instance
-
-
-# NOTE: Generified objects below:
-
-class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
- proceduretype: ProcedureType | None = Field(default=None)
- run: Run | str | None = Field(default=None)
- name: dict = Field(default=dict(value="NA", missing=True), validate_default=True)
- technician: dict = Field(default=dict(value="NA", missing=True))
- repeat: bool = Field(default=False)
- repeat_of: Procedure | None = Field(default=None)
- plate_map: str | None = Field(default=None)
- reagent: list | None = Field(default=[])
- reagentrole: dict | None = Field(default={}, validate_default=True)
- sample: List[PydSample] = Field(default=[])
- equipment: List[PydEquipment] = Field(default=[])
- result: List[PydResults] | List[dict] = Field(default=[])
-
- @field_validator("name", "technician", mode="before")#"kittype", mode="before")
- @classmethod
- def convert_to_dict(cls, value):
- if not value:
- value = "NA"
- if isinstance(value, str):
- value = dict(value=value, missing=False)
- return value
-
- @field_validator("proceduretype", mode="before")
- @classmethod
- def lookup_proceduretype(cls, value):
- match value:
- case dict():
- value = ProcedureType.query(name=value['name'])
- case str():
- value = ProcedureType.query(name=value)
- case _:
- pass
- return value
-
- @field_validator("name")
- @classmethod
- def rescue_name(cls, value, values):
- if value['value'] == cls.model_fields['name'].default['value']:
- if values.data['proceduretype']:
- procedure_type = values.data['proceduretype'].name
- else:
- procedure_type = None
- if values.data['run']:
- run = values.data['run'].rsl_plate_number
- else:
- run = None
- value['value'] = f"{run}-{procedure_type}"
- value['missing'] = True
- return value
-
- @field_validator("name", "technician")#, "kittype")
- @classmethod
- def set_colour(cls, value):
- try:
- if value["missing"]:
- value["colour"] = "FE441D"
- else:
- value["colour"] = "6ffe1d"
- except KeyError:
- pass
- return value
-
- @field_validator("reagentrole")
- @classmethod
- def rescue_reagentrole(cls, value, values):
- if not value:
- value = {}
- for reagentrole in values.data['proceduretype'].reagentrole:
- reagents = [reagent.lot_dicts for reagent in reagentrole.reagent]
- value[reagentrole.name] = flatten_list(reagents)
- return value
-
- @field_validator("run")
- @classmethod
- def lookup_run(cls, value):
- if isinstance(value, str):
- value = Run.query(name=value)
- return value
-
- @field_validator("repeat_of")
- @classmethod
- def drop_empty_string(cls, value):
- if value == "":
- value = None
- return value
-
- @property
- def rows_columns_count(self) -> tuple[int, int]:
- try:
- proc: ProcedureType = Procedure.query(name=self.name).proceduretype
- except AttributeError as e:
- logger.error(f"Can't get rows, columns due to {e}")
- return 0, 0
- return proc.plate_rows, proc.plate_columns
-
- @property
- def max_sample_rank(self) -> int:
- rows, columns = self.rows_columns_count
- output = rows * columns
- if output > 0:
- return output
- else:
- try:
- return max([item.procedure_rank for item in self.sample])
- except TypeError:
- return len(self.sample)
-
- # def update_kittype_reagentroles(self, kittype: str | KitType):
- # if kittype == self.__class__.model_fields['kittype'].default['value']:
- # return
- # if isinstance(kittype, str):
- # kittype_obj = KitType.query(name=kittype)
- # try:
- # self.reagentrole = {
- # item.name: item.get_reagents(kittype=kittype_obj) + [PydReagent(name="--New--", lot="", reagentrole="")]
- # for item in
- # kittype_obj.get_reagents(proceduretype=self.proceduretype)}
- # except AttributeError:
- # self.reagentrole = {}
- # reordered_options = {}
- # if self.reagentrole:
- # for k, v in self.reagentrole.items():
- # reordered_options[k] = self.reorder_reagents(reagentrole=k, options=v)
- # self.reagentrole = reordered_options
- # self.kittype['value'] = kittype
- # self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype)))
-
- def reorder_reagents(self, reagentrole: str, options: list):
- reagent_used = next((reagent for reagent in self.reagent if reagent.reagentrole == reagentrole), None)
- if not reagent_used:
- return options
- roi = next((item for item in options if item.lot == reagent_used.lot and item.name == reagent_used.name), None)
- if not roi:
- return options
- options.insert(0, options.pop(options.index(roi)))
- return options
-
- # def update_kittype_equipmentroles(self, kittype: str | KitType):
- # if kittype == self.__class__.model_fields['kittype'].default['value']:
- # return
- # if isinstance(kittype, str):
- # kittype_obj = KitType.query(name=kittype)
- # try:
- # self.equipment = {item.name: item.get_reagents(kittype=kittype_obj) for item in
- # kittype_obj.get_reagents(proceduretype=self.proceduretype)}
- # except AttributeError:
- # self.reagentrole = {}
- # self.kittype['value'] = kittype
- # self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype)))
-
- def update_samples(self, sample_list: List[dict]):
- for iii, sample_dict in enumerate(sample_list, start=1):
- if sample_dict['sample_id'].startswith("blank_"):
- sample_dict['sample_id'] = ""
- row, column = self.proceduretype.ranked_plate[sample_dict['index']]
- try:
- sample = next(
- (item for item in self.sample if item.sample_id.upper() == sample_dict['sample_id'].upper()))
- except StopIteration:
- # NOTE: Code to check for added controls.
- logger.debug(
- f"Sample not found by name: {sample_dict['sample_id']}, checking row {row} column {column}")
- try:
- sample = next(
- (item for item in self.sample if item.row == row and item.column == column))
- except StopIteration:
- logger.error(f"Couldn't find sample: {pformat(sample_dict)}")
- continue
- sample.sample_id = sample_dict['sample_id']
- sample.well_id = sample_dict['sample_id']
- sample.row = row
- sample.column = column
- sample.procedure_rank = sample_dict['index']
-
- def update_reagents(self, reagentrole: str, name: str, lot: str, expiry: str):
- try:
- removable = next((item for item in self.reagent if item.reagentrole == reagentrole), None)
- except AttributeError as e:
- logger.error(self.reagent)
- raise e
- if removable:
- idx = self.reagent.index(removable)
- self.reagent.remove(removable)
- else:
- idx = 0
- insertable = PydReagent(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry)
- self.reagent.insert(idx, insertable)
-
- @classmethod
- def update_new_reagents(cls, reagent: PydReagent):
- reg = reagent.to_sql()
- reg.save()
-
- def to_sql(self, new: bool = False):
- from backend.db.models import RunSampleAssociation, ProcedureSampleAssociation
- logger.debug(f"incoming pyd: {pformat([item.__dict__ for item in self.equipment])}")
- if new:
- sql = Procedure()
- else:
- sql = super().to_sql()
- if isinstance(self.name, dict):
- sql.name = self.name['value']
- else:
- sql.name = self.name
- if isinstance(self.technician, dict):
- sql.technician = self.technician['value']
- else:
- sql.technician = self.technician
- if sql.repeat:
- regex = re.compile(r".*\dR\d$")
- repeats = [item for item in self.run.procedure if
- self.repeat_of.name in item.name and bool(regex.match(item.name))]
- sql.name = f"{self.repeat_of.name}-R{str(len(repeats) + 1)}"
- sql.repeat_of = self.repeat_of
- sql.started_date = datetime.now()
- if self.run:
- sql.run = self.run
- if self.proceduretype:
- sql.proceduretype = self.proceduretype
- # NOTE: reset reagent associations.
- for reagent in self.reagent:
- if isinstance(reagent, dict):
- reagent = PydReagent(**reagent)
- reagentrole = reagent.reagentrole
- reagent = reagent.to_sql()
- if reagent not in sql.reagentlot:
- # NOTE: Remove any previous association for this role.
- if sql.id:
- removable = ProcedureReagentLotAssociation.query(procedure=sql, reagentrole=reagentrole)
- else:
- removable = []
- if removable:
- if isinstance(removable, list):
- for r in removable:
- r.delete()
- else:
- removable.delete()
- reagent_assoc = ProcedureReagentLotAssociation(reagentlot=reagent, procedure=sql, reagentrole=reagentrole)
- try:
- start_index = max([item.id for item in ProcedureSampleAssociation.query()]) + 1
- except ValueError:
- start_index = 1
- relevant_samples = [sample for sample in self.sample if
- not sample.sample_id.startswith("blank_") and not sample.sample_id == ""]
- assoc_id_range = range(start_index, start_index + len(relevant_samples) + 1)
- for iii, sample in enumerate(relevant_samples):
- sample_sql = sample.to_sql()
- if sql.run:
- if sample_sql not in sql.run.sample:
- run_assoc = RunSampleAssociation(sample=sample_sql, run=self.run, row=sample.row,
- column=sample.column)
- if sample_sql not in sql.sample:
- proc_assoc = ProcedureSampleAssociation(new_id=assoc_id_range[iii], procedure=sql, sample=sample_sql,
- row=sample.row, column=sample.column,
- procedure_rank=sample.procedure_rank)
- for equipment in self.equipment:
- equip, _ = equipment.to_sql()
- logger.debug(f"Equipment:\n{pformat(equip.__dict__)}")
- if isinstance(equipment.process, list):
- equipment.process = equipment.process[0]
- if isinstance(equipment.tips, list):
- try:
- equipment.tips = equipment.tips[0]
- except IndexError:
- equipment.tips = None
- if equip not in sql.equipment:
- equip_assoc = ProcedureEquipmentAssociation(equipment=equip, procedure=sql,
- equipmentrole=equip.equipmentrole[0])
- process = equipment.process.to_sql()
- equip_assoc.processversion = process
- logger.debug(f"Tips: {type(equipment.tips)}")
- try:
- tipslot = equipment.tips.to_sql()
- logger.debug(f"Tipslot: {tipslot.__dict__}")
- except AttributeError:
- tipslot = None
-
- equip_assoc.tipslot = tipslot
- return sql, None
-
-
-class PydClientSubmission(PydBaseClass):
- # sql_object: ClassVar = ClientSubmission
-
- key_value_order = ["submitter_plate_id",
- "submitted_date",
- "client_lab",
- "contact",
- "contact_email",
- "cost_centre",
- "submission_type",
- "sample_count",
- "submission_category"]
-
- filepath: Path | None = Field(default=None)
- submissiontype: dict | None
- submitted_date: dict | None = Field(default=dict(value=date.today(), missing=True), validate_default=True)
- clientlab: dict | None
- sample_count: dict | None
- full_batch_size: int | dict = Field(default=0)
- submission_category: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
- comment: dict | None = Field(default=dict(value="", missing=True), validate_default=True)
- cost_centre: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
- contact: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
- submitter_plate_id: dict | None = Field(default=dict(value=None, missing=True), validate_default=True)
- sample: List[PydSample] | None = Field(default=[])
-
- @field_validator("submissiontype", "clientlab", "contact", mode="before")
- @classmethod
- def enforce_value(cls, value):
- if isinstance(value, str):
- value = dict(value=value, missing=False)
- return value
-
- @field_validator("submitted_date", mode="before")
- @classmethod
- def enforce_submitted_date(cls, value):
- match value:
- case str():
- value = dict(value=datetime.strptime(value, "%Y-%m-%d %H:%M:%S"), missing=False)
- case date() | datetime():
- value = dict(value=value, missing=False)
- case _:
- pass
- return value
-
- @field_validator("submitter_plate_id", mode="before")
- @classmethod
- def enforce_submitter_plate_id(cls, value):
- if isinstance(value, str):
- value = dict(value=value, missing=False)
- return value
-
- @field_validator("submission_category", mode="before")
- @classmethod
- def enforce_submission_category_id(cls, value):
- if isinstance(value, str):
- value = dict(value=value, missing=False)
- return value
-
- @field_validator("sample_count", mode="before")
- @classmethod
- def enforce_sample_count(cls, value):
- if isinstance(value, str) or isinstance(value, int):
- value = dict(value=value, missing=False)
- return value
-
- @field_validator("sample_count")
- @classmethod
- def enforce_integer(cls, value):
- if not value['value']:
- value['value'] = 0
- try:
- value['value'] = int(value['value'])
- except (ValueError, TypeError):
- raise f"sample count value must be an integer"
- return value
-
- @field_validator("submitter_plate_id")
- @classmethod
- def create_submitter_plate_num(cls, value, values):
- if value['value'] in [None, "None"]:
- val = f"{values.data['submissiontype']['value']}-{values.data['submission_category']['value']}-{values.data['submitted_date']['value']}"
- return dict(value=val, missing=True)
- else:
- value['value'] = value['value'].strip()
- return value
-
- @field_validator("submitted_date")
- @classmethod
- def rescue_date(cls, value):
- if not value:
- value = dict(value=None)
- try:
- check = value['value'] is None
- except TypeError:
- check = True
- if check:
- value.update(dict(value=date.today(), missing=True))
- else:
- match value['value']:
- case str():
- value['value'] = datetime.strptime(value['value'], "%Y-%m-%d")
- value['value'] = datetime.combine(value['value'], datetime.now().time())
- case _:
- pass
- return value
-
- @field_validator("submission_category")
- @classmethod
- def enforce_typing(cls, value, values):
- if not value['value'] in ["Research", "Diagnostic", "Surveillance", "Validation"]:
- try:
- value['value'] = values.data['submissiontype']['value']
- except (AttributeError, KeyError):
- value['value'] = "NA"
- return value
-
- @field_validator("comment", mode="before")
- @classmethod
- def convert_comment_string(cls, value):
- if isinstance(value, str):
- value = dict(value=value, missing=True)
- return value
-
- @field_validator("full_batch_size")
- @classmethod
- def dict_to_int(cls, value):
- if isinstance(value, dict):
- value = value['value']
- value = int(value)
- return value
-
- @field_validator("cost_centre", mode="before")
- @classmethod
- def str_to_dict(cls, value):
- if isinstance(value, str):
- value = dict(value=value)
- return value
-
- def to_form(self, parent: QWidget, samples: List = [], disable: list | None = None):
- """
- Converts this instance into a frontend.widgets.submission_widget.SubmissionFormWidget
-
- Args:
- samples ():
- disable (list, optional): a list of widgets to be disabled in the form. Defaults to None.
- parent (QWidget): parent widget of the constructed object
-
- Returns:
- SubmissionFormWidget: Submission form widget
- """
- from frontend.widgets.submission_widget import ClientSubmissionFormWidget
- if not samples:
- samples = self.sample
- return ClientSubmissionFormWidget(parent=parent, clientsubmission=self, samples=samples, disable=disable)
-
- def to_sql(self):
- sql = super().to_sql()
- assert not any([isinstance(item, PydSample) for item in sql.sample])
- sql.sample = []
- if not sql.submissiontype:
- sql.submissiontype = SubmissionType.query(name=self.submissiontype['value'])
- match sql.submissiontype:
- case SubmissionType():
- pass
- case _:
- sql.submissiontype = SubmissionType.query(name="Default")
- for k in list(self.model_fields.keys()) + list(self.model_extra.keys()):
- attribute = getattr(self, k)
- match k:
- case "filepath":
- sql._misc_info[k] = attribute.__str__()
- continue
- case _:
- pass
- return sql
-
- @property
- def max_sample_rank(self) -> int:
- output = self.full_batch_size
- if output > 0:
- return output
- else:
- return max([item.submission_rank for item in self.sample])
-
- def improved_dict(self, dictionaries: bool = True) -> dict:
- output = super().improved_dict(dictionaries=dictionaries)
- output['sample'] = self.sample
- output['client_lab'] = output['clientlab']
- try:
- output['contact_email'] = output['contact']['email']
- except TypeError:
- pass
- return sort_dict_by_list(output, self.key_value_order)
-
- @property
- def filename_template(self):
- submissiontype = SubmissionType.query(name=self.submissiontype['value'])
- return submissiontype.defaults['filename_template']
-
-
class PydResults(PydBaseClass, arbitrary_types_allowed=True):
result: dict = Field(default={})
result_type: str = Field(default="NA")
diff --git a/src/submissions/frontend/widgets/procedure_creation.py b/src/submissions/frontend/widgets/procedure_creation.py
index c8c6249..2c43698 100644
--- a/src/submissions/frontend/widgets/procedure_creation.py
+++ b/src/submissions/frontend/widgets/procedure_creation.py
@@ -69,13 +69,10 @@ class ProcedureCreation(QDialog):
equipment['name'] == relevant_procedure_item.name))
equipmentrole['equipment'].insert(0, equipmentrole['equipment'].pop(
equipmentrole['equipment'].index(item_in_er_list)))
- # proceduretype_dict['equipment_section'] = EquipmentUsage.construct_html(procedure=self.procedure, child=True)
proceduretype_dict['equipment'] = [sanitize_object_for_json(object) for object in proceduretype_dict['equipment']]
- self.update_equipment = EquipmentUsage.update_equipment
regex = re.compile(r".*R\d$")
proceduretype_dict['previous'] = [""] + [item.name for item in self.run.procedure if item.proceduretype == self.proceduretype and not bool(regex.match(item.name))]
- logger.debug(f"Procedure:\n{pformat(self.procedure.__dict__)}")
- logger.debug(f"ProcedureType:\n{pformat(proceduretype_dict)}")
+ # sys.exit(f"ProcedureDict:\n{pformat(proceduretype_dict)}")
html = render_details_template(
template_name="procedure_creation",
js_in=["procedure_form", "grid_drag", "context_menu"],
@@ -88,8 +85,9 @@ class ProcedureCreation(QDialog):
self.webview.setHtml(html)
@pyqtSlot(str, str, str, str)
- def update_equipment(self, equipmentrole: str, equipment: str, process: str, tips: str):
+ def update_equipment(self, equipmentrole: str, equipment: str, processversion: str, tips: str):
from backend.db.models import Equipment, ProcessVersion, TipsLot
+ logger.debug(f"\n\nEquipmentRole: {equipmentrole}, Equipment: {equipment}, Process: {processversion}, Tips: {tips}\n\n")
try:
equipment_of_interest = next(
(item for item in self.procedure.equipment if item.equipmentrole == equipmentrole))
@@ -103,9 +101,9 @@ class ProcedureCreation(QDialog):
eoi.name = equipment.name
eoi.asset_number = equipment.asset_number
eoi.nickname = equipment.nickname
- process_name, version = process.split("-v")
- process = ProcessVersion.query(name=process_name, version=version, limit=1)
- eoi.process = process
+ process_name, version = processversion.split("-v")
+ processversion = ProcessVersion.query(name=process_name, version=version, limit=1)
+ eoi.processversion = processversion.to_pydantic()
try:
tips_manufacturer, tipsref, lot = [item if item != "" else None for item in tips.split("-")]
tips = TipsLot.query(manufacturer=tips_manufacturer, ref=tipsref, lot=lot)
diff --git a/src/submissions/templates/procedure_details.html b/src/submissions/templates/procedure_details.html
index 7f8c615..e9b92dc 100644
--- a/src/submissions/templates/procedure_details.html
+++ b/src/submissions/templates/procedure_details.html
@@ -56,10 +56,15 @@
{% endif %}
{% if procedure['sample'] %}
+ {% if procedure['platemap']|length > 5 %}
+
+ {{ procedure['platemap'] }}
+ {% else %}
{% for sample in procedure['sample'] %}
- {{ sample['sample_id']}}
+ {{ sample['sample_id']}}
{% endfor %}