Pydantic switchover for omni is largely complete. Will need some debugging.

This commit is contained in:
lwark
2025-02-27 10:00:43 -06:00
parent f57aa3c3f0
commit abcdbac5b8
8 changed files with 431 additions and 33 deletions

View File

@@ -333,6 +333,7 @@ class BaseClass(Base):
def check_all_attributes(self, attributes: dict) -> bool: def check_all_attributes(self, attributes: dict) -> bool:
""" """
Checks this instance against a dictionary of attributes to determine if they are a match.
Args: Args:
attributes (dict): A dictionary of attributes to be check for equivalence attributes (dict): A dictionary of attributes to be check for equivalence
@@ -345,9 +346,16 @@ class BaseClass(Base):
# print(getattr(self.__class__, key).property) # print(getattr(self.__class__, key).property)
if value.lower() == "none": if value.lower() == "none":
value = None value = None
logger.debug(f"Attempting to grab attribute: {key}")
self_value = getattr(self, key) self_value = getattr(self, key)
class_attr = getattr(self.__class__, key) class_attr = getattr(self.__class__, key)
match class_attr.property: logger.debug(f"Self value: {self_value}, class attr: {class_attr} of type: {type(class_attr)}")
if isinstance(class_attr, property):
filter = "property"
else:
filter = class_attr.property
match filter:
# match class_attr:
case ColumnProperty(): case ColumnProperty():
match class_attr.type: match class_attr.type:
case INTEGER(): case INTEGER():
@@ -359,13 +367,25 @@ class BaseClass(Base):
value = int(value) value = int(value)
case FLOAT(): case FLOAT():
value = float(value) value = float(value)
case "property":
pass
case _RelationshipDeclared(): case _RelationshipDeclared():
logger.debug(f"Checking {self_value}")
try: try:
self_value = self_value.name self_value = self_value.name
except AttributeError: except AttributeError:
pass pass
if class_attr.property.uselist: if class_attr.property.uselist:
self_value = self_value.__str__() self_value = self_value.__str__()
try:
logger.debug(f"Check if {self_value.__class__} is subclass of {self.__class__}")
check = issubclass(self_value.__class__, self.__class__)
except TypeError as e:
logger.error(f"Couldn't check if {self_value.__class__} is subclass of {self.__class__} due to {e}")
check = False
if check:
logger.debug(f"Checking for subclass name.")
self_value = self_value.name
logger.debug(f"Checking self_value {self_value} of type {type(self_value)} against attribute {value} of type {type(value)}") logger.debug(f"Checking self_value {self_value} of type {type(self_value)} against attribute {value} of type {type(value)}")
if self_value != value: if self_value != value:
output = False output = False
@@ -393,13 +413,15 @@ class BaseClass(Base):
logger.debug(f"Setting _RelationshipDeclared to {value}") logger.debug(f"Setting _RelationshipDeclared to {value}")
if field_type.property.uselist: if field_type.property.uselist:
logger.debug(f"Setting with uselist") logger.debug(f"Setting with uselist")
if self.__getattribute__(key) is not None: existing = self.__getattribute__(key)
if existing is not None:
if isinstance(value, list): if isinstance(value, list):
value = self.__getattribute__(key) + value value = existing + value
else: else:
value = self.__getattribute__(key) + [value] value = existing + [value]
else: else:
value = [value] value = [value]
value = list(set(value))
return super().__setattr__(key, value) return super().__setattr__(key, value)
else: else:
if isinstance(value, list): if isinstance(value, list):

View File

