Switching to pydantic as primary object for Omni-manager

This commit is contained in:
lwark
2025-02-05 10:06:04 -06:00
parent 05421cc1d4
commit f57aa3c3f0
6 changed files with 117 additions and 89 deletions

View File

@@ -53,6 +53,7 @@ class BaseClass(Base):
singles = ['id']
omni_removes = ["id", 'submissions', "omnigui_class_dict", "omnigui_instance_dict"]
omni_sort = ["name"]
omni_inheritable = []
@classproperty
def skip_on_edit(cls):
@@ -330,7 +331,15 @@ class BaseClass(Base):
query_kwargs = {relationship_instance.query_alias: relationship_instance}
return cls.query(**query_kwargs)
def check_all_attributes(self, attributes: dict):
def check_all_attributes(self, attributes: dict) -> bool:
"""
Args:
attributes (dict): A dictionary of attributes to be check for equivalence
Returns:
bool: If a single unequivocal value is found will be false, else true.
"""
logger.debug(f"Incoming attributes: {attributes}")
for key, value in attributes.items():
# print(getattr(self.__class__, key).property)
@@ -365,7 +374,11 @@ class BaseClass(Base):
return True
def __setattr__(self, key, value):
logger.debug(f"Attempting to set {key} to {pformat(value)}")
"""
Custom dunder method to handle potential list relationship issues.
"""
if key != "_sa_instance_state":
logger.debug(f"Attempting to set {key} to {pformat(value)}")
try:
field_type = getattr(self.__class__, key)
except AttributeError:
@@ -395,8 +408,10 @@ class BaseClass(Base):
else:
raise ValueError("Object is too long to parse a single value.")
return super().__setattr__(key, value)
case _:
return super().__setattr__(key, value)
else:
super().__setattr__(key, value)
return super().__setattr__(key, value)
class ConfigItem(BaseClass):

View File

@@ -380,6 +380,9 @@ class KitType(BaseClass):
new_process.equipment_roles.append(new_role)
return new_kit
def to_pydantic(self):
pass
class ReagentRole(BaseClass):
"""
@@ -1232,6 +1235,7 @@ class KitTypeReagentRoleAssociation(BaseClass):
omni_removes = BaseClass.omni_removes + ["submission_type_id", "kits_id", "reagent_roles_id", "last_used"]
omni_sort = ["submission_type", "kit_type", "reagent_role", "required", "uses"]
omni_inheritable = ["submission_type", "kit_type"]
reagent_roles_id = Column(INTEGER, ForeignKey("_reagentrole.id"),
primary_key=True) #: id of associated reagent type

View File

@@ -417,9 +417,11 @@ class BasicSubmission(BaseClass, LogMixin):
except Exception as e:
logger.error(f"Column count error: {e}")
# NOTE: Get kit associated with this submission
logger.debug(f"Checking associations with submission type: {self.submission_type_name}")
assoc = next((item for item in self.extraction_kit.kit_submissiontype_associations if
item.submission_type == self.submission_type),
None)
logger.debug(f"Got association: {assoc}")
# NOTE: If every individual cost is 0 this is probably an old plate.
if all(item == 0.0 for item in [assoc.constant_cost, assoc.mutable_cost_column, assoc.mutable_cost_sample]):
try:
@@ -1325,7 +1327,10 @@ class BasicSubmission(BaseClass, LogMixin):
if dlg.exec():
equipment = dlg.parse_form()
for equip in equipment:
logger.debug(f"Parsed equipment: {equip}")
_, assoc = equip.to_sql(submission=self)
logger.debug(f"Got equipment association: {assoc.__dict__}")
try:
assoc.save()
except AttributeError as e:

View File

@@ -355,12 +355,14 @@ class PydEquipment(BaseModel, extra='ignore'):
# TODO: This seems precarious. What if there is more than one process?
# NOTE: It looks like the way fetching the processes is done in the SQL model, this shouldn't be a problem, but I'll include a failsafe.
# NOTE: I need to find a way to filter this by the kit involved.
if len(self.processes) > 1:
process = Process.query(submissiontype=submission.get_submission_type(), kittype=extraction_kit, equipmentrole=self.role)
else:
process = Process.query(name=self.processes[0])
if process is None:
logger.error(f"Found unknown process: {process}.")
logger.debug(f"Using process: {process}")
assoc.process = process
assoc.role = self.role
else:
@@ -779,8 +781,10 @@ class PydSubmission(BaseModel, extra='allow'):
"""
report = Report()
dicto = self.improved_dict()
logger.debug(f"Pydantic submission type: {self.submission_type['value']}")
instance, result = BasicSubmission.query_or_create(submission_type=self.submission_type['value'],
rsl_plate_num=self.rsl_plate_num['value'])
logger.debug(f"Created or queried instance: {instance}")
report.add_result(result)
self.handle_duplicate_samples()
for key, value in dicto.items():
@@ -864,6 +868,7 @@ class PydSubmission(BaseModel, extra='allow'):
continue
else:
logger.warning(f"{key} already == {value} so no updating.")
logger.debug(f"Entering cost calculation for {instance}")
try:
instance.calculate_base_cost()
except (TypeError, AttributeError) as e: