MVP working (AFAIK)
This commit is contained in:
@@ -672,7 +672,8 @@ class BaseClass(Base):
|
||||
pyd = getattr(pydant, pyd_model_name)
|
||||
except AttributeError:
|
||||
raise AttributeError(f"Could not get pydantic class {pyd_model_name}")
|
||||
logger.debug(f"Kwargs: {kwargs}")
|
||||
# logger.debug(f"Kwargs: {kwargs}")
|
||||
# logger.debug(f"Dict: {pformat(self.details_dict())}")
|
||||
return pyd(**self.details_dict(**kwargs))
|
||||
|
||||
def show_details(self, obj):
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -340,24 +340,24 @@ class PydSample(PydBaseClass):
|
||||
class PydTips(PydBaseClass):
|
||||
name: str
|
||||
lot: str | None = Field(default=None)
|
||||
tiprole: str
|
||||
# tips: str
|
||||
|
||||
@field_validator('tiprole', mode='before')
|
||||
@classmethod
|
||||
def get_role_name(cls, value):
|
||||
if isinstance(value, list):
|
||||
output = []
|
||||
for tiprole in value:
|
||||
if isinstance(tiprole, TipRole):
|
||||
tiprole = tiprole.name
|
||||
return tiprole
|
||||
value = output
|
||||
if isinstance(value, TipRole):
|
||||
value = value.name
|
||||
return value
|
||||
# @field_validator('tips', mode='before')
|
||||
# @classmethod
|
||||
# def get_role_name(cls, value):
|
||||
# if isinstance(value, list):
|
||||
# output = []
|
||||
# for tips in value:
|
||||
# if isinstance(tips, Tips):
|
||||
# tips = tips.name
|
||||
# output.append(tips)
|
||||
# value = output
|
||||
# if isinstance(value, Tips):
|
||||
# value = value.name
|
||||
# return value
|
||||
|
||||
@report_result
|
||||
def to_sql(self, procedure: Run) -> ProcedureTipsAssociation:
|
||||
def to_sql(self, procedure: Run) -> Tuple[Tips, Report]:
|
||||
"""
|
||||
Convert this object to the SQL version for database storage.
|
||||
|
||||
@@ -368,11 +368,11 @@ class PydTips(PydBaseClass):
|
||||
SubmissionTipsAssociation: Association between queried tips and procedure
|
||||
"""
|
||||
report = Report()
|
||||
tips = Tips.query(name=self.name, limit=1)
|
||||
tips = TipsLot.query(name=self.name, limit=1)
|
||||
# logger.debug(f"Tips query has yielded: {tips}")
|
||||
assoc = ProcedureTipsAssociation.query_or_create(tips=tips, procedure=procedure, tiprole=self.tiprole, limit=1)
|
||||
logger.debug(f"Got association: {assoc}")
|
||||
return assoc, report
|
||||
# assoc = ProcedureTipsAssociation.query_or_create(tips=tips, procedure=procedure, tiprole=self.tiprole, limit=1)
|
||||
# logger.debug(f"Got association: {assoc}")
|
||||
return tips, report
|
||||
|
||||
|
||||
class PydEquipment(PydBaseClass):
|
||||
@@ -382,7 +382,7 @@ class PydEquipment(PydBaseClass):
|
||||
# process: List[dict] | None
|
||||
process: List[PydProcess] | PydProcess | None
|
||||
equipmentrole: str | PydEquipmentRole | None
|
||||
tips: List[PydTips] | PydTips | None = Field(default=[])
|
||||
# tips: List[PydTips] | PydTips | None = Field(default=[])
|
||||
|
||||
@field_validator('equipmentrole', mode='before')
|
||||
@classmethod
|
||||
@@ -408,42 +408,42 @@ class PydEquipment(PydBaseClass):
|
||||
value = convert_nans_to_nones(value)
|
||||
if not value:
|
||||
value = []
|
||||
logger.debug(value)
|
||||
try:
|
||||
# value = [item.strip() for item in value]
|
||||
d: Process = next((process for process in value if values.data['name'] in [item.name for item in process.equipment]), None)
|
||||
# logger.debug(f"Next process: {d.details_dict()}")
|
||||
value = d.to_pydantic()
|
||||
print(f"Next process: {d.details_dict()}")
|
||||
if d:
|
||||
# value = PydProcess(**d.details_dict())
|
||||
value = d.to_pydantic()
|
||||
# value = next((process.to_pydantic() for process in value))
|
||||
except AttributeError as e:
|
||||
logger.error(f"Process Validation error due to {e}")
|
||||
pass
|
||||
return value
|
||||
|
||||
@field_validator('tips', mode='before')
|
||||
@classmethod
|
||||
def tips_to_pydantic(cls, value):
|
||||
match value:
|
||||
case list():
|
||||
output = []
|
||||
for tips in value:
|
||||
match tips:
|
||||
case Tips():
|
||||
tips = tips.to_pydantic()
|
||||
case dict():
|
||||
tips = PydTips(**tips)
|
||||
case _:
|
||||
continue
|
||||
output.append(tips)
|
||||
case _:
|
||||
output = value
|
||||
return output
|
||||
|
||||
@field_validator('tips')
|
||||
@classmethod
|
||||
def single_out_tips(cls, value, values):
|
||||
return value
|
||||
# @field_validator('tips', mode='before')
|
||||
# @classmethod
|
||||
# def tips_to_pydantic(cls, value):
|
||||
# match value:
|
||||
# case list():
|
||||
# output = []
|
||||
# for tips in value:
|
||||
# match tips:
|
||||
# case Tips():
|
||||
# tips = tips.to_pydantic()
|
||||
# case dict():
|
||||
# tips = PydTips(**tips)
|
||||
# case _:
|
||||
# continue
|
||||
# output.append(tips)
|
||||
# case _:
|
||||
# output = value
|
||||
# return output
|
||||
#
|
||||
# @field_validator('tips')
|
||||
# @classmethod
|
||||
# def single_out_tips(cls, value, values):
|
||||
# return value
|
||||
|
||||
@report_result
|
||||
def to_sql(self, procedure: Procedure | str = None, proceduretype: ProcedureType | str = None) -> Tuple[
|
||||
@@ -483,10 +483,10 @@ class PydEquipment(PydBaseClass):
|
||||
if len(self.processes) > 1:
|
||||
process = Process.query(proceduretype=procedure.get_submission_type(), equipmentrole=self.role)
|
||||
else:
|
||||
process = Process.query(name=self.processes[0])
|
||||
process = Process.query(name=self.processes[0], limit=1)
|
||||
if process is None:
|
||||
logger.error(f"Found unknown process: {process}.")
|
||||
# logger.debug(f"Using process: {process}")
|
||||
logger.debug(f"Using process: {process}")
|
||||
assoc.process = process
|
||||
assoc.equipmentrole = self.equipmentrole
|
||||
else:
|
||||
@@ -1173,13 +1173,10 @@ class PydEquipmentRole(BaseModel):
|
||||
class PydProcess(PydBaseClass, extra="allow"):
|
||||
name: str
|
||||
version: str = Field(default="1")
|
||||
proceduretype: List[str]
|
||||
equipment: List[str]
|
||||
equipmentrole: List[str]
|
||||
# kittype: List[str]
|
||||
tiprole: List[str]
|
||||
# equipment: List[str]
|
||||
tips: List[PydTips]
|
||||
|
||||
@field_validator("proceduretype", "equipment", "equipmentrole", "tiprole", mode="before")
|
||||
@field_validator("tips", mode="before")
|
||||
@classmethod
|
||||
def enforce_list(cls, value):
|
||||
# logger.debug(f"Validating field: {value}")
|
||||
@@ -1193,6 +1190,14 @@ class PydProcess(PydBaseClass, extra="allow"):
|
||||
output.append(v)
|
||||
return output
|
||||
|
||||
@field_validator("tips", mode="before")
|
||||
@classmethod
|
||||
def validate_tips(cls, value):
|
||||
if not value:
|
||||
return []
|
||||
value = [item for item in value if item]
|
||||
return value
|
||||
|
||||
@report_result
|
||||
def to_sql(self):
|
||||
report = Report()
|
||||
@@ -1202,30 +1207,6 @@ class PydProcess(PydBaseClass, extra="allow"):
|
||||
logger.debug(f"Got instance: {instance}")
|
||||
if not instance:
|
||||
instance = ProcessVersion()
|
||||
# fields = [item for item in self.model_fields]
|
||||
# for field in fields:
|
||||
# logger.debug(f"Field: {field}")
|
||||
# try:
|
||||
# field_type = getattr(instance.__class__, field).property
|
||||
# except AttributeError:
|
||||
# logger.error(f"No attribute: {field} in {instance.__class__}")
|
||||
# continue
|
||||
# match field_type:
|
||||
# case _RelationshipDeclared():
|
||||
# logger.debug(f"{field} is a relationship with {field_type.entity.class_}")
|
||||
# query_str = getattr(self, field)
|
||||
# if isinstance(query_str, list):
|
||||
# query_str = query_str[0]
|
||||
# if query_str in ["", " ", None]:
|
||||
# continue
|
||||
# logger.debug(f"Querying {field_type.entity.class_} with name {query_str}")
|
||||
# field_value = field_type.entity.class_.query(name=query_str)
|
||||
# logger.debug(f"{field} query result: {field_value}")
|
||||
# case ColumnProperty():
|
||||
# logger.debug(f"{field} is a property.")
|
||||
# field_value = getattr(self, field)
|
||||
# # instance.set_attribute(key=field, value=field_value)
|
||||
# setattr(instance, field, field_value)
|
||||
return instance, report
|
||||
|
||||
|
||||
@@ -1570,6 +1551,8 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
|
||||
# equip = Equipment.query(name=equipment.name)
|
||||
equip, _ = equipment.to_sql()
|
||||
logger.debug(f"Process: {equipment.process}")
|
||||
if isinstance(equipment.process, list):
|
||||
equipment.process = equipment.process[0]
|
||||
if equip not in sql.equipment:
|
||||
equip_assoc = ProcedureEquipmentAssociation(equipment=equip, procedure=sql,
|
||||
equipmentrole=equip.equipmentrole[0])
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
'''
|
||||
Creates forms that the user can enter equipment info into.
|
||||
'''
|
||||
import sys
|
||||
from pprint import pformat
|
||||
from PyQt6.QtCore import Qt, QSignalBlocker, pyqtSlot
|
||||
from PyQt6.QtWebChannel import QWebChannel
|
||||
@@ -98,6 +99,7 @@ class EquipmentUsage(QDialog):
|
||||
|
||||
@pyqtSlot(str, str, str, str)
|
||||
def update_equipment(self, equipmentrole: str, equipment: str, process: str, tips: str):
|
||||
|
||||
try:
|
||||
equipment_of_interest = next(
|
||||
(item for item in self.procedure.equipment if item.equipmentrole == equipmentrole))
|
||||
|
||||
@@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, Any, List
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from backend.db.models import Run, Procedure
|
||||
from backend.validators import PydProcedure
|
||||
from backend.validators import PydProcedure, PydEquipment
|
||||
from tools import jinja_template_loading, get_application_from_parent, render_details_template, sanitize_object_for_json
|
||||
|
||||
logger = logging.getLogger(f"submissions.{__name__}")
|
||||
@@ -92,7 +92,7 @@ class ProcedureCreation(QDialog):
|
||||
equipmentrole['equipment'].index(item_in_er_list)))
|
||||
proceduretype_dict['equipment_section'] = EquipmentUsage.construct_html(procedure=self.procedure, child=True)
|
||||
proceduretype_dict['equipment'] = [sanitize_object_for_json(object) for object in proceduretype_dict['equipment']]
|
||||
|
||||
logger.debug(proceduretype_dict['equipment'])
|
||||
self.update_equipment = EquipmentUsage.update_equipment
|
||||
regex = re.compile(r".*R\d$")
|
||||
proceduretype_dict['previous'] = [""] + [item.name for item in self.run.procedure if item.proceduretype == self.proceduretype and not bool(regex.match(item.name))]
|
||||
@@ -124,18 +124,26 @@ class ProcedureCreation(QDialog):
|
||||
if equipment_of_interest:
|
||||
eoi = self.procedure.equipment.pop(self.procedure.equipment.index(equipment_of_interest))
|
||||
else:
|
||||
eoi = equipment.to_pydantic(equipmentrole=equipmentrole, proceduretype=self.procedure.proceduretype)
|
||||
eoi: PydEquipment = equipment.to_pydantic(equipmentrole=equipmentrole)
|
||||
eoi.name = equipment.name
|
||||
eoi.asset_number = equipment.asset_number
|
||||
eoi.nickname = equipment.nickname
|
||||
process = next((prcss for prcss in equipment.process if prcss.name == process), None)
|
||||
if process:
|
||||
eoi.process = process.to_pydantic()
|
||||
tips = next((tps for tps in equipment.tips if tps.name == tips), None)
|
||||
if tips:
|
||||
eoi.tips = tips.to_pydantic()
|
||||
logger.warning("Setting processes.")
|
||||
eoi.process = [process for process in equipment.get_processes(equipmentrole=equipmentrole)]
|
||||
# process = next((prcss for prcss in equipment.process if prcss.name == process), None)
|
||||
# if process:
|
||||
# for process in equipment.process:
|
||||
# logger.debug(f"Retrieved process: {process}")
|
||||
# # tips = [tip.to_pydantic() for tip in process.tips]
|
||||
# # if tips:
|
||||
# # process.tips = tips
|
||||
# # else:
|
||||
# # process.tips = [""]
|
||||
# eoi.process.append(process.to_pydantic())
|
||||
# # tips = next((tps for tps in process.tips if tps.name == tips), None)
|
||||
|
||||
self.procedure.equipment.append(eoi)
|
||||
logger.debug(f"Updated equipment: {self.procedure.equipment}")
|
||||
logger.debug(f"Updated equipment: {pformat(self.procedure.equipment)}")
|
||||
|
||||
@pyqtSlot(str, str)
|
||||
def text_changed(self, key: str, new_value: str):
|
||||
|
||||
@@ -62,7 +62,7 @@ class SubmissionDetails(QDialog):
|
||||
css = f.read()
|
||||
key = object.__class__.__name__.lower()
|
||||
d = {key: details}
|
||||
logger.debug(f"Using details: {pformat(d)}")
|
||||
logger.debug(f"Using details: {pformat(d['procedure']['equipment'])}")
|
||||
html = template.render(**d, css=[css])
|
||||
self.webview.setHtml(html)
|
||||
self.setWindowTitle(f"{object.__class__.__name__} Details - {object.name}")
|
||||
|
||||
@@ -12,7 +12,8 @@ from PyQt6.QtGui import QAction, QCursor, QStandardItemModel, QStandardItem, QIc
|
||||
from typing import Dict, List
|
||||
|
||||
# from backend import Procedure
|
||||
from backend.db.models import Run, ClientSubmission, Procedure
|
||||
from backend.db.models.submissions import Run, ClientSubmission
|
||||
from backend.db.models.procedures import Procedure
|
||||
from tools import Report, Result, report_result, get_application_from_parent
|
||||
from .functions import select_open_file
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
<tr>
|
||||
<td style="border: 1px solid black;">{{ equipment['equipmentrole'] }}</td>
|
||||
<td style="border: 1px solid black;">{{ equipment['name'] }}</td>
|
||||
<td style="border: 1px solid black;">{{ equipment['process']['name'] }}</td>
|
||||
<td style="border: 1px solid black;">{{ equipment['processversion']['name'] }}</td>
|
||||
{% if equipment['tips'] %}
|
||||
<td style="border: 1px solid black;">{{ equipment['tips']['name'] }}</td>
|
||||
{% else %}
|
||||
|
||||
Reference in New Issue
Block a user