Updating method to get dict for details.

This commit is contained in:
lwark
2025-06-10 15:05:08 -05:00
parent 58c669bc30
commit 592073c2a1
5 changed files with 124 additions and 45 deletions

View File

@@ -234,10 +234,9 @@ class BaseClass(Base):
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[Any, bool]:
new = False
allowed = [k for k, v in cls.__dict__.items() if isinstance(v, InstrumentedAttribute)
and not isinstance(v.property, _RelationshipDeclared)]
allowed = [k for k, v in cls.__dict__.items() if isinstance(v, InstrumentedAttribute)]
# and not isinstance(v.property, _RelationshipDeclared)]
sanitized_kwargs = {k: v for k, v in kwargs.items() if k in allowed}
logger.debug(f"Sanitized kwargs: {sanitized_kwargs}")
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
@@ -554,6 +553,8 @@ class BaseClass(Base):
output_date = datetime.combine(output_date, addition_time).strftime("%Y-%m-%d %H:%M:%S")
return output_date
def details_dict(self):
dicto = {k:v for k,v in self.__dict__.items() if not k.startswith("_")}

View File

@@ -1082,7 +1082,7 @@ class ProcedureType(BaseClass):
equipment = association_proxy("proceduretypeequipmentroleassociation", "equipmentrole",
creator=lambda eq: ProcedureTypeEquipmentRoleAssociation(
equipment_role=eq)) #: Proxy of equipmentrole associations
equipmentrole=eq)) #: Proxy of equipmentrole associations
kittypereagentroleassociation = relationship(
"KitTypeReagentRoleAssociation",
@@ -1330,10 +1330,34 @@ class Procedure(BaseClass):
rs = results_class(procedure=self, parent=obj)
def add_equipment(self, obj):
"""
Creates widget for adding equipment to this submission
Args:
obj (_type_): parent widget
"""
logger.debug(f"Add equipment")
from frontend.widgets.equipment_usage import EquipmentUsage
dlg = EquipmentUsage(parent=obj, procedure=self)
if dlg.exec():
pass
equipment = dlg.parse_form()
for equip in equipment:
logger.debug(f"Parsed equipment: {equip}")
_, assoc = equip.to_sql(procedure=self)
logger.debug(f"Got equipment association: {assoc} for {equip}")
try:
assoc.save()
except AttributeError as e:
logger.error(f"Couldn't save association with {equip} due to {e}")
if equip.tips:
for tips in equip.tips:
# logger.debug(f"Attempting to add tips assoc: {tips} (pydantic)")
tassoc, _ = tips.to_sql(procedure=self)
# logger.debug(f"Attempting to add tips assoc: {tips.__dict__} (sql)")
if tassoc not in self.proceduretipsassociation:
tassoc.save()
else:
logger.error(f"Tips already found in submission, skipping.")
def edit(self, obj):
logger.debug("Edit!")
@@ -1348,8 +1372,6 @@ class Procedure(BaseClass):
logger.debug("Delete!")
class ProcedureTypeKitTypeAssociation(BaseClass):
"""
Abstract of relationship between kits and their procedure type.
@@ -1860,7 +1882,7 @@ class Equipment(BaseClass, LogMixin):
proceduretype = ProcedureType.query(name=proceduretype)
if isinstance(kittype, str):
kittype = KitType.query(name=kittype)
for process in self.processes:
for process in self.process:
if proceduretype not in process.proceduretype:
continue
if kittype and kittype not in process.kittype:
@@ -1872,6 +1894,7 @@ class Equipment(BaseClass, LogMixin):
@classmethod
@setup_lookup
def query(cls,
id: int | None = None,
name: str | None = None,
nickname: str | None = None,
asset_number: str | None = None,
@@ -1890,6 +1913,12 @@ class Equipment(BaseClass, LogMixin):
Equipment|List[Equipment]: Equipment or list of equipment matching query parameters.
"""
query = cls.__database_session__.query(cls)
match id:
case int():
query = query.filter(cls.id == id)
limit = 1
case _:
pass
match name:
case str():
query = query.filter(cls.name == name)
@@ -1925,7 +1954,8 @@ class Equipment(BaseClass, LogMixin):
from backend.validators.pydant import PydEquipment
processes = self.get_processes(proceduretype=proceduretype, kittype=kittype,
equipmentrole=equipmentrole)
return PydEquipment(processes=processes, role=equipmentrole,
logger.debug(f"EquipmentRole: {equipmentrole}")
return PydEquipment(processes=processes, equipmentrole=equipmentrole,
**self.to_dict(processes=False))
@classproperty
@@ -2046,7 +2076,7 @@ class EquipmentRole(BaseClass):
Returns:
dict: This EquipmentRole dict
"""
return {key: value for key, value in self.__dict__.items() if key != "process"}
return {key: value for key, value in self.__dict__.items() if key != "process" and key != "equipment"}
def to_pydantic(self, proceduretype: ProcedureType,
kittype: str | KitType | None = None) -> "PydEquipmentRole":
@@ -2061,7 +2091,7 @@ class EquipmentRole(BaseClass):
PydEquipmentRole: This EquipmentRole as PydEquipmentRole
"""
from backend.validators.pydant import PydEquipmentRole
equipment = [item.to_pydantic(proceduretype=proceduretype, kittype=kittype) for item in
equipment = [item.to_pydantic(proceduretype=proceduretype, kittype=kittype, equipmentrole=self) for item in
self.equipment]
pyd_dict = self.to_dict()
pyd_dict['process'] = self.get_processes(proceduretype=proceduretype, kittype=kittype)
@@ -2131,7 +2161,7 @@ class EquipmentRole(BaseClass):
proceduretype = SubmissionType.query(name=proceduretype)
if isinstance(kittype, str):
kittype = KitType.query(name=kittype)
for process in self.processes:
for process in self.process:
if proceduretype and proceduretype not in process.proceduretype:
continue
if kittype and kittype not in process.kittype:
@@ -2163,10 +2193,23 @@ class ProcedureEquipmentAssociation(BaseClass):
equipment = relationship(Equipment, back_populates="equipmentprocedureassociation") #: associated equipment
def __repr__(self) -> str:
return f"<ProcedureEquipmentAssociation({self.procedure.name} & {self.equipment.name})>"
try:
return f"<ProcedureEquipmentAssociation({self.procedure.name} & {self.equipment.name})>"
except AttributeError:
return "<ProcedureEquipmentAssociation(Unknown)>"
def __init__(self, procedure, equipment, equipmentrole: str = "None"):
self.run = procedure
def __init__(self, procedure=None, equipment=None, procedure_id:int|None=None, equipment_id:int|None=None, equipmentrole: str = "None"):
if not procedure:
if procedure_id:
procedure = Procedure.query(id=procedure_id)
else:
logger.error("Creation error")
self.procedure = procedure
if not equipment:
if equipment_id:
equipment = Equipment.query(id=equipment_id)
else:
logger.error("Creation error")
self.equipment = equipment
self.equipmentrole = equipmentrole
@@ -2201,12 +2244,27 @@ class ProcedureEquipmentAssociation(BaseClass):
@classmethod
@setup_lookup
def query(cls, equipment_id: int | None = None, run_id: int | None = None, equipmentrole: str | None = None,
def query(cls,
equipment: int | Equipment | None = None,
procedure: int | Procedure | None = None,
equipmentrole: str | None = None,
limit: int = 0, **kwargs) \
-> Any | List[Any]:
query: Query = cls.__database_session__.query(cls)
query = query.filter(cls.equipment_id == equipment_id)
query = query.filter(cls.run_id == run_id)
match equipment:
case int():
query = query.filter(cls.equipment_id == equipment)
case Equipment():
query = query.filter(cls.equipment == equipment)
case _:
pass
match procedure:
case int():
query = query.filter(cls.procedure_id == procedure)
case Procedure():
query = query.filter(cls.procedure == procedure)
case _:
pass
if equipmentrole is not None:
query = query.filter(cls.equipmentrole == equipmentrole)
return cls.execute_query(query=query, limit=limit, **kwargs)
@@ -2632,7 +2690,7 @@ class ProcedureTipsAssociation(BaseClass):
back_populates="proceduretipsassociation") #: associated procedure
tips = relationship(Tips,
back_populates="tipsprocedureassociation") #: associated equipment
role_name = Column(String(32), primary_key=True) #, ForeignKey("_tiprole.name"))
tiprole = Column(String(32), primary_key=True) #, ForeignKey("_tiprole.name"))
def to_sub_dict(self) -> dict:
"""
@@ -2645,23 +2703,34 @@ class ProcedureTipsAssociation(BaseClass):
@classmethod
@setup_lookup
def query(cls, tips_id: int, role_name: str, procedure_id: int | None = None, limit: int = 0, **kwargs) \
def query(cls, tips: int|Tips, tiprole: str, procedure: int | Procedure | None = None, limit: int = 0, **kwargs) \
-> Any | List[Any]:
query: Query = cls.__database_session__.query(cls)
query = query.filter(cls.tips_id == tips_id)
if procedure_id is not None:
query = query.filter(cls.procedure_id == procedure_id)
query = query.filter(cls.role_name == role_name)
match tips:
case int():
query = query.filter(cls.tips_id == tips)
case Tips():
query = query.filter(cls.tips == tips)
case _:
pass
match procedure:
case int():
query = query.filter(cls.procedure_id == procedure)
case Procedure():
query = query.filter(cls.procedure == procedure)
case _:
pass
query = query.filter(cls.tiprole == tiprole)
return cls.execute_query(query=query, limit=limit, **kwargs)
# TODO: fold this into the BaseClass.query_or_create ?
@classmethod
def query_or_create(cls, tips, run, role: str, **kwargs):
kwargs['limit'] = 1
instance = cls.query(tips_id=tips.id, role_name=role, procedure_id=run.id, **kwargs)
if instance is None:
instance = cls(run=run, tips=tips, role_name=role)
return instance
# @classmethod
# def query_or_create(cls, tips, procedure, role: str, **kwargs):
# kwargs['limit'] = 1
# instance = cls.query(tips_id=tips.id, role_name=role, procedure_id=procedure.id, **kwargs)
# if instance is None:
# instance = cls(procedure=procedure, tips=tips, role_name=role)
# return instance
def to_pydantic(self):
from backend.validators import PydTips