@@ -15,6 +15,7 @@ from pandas import ExcelFile
from pathlib import Path from pathlib import Path
from . import Base, BaseClass, Organization, LogMixin from . import Base, BaseClass, Organization, LogMixin
from io import BytesIO from io import BytesIO
from inspect import getouterframes, currentframe
logger = logging.getLogger(f'submissions.{__name__}') logger = logging.getLogger(f'submissions.{__name__}')
@@ -227,6 +228,20 @@ class KitType(BaseClass):
# logger.debug(f"Output: {output}") # logger.debug(f"Output: {output}")
return output, new_kit return output, new_kit
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[KitType, bool]:
from backend.validators.pydant import PydKitType
new = False
disallowed = ['expiry']
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
instance = PydKitType(**kwargs)
new = True
instance = instance.to_sql()
logger.info(f"Instance from query or create: {instance}")
return instance, new
@classmethod @classmethod
@setup_lookup @setup_lookup
def query(cls, def query(cls,
@@ -380,8 +395,27 @@ class KitType(BaseClass):
new_process.equipment_roles.append(new_role) new_process.equipment_roles.append(new_role)
return new_kit return new_kit
def to_pydantic(self): def to_omni(self, expand: bool = False) -> "OmniKitType":
pass from backend.validators.omni_gui_objects import OmniKitType
# logger.debug(f"self.name: {self.name}")
# level = len(getouterframes(currentframe()))
# logger.warning(f"Function level is {level}")
if expand:
processes = [item.to_omni() for item in self.processes]
kit_reagentrole_associations = [item.to_omni() for item in self.kit_reagentrole_associations]
kit_submissiontype_associations = [item.to_omni() for item in self.kit_submissiontype_associations]
else:
processes = [item.name for item in self.processes]
kit_reagentrole_associations = [item.name for item in self.kit_reagentrole_associations]
kit_submissiontype_associations = [item.name for item in self.kit_submissiontype_associations]
data = dict(
name=self.name,
processes=processes,
kit_reagentrole_associations=kit_reagentrole_associations,
kit_submissiontype_associations=kit_submissiontype_associations
)
logger.debug(f"Creating omni for {pformat(data)}")
return OmniKitType(instance_object=self, **data)
class ReagentRole(BaseClass): class ReagentRole(BaseClass):
@@ -413,6 +447,20 @@ class ReagentRole(BaseClass):
""" """
return f"<ReagentRole({self.name})>" return f"<ReagentRole({self.name})>"
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[ReagentRole, bool]:
new = False
disallowed = ['expiry']
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
instance = cls()
new = True
for k, v in sanitized_kwargs.items():
setattr(instance, k, v)
logger.info(f"Instance from query or create: {instance}")
return instance, new
@classmethod @classmethod
@setup_lookup @setup_lookup
def query(cls, def query(cls,
@@ -496,6 +544,11 @@ class ReagentRole(BaseClass):
def save(self): def save(self):
super().save() super().save()
def to_omni(self, expand: bool=False):
from backend.validators.omni_gui_objects import OmniReagentRole
logger.debug(f"Constructing OmniReagentRole with name {self.name}")
return OmniReagentRole(instance_object=self, name=self.name, eol_ext=self.eol_ext)
class Reagent(BaseClass, LogMixin): class Reagent(BaseClass, LogMixin):
""" """
@@ -1010,6 +1063,20 @@ class SubmissionType(BaseClass):
from .submissions import BasicSubmission from .submissions import BasicSubmission
return BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.name) return BasicSubmission.find_polymorphic_subclass(polymorphic_identity=self.name)
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[SubmissionType, bool]:
new = False
disallowed = ['expiry']
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
instance = cls()
new = True
for k, v in sanitized_kwargs.items():
setattr(instance, k, v)
logger.info(f"Instance from query or create: {instance}")
return instance, new
@classmethod @classmethod
@setup_lookup @setup_lookup
def query(cls, def query(cls,
@@ -1112,6 +1179,43 @@ class SubmissionType(BaseClass):
Organization.import_from_yml(filepath=filepath) Organization.import_from_yml(filepath=filepath)
return submission_type return submission_type
def to_omni(self, expand: bool = False):
from backend.validators.omni_gui_objects import OmniSubmissionType
# level = len(getouterframes(currentframe()))
# logger.warning(f"Function level is {level}")
# try:
# info_map = self.submission_type.info_map
# except AttributeError:
# info_map = {}
# try:
# defaults = self.submission_type.defaults
# except AttributeError:
# defaults = {}
# try:
# sample_map = self.submission_type.sample_map
# except AttributeError:
# sample_map = {}
try:
template_file = self.template_file
except AttributeError:
template_file = bytes()
if expand:
try:
processes = [item.to_omni() for item in self.processes]
except AttributeError:
processes = []
else:
processes = [item.name for item in self.processes]
return OmniSubmissionType(
instance_object=self,
name=self.name,
info_map=self.info_map,
defaults=self.defaults,
template_file=template_file,
processes=processes,
sample_map=self.sample_map
)
class SubmissionTypeKitTypeAssociation(BaseClass): class SubmissionTypeKitTypeAssociation(BaseClass):
""" """
@@ -1164,10 +1268,18 @@ class SubmissionTypeKitTypeAssociation(BaseClass):
def kittype(self): def kittype(self):
return self.kit_type return self.kit_type
@kittype.setter
def kittype(self, value):
self.kit_type = value
@hybrid_property @hybrid_property
def submissiontype(self): def submissiontype(self):
return self.submission_type return self.submission_type
@submissiontype.setter
def submissiontype(self, value):
self.submission_type = value
@property @property
def name(self): def name(self):
try: try:
@@ -1175,6 +1287,20 @@ class SubmissionTypeKitTypeAssociation(BaseClass):
except AttributeError: except AttributeError:
return "Blank SubmissionTypeKitTypeAssociation" return "Blank SubmissionTypeKitTypeAssociation"
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[SubmissionTypeKitTypeAssociation, bool]:
new = False
disallowed = ['expiry']
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
instance = cls()
new = True
for k, v in sanitized_kwargs.items():
setattr(instance, k, v)
logger.info(f"Instance from query or create: {instance}")
return instance, new
@classmethod @classmethod
@setup_lookup @setup_lookup
def query(cls, def query(cls,
@@ -1226,6 +1352,36 @@ class SubmissionTypeKitTypeAssociation(BaseClass):
base_dict['kit_type'] = self.kit_type.to_export_dict(submission_type=self.submission_type) base_dict['kit_type'] = self.kit_type.to_export_dict(submission_type=self.submission_type)
return base_dict return base_dict
def to_omni(self, expand: bool = False):
from backend.validators.omni_gui_objects import OmniSubmissionTypeKitTypeAssociation
# level = len(getouterframes(currentframe()))
# logger.warning(f"Function level is {level}")
if expand:
try:
submissiontype = self.submission_type.to_omni()
except AttributeError:
submissiontype = ""
try:
kittype = self.kit_type.to_omni()
except AttributeError:
kittype = ""
else:
submissiontype = self.submission_type.name
kittype = self.kit_type.name
# try:
# processes = [item.to_omni() for item in self.submission_type.processes]
# except AttributeError:
# processes = []
return OmniSubmissionTypeKitTypeAssociation(
instance_object=self,
submissiontype=submissiontype,
kittype=kittype,
mutable_cost_column=self.mutable_cost_column,
mutable_cost_sample=self.mutable_cost_sample,
constant_cost=self.constant_cost
# processes=processes,
)
class KitTypeReagentRoleAssociation(BaseClass): class KitTypeReagentRoleAssociation(BaseClass):
""" """
@@ -1312,11 +1468,26 @@ class KitTypeReagentRoleAssociation(BaseClass):
raise ValueError(f'{value} is not a reagentrole') raise ValueError(f'{value} is not a reagentrole')
return value return value
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[KitTypeReagentRoleAssociation, bool]:
new = False
disallowed = ['expiry']
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
instance = cls()
new = True
for k, v in sanitized_kwargs.items():
setattr(instance, k, v)
logger.info(f"Instance from query or create: {instance}")
return instance, new
@classmethod @classmethod
@setup_lookup @setup_lookup
def query(cls, def query(cls,
kittype: KitType | str | None = None, kittype: KitType | str | None = None,
reagentrole: ReagentRole | str | None = None, reagentrole: ReagentRole | str | None = None,
submissiontype: SubmissionType | str | None = None,
limit: int = 0, limit: int = 0,
**kwargs **kwargs
) -> KitTypeReagentRoleAssociation | List[KitTypeReagentRoleAssociation]: ) -> KitTypeReagentRoleAssociation | List[KitTypeReagentRoleAssociation]:
@@ -1346,6 +1517,14 @@ class KitTypeReagentRoleAssociation(BaseClass):
query = query.join(ReagentRole).filter(ReagentRole.name == reagentrole) query = query.join(ReagentRole).filter(ReagentRole.name == reagentrole)
case _: case _:
pass pass
match submissiontype:
case SubmissionType():
query = query.filter(cls.submission_type == submissiontype)
case str():
query = query.join(SubmissionType).filter(SubmissionType.name == submissiontype)
case _:
pass
pass
if kittype is not None and reagentrole is not None: if kittype is not None and reagentrole is not None:
limit = 1 limit = 1
return cls.execute_query(query=query, limit=limit) return cls.execute_query(query=query, limit=limit)
@@ -1395,6 +1574,39 @@ class KitTypeReagentRoleAssociation(BaseClass):
) )
return dicto return dicto
def to_omni(self, expand: bool = False) -> "OmniReagentRole":
from backend.validators.omni_gui_objects import OmniKitTypeReagentRoleAssociation
try:
eol_ext = self.reagent_role.eol_ext
except AttributeError:
eol_ext = timedelta(days=0)
if expand:
try:
submission_type = self.submission_type.to_omni()
except AttributeError:
submission_type = ""
try:
kit_type = self.kit_type.to_omni()
except AttributeError:
kit_type = ""
try:
reagent_role = self.reagent_role.to_omni()
except AttributeError:
reagent_role = ""
else:
submission_type = self.submission_type.name
kit_type = self.kit_type.name
reagent_role = self.reagent_role.name
return OmniKitTypeReagentRoleAssociation(
instance_object=self,
reagent_role=reagent_role,
eol_ext=eol_ext,
required=self.required,
submission_type=submission_type,
kit_type=kit_type,
uses=self.uses
)
class SubmissionReagentAssociation(BaseClass): class SubmissionReagentAssociation(BaseClass):
""" """
@@ -1716,9 +1928,28 @@ class EquipmentRole(BaseClass):
pyd_dict['processes'] = self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit) pyd_dict['processes'] = self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit)
return PydEquipmentRole(equipment=equipment, **pyd_dict) return PydEquipmentRole(equipment=equipment, **pyd_dict)
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[EquipmentRole, bool]:
new = False
disallowed = ['expiry']
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
instance = cls()
new = True
for k, v in sanitized_kwargs.items():
setattr(instance, k, v)
logger.info(f"Instance from query or create: {instance}")
return instance, new
@classmethod @classmethod
@setup_lookup @setup_lookup
def query(cls, name: str | None = None, id: int | None = None, limit: int = 0) -> EquipmentRole | List[ def query(cls,
name: str | None = None,
id: int | None = None,
limit: int = 0,
**kwargs
) -> EquipmentRole | List[
EquipmentRole]: EquipmentRole]:
""" """
Lookup Equipment roles. Lookup Equipment roles.
@@ -1779,6 +2010,10 @@ class EquipmentRole(BaseClass):
processes = self.get_processes(submission_type=submission_type, extraction_kit=kit_type) processes = self.get_processes(submission_type=submission_type, extraction_kit=kit_type)
return dict(role=self.name, processes=[item for item in processes]) return dict(role=self.name, processes=[item for item in processes])
def to_omni(self, expand: bool = False) -> "OmniEquipmentRole":
from backend.validators.omni_gui_objects import OmniEquipmentRole
return OmniEquipmentRole(instance_object=self, name=self.name)
class SubmissionEquipmentAssociation(BaseClass): class SubmissionEquipmentAssociation(BaseClass):
""" """
@@ -1949,6 +2184,20 @@ class Process(BaseClass):
if value not in field: if value not in field:
field.append(value) field.append(value)
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[Process, bool]:
new = False
disallowed = ['expiry']
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
instance = cls()
new = True
for k, v in sanitized_kwargs.items():
setattr(instance, k, v)
logger.info(f"Instance from query or create: {instance}")
return instance, new
@classmethod @classmethod
@setup_lookup @setup_lookup
def query(cls, def query(cls,
@@ -2013,7 +2262,15 @@ class Process(BaseClass):
def save(self): def save(self):
super().save() super().save()
# @classmethod def to_omni(self, expand: bool = False):
from backend.validators.omni_gui_objects import OmniProcess
return OmniProcess(
instance_object=self,
name=self.name,
submission_types=[item.to_omni() for item in self.submission_types],
equipment_roles=[item.to_omni() for item in self.equipment_roles],
tip_roles=[item.to_omni() for item in self.tip_roles]
)
class TipRole(BaseClass): class TipRole(BaseClass):
@@ -2034,13 +2291,51 @@ class TipRole(BaseClass):
submission_types = association_proxy("tiprole_submissiontype_associations", "submission_type") submission_types = association_proxy("tiprole_submissiontype_associations", "submission_type")
@hybrid_property
def tips(self):
return self.instances
def __repr__(self): def __repr__(self):
return f"<TipRole({self.name})>" return f"<TipRole({self.name})>"
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[TipRole, bool]:
new = False
disallowed = ['expiry']
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
instance = cls()
new = True
for k, v in sanitized_kwargs.items():
setattr(instance, k, v)
logger.info(f"Instance from query or create: {instance}")
return instance, new
@classmethod
@setup_lookup
def query(cls, name: str | None = None, limit: int = 0, **kwargs) -> TipRole | List[TipRole]:
query = cls.__database_session__.query(cls)
match name:
case str():
query = query.filter(cls.name == name)
limit = 1
case _:
pass
return cls.execute_query(query=query, limit=limit)
@check_authorization @check_authorization
def save(self): def save(self):
super().save() super().save()
def to_omni(self, expand: bool = False):
from backend.validators.omni_gui_objects import OmniTipRole
return OmniTipRole(
instance_object=self,
name=self.name,
tips=[item.to_omni() for item in self.tips]
)
class Tips(BaseClass, LogMixin): class Tips(BaseClass, LogMixin):
""" """
@@ -2070,6 +2365,20 @@ class Tips(BaseClass, LogMixin):
def __repr__(self): def __repr__(self):
return f"<Tips({self.name})>" return f"<Tips({self.name})>"
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[Tips, bool]:
new = False
disallowed = ['expiry']
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
instance = cls()
new = True
for k, v in sanitized_kwargs.items():
setattr(instance, k, v)
logger.info(f"Instance from query or create: {instance}")
return instance, new
@classmethod @classmethod
def query(cls, name: str | None = None, lot: str | None = None, limit: int = 0, **kwargs) -> Tips | List[Tips]: def query(cls, name: str | None = None, lot: str | None = None, limit: int = 0, **kwargs) -> Tips | List[Tips]:
""" """
@@ -2101,6 +2410,13 @@ class Tips(BaseClass, LogMixin):
def save(self): def save(self):
super().save() super().save()
def to_omni(self, expand: bool = True):
from backend.validators.omni_gui_objects import OmniTips
return OmniTips(
instance_object=self,
name=self.name
)
class SubmissionTypeTipRoleAssociation(BaseClass): class SubmissionTypeTipRoleAssociation(BaseClass):
""" """
@@ -2129,6 +2445,9 @@ class SubmissionTypeTipRoleAssociation(BaseClass):
def save(self): def save(self):
super().save() super().save()
def to_omni(self):
pass
class SubmissionTipsAssociation(BaseClass): class SubmissionTipsAssociation(BaseClass):
""" """

View File

@@ -10,9 +10,7 @@ from zipfile import ZipFile, BadZipfile
from tempfile import TemporaryDirectory, TemporaryFile from tempfile import TemporaryDirectory, TemporaryFile
from operator import itemgetter from operator import itemgetter
from pprint import pformat from pprint import pformat
from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.ext.hybrid import hybrid_property
from . import BaseClass, Reagent, SubmissionType, KitType, Organization, Contact, LogMixin, SubmissionReagentAssociation from . import BaseClass, Reagent, SubmissionType, KitType, Organization, Contact, LogMixin, SubmissionReagentAssociation
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func
from sqlalchemy.orm import relationship, validates, Query from sqlalchemy.orm import relationship, validates, Query
@@ -24,7 +22,7 @@ from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as S
from openpyxl import Workbook from openpyxl import Workbook
from openpyxl.drawing.image import Image as OpenpyxlImage from openpyxl.drawing.image import Image as OpenpyxlImage
from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr, Result, Report, \ from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr, Result, Report, \
report_result, create_holidays_for_year report_result, create_holidays_for_year, check_dictionary_inclusion_equality
from datetime import datetime, date, timedelta from datetime import datetime, date, timedelta
from typing import List, Any, Tuple, Literal, Generator, Type from typing import List, Any, Tuple, Literal, Generator, Type
from dateutil.parser import parse from dateutil.parser import parse
@@ -558,12 +556,14 @@ class BasicSubmission(BaseClass, LogMixin):
existing = value existing = value
case _: case _:
existing = self.__getattribute__(key) existing = self.__getattribute__(key)
logger.debug(f"Existing value is {pformat(existing)}")
if value in ['', 'null', None]: if value in ['', 'null', None]:
logger.error(f"No value given, not setting.") logger.error(f"No value given, not setting.")
return return
if existing is None: if existing is None:
existing = [] existing = []
if value in existing: # if value in existing:
if check_dictionary_inclusion_equality(existing, value):
logger.warning("Value already exists. Preventing duplicate addition.") logger.warning("Value already exists. Preventing duplicate addition.")
return return
else: else:
@@ -572,6 +572,7 @@ class BasicSubmission(BaseClass, LogMixin):
else: else:
if value: if value:
existing.append(value) existing.append(value)
self.__setattr__(key, existing) self.__setattr__(key, existing)
# NOTE: Make sure this gets updated by telling SQLAlchemy it's been modified. # NOTE: Make sure this gets updated by telling SQLAlchemy it's been modified.
flag_modified(self, key) flag_modified(self, key)
@@ -1223,10 +1224,19 @@ class BasicSubmission(BaseClass, LogMixin):
if "submitted_date" not in kwargs.keys(): if "submitted_date" not in kwargs.keys():
instance.submitted_date = date.today() instance.submitted_date = date.today()
else: else:
from frontend.widgets.pop_ups import QuestionAsker
logger.warning(f"Found existing instance: {instance}, asking to overwrite.") logger.warning(f"Found existing instance: {instance}, asking to overwrite.")
# code = 1
# msg = "This submission already exists.\nWould you like to overwrite?"
# report.add_result(Result(msg=msg, code=code))
dlg = QuestionAsker(title="Overwrite?", message="This submission already exists.\nWould you like to overwrite?")
if dlg.exec():
pass
else:
code = 1 code = 1
msg = "This submission already exists.\nWould you like to overwrite?" msg = "This submission already exists.\nWould you like to overwrite?"
report.add_result(Result(msg=msg, code=code)) report.add_result(Result(msg=msg, code=code))
return None, report
return instance, report return instance, report
# NOTE: Custom context events for the ui # NOTE: Custom context events for the ui
@@ -1528,6 +1538,7 @@ class Wastewater(BasicSubmission):
dummy_samples.append(thing) dummy_samples.append(thing)
output['origin_plate'] = self.__class__.make_plate_map(sample_list=dummy_samples, plate_rows=4, output['origin_plate'] = self.__class__.make_plate_map(sample_list=dummy_samples, plate_rows=4,
plate_columns=6) plate_columns=6)
# logger.debug(f"PCR info: {output['pcr_info']}")
return output return output
@classmethod @classmethod

View File

@@ -483,7 +483,6 @@ class PydSubmission(BaseModel, extra='allow'):
value['value'] = output.replace(tzinfo=timezone) value['value'] = output.replace(tzinfo=timezone)
return value return value
@field_validator("submitting_lab", mode="before") @field_validator("submitting_lab", mode="before")
@classmethod @classmethod
def rescue_submitting_lab(cls, value): def rescue_submitting_lab(cls, value):
@@ -772,7 +771,7 @@ class PydSubmission(BaseModel, extra='allow'):
return missing_info, missing_reagents return missing_info, missing_reagents
@report_result @report_result
def to_sql(self) -> Tuple[BasicSubmission, Report]: def to_sql(self) -> Tuple[BasicSubmission | None, Report]:
""" """
Converts this instance into a backend.db.models.submissions.BasicSubmission instance Converts this instance into a backend.db.models.submissions.BasicSubmission instance
@@ -782,12 +781,19 @@ class PydSubmission(BaseModel, extra='allow'):
report = Report() report = Report()
dicto = self.improved_dict() dicto = self.improved_dict()
logger.debug(f"Pydantic submission type: {self.submission_type['value']}") logger.debug(f"Pydantic submission type: {self.submission_type['value']}")
logger.debug(f"Pydantic improved_dict: {pformat(dicto)}")
# At this point, pcr_info is not duplicated
instance, result = BasicSubmission.query_or_create(submission_type=self.submission_type['value'], instance, result = BasicSubmission.query_or_create(submission_type=self.submission_type['value'],
rsl_plate_num=self.rsl_plate_num['value']) rsl_plate_num=self.rsl_plate_num['value'])
logger.debug(f"Created or queried instance: {instance}") # logger.debug(f"Created or queried instance: {instance}")
if instance is None:
report.add_result(Result(msg="Overwrite Cancelled."))
return None, report
report.add_result(result) report.add_result(result)
self.handle_duplicate_samples() self.handle_duplicate_samples()
for key, value in dicto.items(): for key, value in dicto.items():
logger.debug(f"Checking key {key}, value {value}")
# At this point, pcr_info is not duplicated.
if isinstance(value, dict): if isinstance(value, dict):
try: try:
value = value['value'] value = value['value']
@@ -843,6 +849,8 @@ class PydSubmission(BaseModel, extra='allow'):
value = value value = value
instance.set_attribute(key=key, value=value) instance.set_attribute(key=key, value=value)
case item if item in instance.jsons: case item if item in instance.jsons:
# At this point pcr_info is not duplicated
logger.debug(f"Validating json value: {item} to value:{pformat(value)}")
try: try:
ii = value.items() ii = value.items()
except AttributeError: except AttributeError:
@@ -851,7 +859,9 @@ class PydSubmission(BaseModel, extra='allow'):
if isinstance(v, datetime): if isinstance(v, datetime):
value[k] = v.strftime("%Y-%m-%d %H:%M:%S") value[k] = v.strftime("%Y-%m-%d %H:%M:%S")
else: else:
value[k] = v pass
logger.debug(f"Setting json value: {item} to value:{pformat(value)}")
# At this point, pcr_info is not duplicated.
instance.set_attribute(key=key, value=value) instance.set_attribute(key=key, value=value)
case _: case _:
try: try:
@@ -899,6 +909,10 @@ class PydSubmission(BaseModel, extra='allow'):
SubmissionFormWidget: Submission form widget SubmissionFormWidget: Submission form widget
""" """
from frontend.widgets.submission_widget import SubmissionFormWidget from frontend.widgets.submission_widget import SubmissionFormWidget
try:
logger.debug(f"PCR info: {self.pcr_info}")
except AttributeError:
pass
return SubmissionFormWidget(parent=parent, submission=self, disable=disable) return SubmissionFormWidget(parent=parent, submission=self, disable=disable)
def to_writer(self) -> "SheetWriter": def to_writer(self) -> "SheetWriter":
@@ -1124,7 +1138,7 @@ class PydReagentRole(BaseModel):
class PydKitType(BaseModel): class PydKitType(BaseModel):
name: str name: str
reagent_roles: List[PydReagentRole] = [] reagent_roles: List[PydReagent] = []
@report_result @report_result
def to_sql(self) -> Tuple[KitType, Report]: def to_sql(self) -> Tuple[KitType, Report]:

View File

@@ -9,6 +9,7 @@ from PyQt6.QtWidgets import (
QHBoxLayout, QScrollArea, QMainWindow, QHBoxLayout, QScrollArea, QMainWindow,
QToolBar QToolBar
) )
import pickle
from PyQt6.QtGui import QAction from PyQt6.QtGui import QAction
from pathlib import Path from pathlib import Path
from markdown import markdown from markdown import markdown
@@ -238,11 +239,17 @@ class App(QMainWindow):
@under_development @under_development
def manage_kits(self, *args, **kwargs): def manage_kits(self, *args, **kwargs):
dlg = ManagerWindow(parent=self, object_type=KitType, extras=[], add_edit='edit', managers=set()) from frontend.widgets.omni_manager_pydant import ManagerWindow as ManagerWindowPyd
dlg = ManagerWindowPyd(parent=self, object_type=KitType, extras=[], add_edit='edit', managers=set())
if dlg.exec(): if dlg.exec():
output = dlg.parse_form() output = dlg.parse_form()
assert isinstance(output, KitType) # assert isinstance(output, KitType)
output.save() # output.save()
logger.debug(f"Kit output: {pformat(output.__dict__)}")
# output.to_sql()
with open(f"{output.name}.obj", "wb") as f:
pickle.dump(output, f)
class AddSubForm(QWidget): class AddSubForm(QWidget):

View File

@@ -211,7 +211,7 @@ class ManagerWindow(QDialog):
else: else:
value = current_value + [data] value = current_value + [data]
setattr(self.instance, name, value) setattr(self.instance, name, value)
self.instance.save() # self.instance.save()
def toggle_textedit(self, caller_child=None): def toggle_textedit(self, caller_child=None):
already_exists = self.findChildren(LargeTextEdit) already_exists = self.findChildren(LargeTextEdit)
@@ -369,7 +369,7 @@ class EditRelationship(QWidget):
new_instance = dlg.parse_form() new_instance = dlg.parse_form()
# NOTE: My custom __setattr__ should take care of any list problems. # NOTE: My custom __setattr__ should take care of any list problems.
self.parent().instance.__setattr__(self.objectName(), new_instance) self.parent().instance.__setattr__(self.objectName(), new_instance)
self.parent().instance.save() # self.parent().instance.save()
self.parent().update_data() self.parent().update_data()
def add_existing(self): def add_existing(self):
@@ -381,7 +381,7 @@ class EditRelationship(QWidget):
instance = self.class_object.query(**row) instance = self.class_object.query(**row)
# NOTE: My custom __setattr__ should take care of any list problems. # NOTE: My custom __setattr__ should take care of any list problems.
self.parent().instance.__setattr__(self.objectName(), instance) self.parent().instance.__setattr__(self.objectName(), instance)
self.parent().instance.save() # self.parent().instance.save()
self.parent().update_data() self.parent().update_data()
def set_data(self) -> None: def set_data(self) -> None:

View File

@@ -371,6 +371,9 @@ class SubmissionFormWidget(QWidget):
case _: case _:
pass pass
# NOTE: add reagents to submission object # NOTE: add reagents to submission object
if base_submission is None:
# self.app.table_widget.sub_wid.setData()
return
for reagent in base_submission.reagents: for reagent in base_submission.reagents:
reagent.update_last_used(kit=base_submission.extraction_kit) reagent.update_last_used(kit=base_submission.extraction_kit)
save_output = base_submission.save() save_output = base_submission.save()

View File

@@ -821,7 +821,7 @@ def check_object_in_manager(manager: list, object_name: object) -> Tuple[Any, bo
# for manager in managers: # for manager in managers:
if manager is None: if manager is None:
return None, False return None, False
logger.debug(f"Manager: {manager}, aliases: {manager.aliases}, Key: {object_name}") # logger.debug(f"Manager: {manager}, aliases: {manager.aliases}, Key: {object_name}")
if object_name in manager.aliases: if object_name in manager.aliases:
return manager, True return manager, True
relationships = [getattr(manager.__class__, item) for item in dir(manager.__class__) relationships = [getattr(manager.__class__, item) for item in dir(manager.__class__)
@@ -1113,7 +1113,7 @@ def report_result(func):
@wraps(func) @wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
logger.info(f"Report result being called by {func.__name__}") # logger.info(f"Report result being called by {func.__name__}")
output = func(*args, **kwargs) output = func(*args, **kwargs)
match output: match output:
case Report(): case Report():
@@ -1201,6 +1201,28 @@ def create_holidays_for_year(year: int | None = None) -> List[date]:
return sorted(holidays) return sorted(holidays)
def check_dictionary_inclusion_equality(listo: List[dict] | dict, dicto: dict) -> bool:
"""
Determines if a dictionary is in a list of dictionaries (possible ordering issue with just using dict in list)
Args:
listo (List[dict): List of dictionaries to compare to.
dicto (dict): Dictionary to compare.
Returns:
bool: True if dicto is equal to any dictionary in the list.
"""
logger.debug(f"Comparing: {listo} and {dicto}")
if isinstance(dicto, list) and isinstance(listo, list):
return listo == dicto
elif isinstance(dicto, dict) and isinstance(listo, dict):
return listo == dicto
elif isinstance(dicto, dict) and isinstance(listo, list):
return any([dicto == d for d in listo])
else:
raise TypeError(f"Unsupported variable: {type(listo)}")
class classproperty(property): class classproperty(property):
def __get__(self, owner_self, owner_cls): def __get__(self, owner_self, owner_cls):
return self.fget(owner_cls) return self.fget(owner_cls)