From ba4912cab7404acbf0c1d77e8afdb90b3c88e22c Mon Sep 17 00:00:00 2001 From: lwark Date: Wed, 10 Sep 2025 12:39:02 -0500 Subject: [PATCH] ReagentLot add/edit updated. --- src/submissions/backend/db/models/__init__.py | 48 +- .../backend/db/models/procedures.py | 33 +- .../backend/db/models/submissions.py | 7 +- .../backend/excel/writers/__init__.py | 21 +- .../writers/procedure_writers/__init__.py | 20 +- .../backend/managers/procedures.py | 6 +- .../backend/validators/omni_gui_objects.py | 1626 ++++++++--------- src/submissions/backend/validators/pydant.py | 50 +- .../visualizations/concentrations_chart.py | 2 +- src/submissions/frontend/widgets/app.py | 4 +- .../frontend/widgets/omni_add_edit.py | 42 +- .../frontend/widgets/omni_manager_pydant.py | 1526 ++++++++-------- .../frontend/widgets/omni_search.py | 13 +- .../frontend/widgets/procedure_creation.py | 3 +- .../frontend/widgets/submission_details.py | 3 +- .../frontend/widgets/submission_widget.py | 6 +- src/submissions/tools/__init__.py | 158 +- submissions.spec | 28 +- 18 files changed, 1862 insertions(+), 1734 deletions(-) diff --git a/src/submissions/backend/db/models/__init__.py b/src/submissions/backend/db/models/__init__.py index af8f025..4672295 100644 --- a/src/submissions/backend/db/models/__init__.py +++ b/src/submissions/backend/db/models/__init__.py @@ -2,11 +2,12 @@ Contains all models for sqlalchemy """ from __future__ import annotations -import sys, logging, json +import sys, logging, json, inspect from dateutil.parser import parse +from jinja2 import TemplateNotFound from pandas import DataFrame from pydantic import BaseModel -from sqlalchemy import Column, INTEGER, String, JSON +from sqlalchemy import Column, INTEGER, String, JSON, DATETIME from sqlalchemy.ext.associationproxy import AssociationProxy from sqlalchemy.orm import DeclarativeMeta, declarative_base, Query, Session, InstrumentedAttribute, ColumnProperty from sqlalchemy.ext.declarative import declared_attr @@ -36,10 +37,6 @@ class BaseClass(Base): __table_args__ = {'extend_existing': True} #: NOTE Will only add new columns singles = ['id'] - # omni_removes = ["id", 'run', "omnigui_class_dict", "omnigui_instance_dict"] - # omni_sort = ["name"] - # omni_inheritable = [] - searchables = [] _misc_info = Column(JSON) @@ -155,6 +152,33 @@ class BaseClass(Base): except AttributeError: return [] + @classmethod + def get_omni_sort(cls): + output = [item[0] for item in inspect.getmembers(cls, lambda a: not (inspect.isroutine(a))) + if isinstance(item[1], InstrumentedAttribute)] # and not isinstance(item[1].property, _RelationshipDeclared)] + output = [item for item in output if item not in ['_misc_info']] + return output + + @classmethod + def get_searchables(cls): + output = [] + for item in inspect.getmembers(cls, lambda a: not (inspect.isroutine(a))): + if item[0] in ["_misc_info"]: + continue + if not isinstance(item[1], InstrumentedAttribute): + continue + if not isinstance(item[1].property, ColumnProperty): + continue + # if isinstance(item[1], _RelationshipDeclared): + # if "association" in item[0]: + # continue + if len(item[1].foreign_keys) > 0: + continue + if item[1].type.__class__.__name__ not in ["String"]: + continue + output.append(item[0]) + return output + @classmethod def get_default_info(cls, *args) -> dict | list | str: """ @@ -296,7 +320,6 @@ class BaseClass(Base): except AttributeError: check = False if check: - logger.debug("Got uselist") try: query = query.filter(attr.contains(v)) except ArgumentError: @@ -358,16 +381,14 @@ class BaseClass(Base): dict: Dictionary of object minus _sa_instance_state with id at the front. """ dicto = {key: dict(class_attr=getattr(self.__class__, key), instance_attr=getattr(self, key)) - for key in dir(self.__class__) if - isinstance(getattr(self.__class__, key), InstrumentedAttribute) and key not in self.omni_removes - } + for key in self.get_omni_sort()} for k, v in dicto.items(): try: v['instance_attr'] = v['instance_attr'].name except AttributeError: continue try: - dicto = list_sort_dict(input_dict=dicto, sort_list=self.__class__.omni_sort) + dicto = list_sort_dict(input_dict=dicto, sort_list=self.__class__.get_omni_sort()) except TypeError as e: logger.error(f"Could not sort {self.__class__.__name__} by list due to :{e}") try: @@ -418,7 +439,7 @@ class BaseClass(Base): try: template = env.get_template(temp_name) except TemplateNotFound as e: - logger.error(f"Couldn't find template {e}") + # logger.error(f"Couldn't find template {e}") template = env.get_template("details.html") return template @@ -505,7 +526,6 @@ class BaseClass(Base): existing = self.__getattribute__(key) # NOTE: This is causing problems with removal of items from lists. Have to overhaul it. if existing is not None: - logger.debug(f"{key} Existing: {existing}, incoming: {value}") if isinstance(value, list): value = value else: @@ -626,7 +646,7 @@ class BaseClass(Base): from backend.validators import pydant if not pyd_model_name: pyd_model_name = f"Pyd{self.__class__.__name__}" - logger.debug(f"Looking for pydant model {pyd_model_name}") + logger.info(f"Looking for pydant model {pyd_model_name}") try: pyd = getattr(pydant, pyd_model_name) except AttributeError: diff --git a/src/submissions/backend/db/models/procedures.py b/src/submissions/backend/db/models/procedures.py index 6404c8c..198d891 100644 --- a/src/submissions/backend/db/models/procedures.py +++ b/src/submissions/backend/db/models/procedures.py @@ -194,6 +194,7 @@ class Reagent(BaseClass, LogMixin): Concrete reagent instance """ + skip_on_edit = False id = Column(INTEGER, primary_key=True) #: primary key reagentrole = relationship("ReagentRole", back_populates="reagent", secondary=reagentrole_reagent) #: joined parent ReagentRole @@ -319,15 +320,7 @@ class Reagent(BaseClass, LogMixin): except AttributeError as e: logger.error(f"Could not set {key} due to {e}") - @check_authorization - def edit_from_search(self, obj, **kwargs): - from frontend.widgets.omni_add_edit import AddEdit - dlg = AddEdit(parent=None, instance=self) - if dlg.exec(): - pyd = dlg.parse_form() - for field in pyd.model_fields: - self.set_attribute(field, pyd.__getattribute__(field)) - self.save() + @classproperty def add_edit_tooltips(self): @@ -418,6 +411,15 @@ class ReagentLot(BaseClass): pass setattr(self, key, value) + @check_authorization + def edit_from_search(self, obj, **kwargs): + from frontend.widgets.omni_add_edit import AddEdit + dlg = AddEdit(parent=None, instance=self, disabled=['reagent']) + if dlg.exec(): + pyd = dlg.parse_form() + for field in pyd.model_fields: + self.set_attribute(field, pyd.__getattribute__(field)) + self.save() class Discount(BaseClass): """ @@ -952,6 +954,7 @@ class Procedure(BaseClass): output['sample'] = active_samples + inactive_samples output['reagent'] = [reagent.details_dict() for reagent in output['procedurereagentlotassociation']] output['equipment'] = [equipment.details_dict() for equipment in output['procedureequipmentassociation']] + logger.debug(f"equipment: {pformat([item for item in output['equipment']])}") output['repeat'] = self.repeat output['run'] = self.run.name output['excluded'] += self.get_default_info("details_ignore") @@ -963,14 +966,6 @@ class Procedure(BaseClass): def to_pydantic(self, **kwargs): from backend.validators.pydant import PydReagent output = super().to_pydantic() - try: - output.kittype = dict(value=output.kittype['name'], missing=False) - except KeyError: - try: - output.kittype = dict(value=output.kittype['value'], missing=False) - except KeyError as e: - logger.error(f"Output.kittype: {output.kittype}") - raise e output.sample = [item.to_pydantic() for item in output.proceduresampleassociation] reagents = [] for reagent in output.reagent: @@ -985,6 +980,7 @@ class Procedure(BaseClass): output.result = [item.to_pydantic() for item in self.results] output.sample_results = flatten_list( [[result.to_pydantic() for result in item.results] for item in self.proceduresampleassociation]) + return output def create_proceduresampleassociations(self, sample): @@ -1607,7 +1603,7 @@ class Equipment(BaseClass, LogMixin): from backend.validators.pydant import PydEquipment creation_dict = self.details_dict() processes = self.get_processes(equipmentrole=equipmentrole) - creation_dict['process'] = processes + creation_dict['processes'] = processes creation_dict['equipmentrole'] = equipmentrole or creation_dict['equipmentrole'] return PydEquipment(**creation_dict) @@ -2187,6 +2183,7 @@ class ProcedureEquipmentAssociation(BaseClass): output.update(relevant) output['misc_info'] = misc output['equipment_role'] = self.equipmentrole + output['processes'] = [item for item in self.equipment.get_processes(equipmentrole=output['equipment_role'])] try: output['processversion'] = self.processversion.details_dict() except AttributeError: diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index d9b4be1..d132f4a 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -116,7 +116,6 @@ class ClientSubmission(BaseClass, LogMixin): if start_date is not None: start_date = cls.rectify_query_date(start_date) end_date = cls.rectify_query_date(end_date, eod=True) - logger.debug(f"Start date: {start_date}, end date: {end_date}") query = query.filter(cls.submitted_date.between(start_date, end_date)) # NOTE: by rsl number (returns only a single value) match submitter_plate_id: @@ -303,7 +302,6 @@ class ClientSubmission(BaseClass, LogMixin): return {item: self.__getattribute__(item.lower().replace(" ", "_")) for item in names} def add_run(self, obj): - logger.debug("Add Run") from frontend.widgets.sample_checker import SampleChecker samples = [sample.to_pydantic() for sample in self.clientsubmissionsampleassociation] checker = SampleChecker(parent=None, title="Create Run", samples=samples, clientsubmission=self) @@ -455,13 +453,14 @@ class Run(BaseClass, LogMixin): output = {k: v for k, v in dicto.items() if k in args} else: output = {k: v for k, v in dicto.items()} - logger.debug(f"Submission type for get default info: {submissiontype}") + # logger.debug(f"Submission type for get default info: {submissiontype}") if isinstance(submissiontype, SubmissionType): st = submissiontype else: st = cls.get_submission_type(submissiontype) if st is None: - logger.error("No default info for Run.") + # logger.error("No default info for Run.") + pass else: output['submissiontype'] = st.name for k, v in st.defaults.items(): diff --git a/src/submissions/backend/excel/writers/__init__.py b/src/submissions/backend/excel/writers/__init__.py index e3097c5..89f1d39 100644 --- a/src/submissions/backend/excel/writers/__init__.py +++ b/src/submissions/backend/excel/writers/__init__.py @@ -36,16 +36,17 @@ class DefaultWriter(object): value = value['name'] except (KeyError, ValueError): return + # logger.debug(f"Value type: {type(value)}") match value: case x if issubclass(value.__class__, BaseClass): value = value.name case x if issubclass(value.__class__, PydBaseClass): + logger.warning(f"PydBaseClass: {value}") value = value.name case bytes() | list(): value = None case datetime() | date(): value = value.strftime("%Y-%m-%d") - case _: value = str(value) return value @@ -80,9 +81,19 @@ class DefaultWriter(object): self.worksheet = self.prewrite(self.worksheet, start_row=start_row) self.start_row = self.delineate_start_row(start_row=start_row) self.end_row = self.delineate_end_row(start_row=start_row) + logger.debug(f"Rows for {self.__class__.__name__}:\tstart: {self.start_row}, end: {self.end_row}") return workbook - def delineate_start_row(self, start_row: int = 1): + def delineate_start_row(self, start_row: int = 1) -> int: + """ + Gets the first black row. + Args: + start_row (int): row to start looking at. + + Returns: + int + """ + logger.debug(f"{self.__class__.__name__} will start looking for blank rows at {start_row}") for iii, row in enumerate(self.worksheet.iter_rows(min_row=start_row), start=start_row): if all([item.value is None for item in row]): return iii @@ -135,6 +146,7 @@ class DefaultKEYVALUEWriter(DefaultWriter): dictionary = sort_dict_by_list(dictionary=dictionary, order_list=self.key_order) for ii, (k, v) in enumerate(dictionary.items(), start=self.start_row): value = self.stringify_value(value=v) + logger.debug(f"{self.__class__.__name__} attempting to write {value}") if value is None: continue self.worksheet.cell(column=1, row=ii, value=self.prettify_key(k)) @@ -159,7 +171,9 @@ class DefaultTABLEWriter(DefaultWriter): return row_count def delineate_end_row(self, start_row: int = 1) -> int: - return start_row + len(self.pydant_obj) + 1 + end_row = start_row + len(self.pydant_obj) + 1 + logger.debug(f"End row has been delineated as {start_row} + {len(self.pydant_obj)} + 1 = {end_row}") + return end_row def pad_samples_to_length(self, row_count, mode: Literal["submission", "procedure"] = "submission"): #, column_names): @@ -206,6 +220,7 @@ class DefaultTABLEWriter(DefaultWriter): value = object.improved_dict()[header.lower().replace(" ", "_")] except (AttributeError, KeyError): value = "" + # logger.debug(f"{self.__class__.__name__} attempting to write {value}") self.worksheet.cell(row=write_row, column=column, value=self.stringify_value(value)) self.worksheet = self.postwrite(self.worksheet) return workbook diff --git a/src/submissions/backend/excel/writers/procedure_writers/__init__.py b/src/submissions/backend/excel/writers/procedure_writers/__init__.py index 519d168..62cec8c 100644 --- a/src/submissions/backend/excel/writers/procedure_writers/__init__.py +++ b/src/submissions/backend/excel/writers/procedure_writers/__init__.py @@ -15,7 +15,7 @@ class ProcedureInfoWriter(DefaultKEYVALUEWriter): header_order = [] exclude = ['control', 'equipment', 'excluded', 'id', 'misc_info', 'plate_map', 'possible_kits', 'procedureequipmentassociation', 'procedurereagentassociation', 'proceduresampleassociation', 'proceduretipsassociation', 'reagent', - 'reagentrole', 'results', 'sample', 'tips'] + 'reagentrole', 'results', 'sample', 'tips', 'reagentlot'] def __init__(self, pydant_obj, *args, **kwargs): @@ -25,23 +25,23 @@ class ProcedureInfoWriter(DefaultKEYVALUEWriter): def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, start_row: int = 1, *args, **kwargs) -> Workbook: - workbook = super().write_to_workbook(workbook=workbook, sheet=f"{self.pydant_obj.proceduretype.name} Quality") + workbook = super().write_to_workbook(workbook=workbook, sheet=f"{self.pydant_obj.proceduretype.name[:20]} Quality") return workbook class ProcedureReagentWriter(DefaultTABLEWriter): - exclude = ["id", "comments", "missing"] - header_order = ["reagentrole", "name", "lot", "expiry"] + exclude = ["id", "comments", "missing", "active", "name"] + header_order = ["reagentrole", "reagent_name", "lot", "expiry"] def __init__(self, pydant_obj, *args, **kwargs): super().__init__(pydant_obj=pydant_obj, *args, **kwargs) - self.sheet = f"{self.pydant_obj.proceduretype.name} Quality" + self.sheet = f"{self.pydant_obj.proceduretype.name[:20]} Quality" self.pydant_obj = self.pydant_obj.reagent def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, start_row: int = 1, *args, **kwargs) -> Workbook: - workbook = super().write_to_workbook(workbook=workbook, sheet=self.sheet) + workbook = super().write_to_workbook(workbook=workbook, sheet=self.sheet, start_row=start_row) return workbook @@ -52,12 +52,12 @@ class ProcedureEquipmentWriter(DefaultTABLEWriter): def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs): super().__init__(pydant_obj=pydant_obj, range_dict=range_dict, *args, **kwargs) - self.sheet = f"{self.pydant_obj.proceduretype.name} Quality" + self.sheet = f"{self.pydant_obj.proceduretype.name[:20]} Quality" self.pydant_obj = self.pydant_obj.equipment def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, start_row: int = 1, *args, **kwargs) -> Workbook: - workbook = super().write_to_workbook(workbook=workbook, sheet=self.sheet) + workbook = super().write_to_workbook(workbook=workbook, sheet=self.sheet, start_row=start_row) return workbook @@ -68,10 +68,10 @@ class ProcedureSampleWriter(DefaultTABLEWriter): def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs): super().__init__(pydant_obj=pydant_obj, range_dict=range_dict, *args, **kwargs) - self.sheet = f"{self.pydant_obj.proceduretype.name} Quality" + self.sheet = f"{self.pydant_obj.proceduretype.name[:20]} Quality" self.pydant_obj = self.pad_samples_to_length(row_count=pydant_obj.max_sample_rank, mode="procedure") def write_to_workbook(self, workbook: Workbook, sheet: str | None = None, start_row: int = 1, *args, **kwargs) -> Workbook: - workbook = super().write_to_workbook(workbook=workbook, sheet=self.sheet) + workbook = super().write_to_workbook(workbook=workbook, sheet=self.sheet, start_row=start_row) return workbook diff --git a/src/submissions/backend/managers/procedures.py b/src/submissions/backend/managers/procedures.py index 946775a..69d56da 100644 --- a/src/submissions/backend/managers/procedures.py +++ b/src/submissions/backend/managers/procedures.py @@ -66,19 +66,19 @@ class DefaultProcedureManager(DefaultManager): except AttributeError: reagent_writer = procedure_writers.ProcedureReagentWriter self.reagent_writer = reagent_writer(pydant_obj=self.pyd) - workbook = self.reagent_writer.write_to_workbook(workbook) + workbook = self.reagent_writer.write_to_workbook(workbook, start_row=self.info_writer.end_row) try: equipment_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}EquipmentWriter") except AttributeError: equipment_writer = procedure_writers.ProcedureEquipmentWriter self.equipment_writer = equipment_writer(pydant_obj=self.pyd) - workbook = self.equipment_writer.write_to_workbook(workbook) + workbook = self.equipment_writer.write_to_workbook(workbook, start_row=self.reagent_writer.end_row) try: sample_writer = getattr(procedure_writers, f"{self.proceduretype.name.replace(' ', '')}SampleWriter") except AttributeError: sample_writer = procedure_writers.ProcedureSampleWriter self.sample_writer = sample_writer(pydant_obj=self.pyd) - workbook = self.sample_writer.write_to_workbook(workbook) + workbook = self.sample_writer.write_to_workbook(workbook, start_row=self.equipment_writer.end_row) # # TODO: Find way to group results by result_type. for result in self.pyd.result: Writer = getattr(results_writers, f"{result.result_type}InfoWriter") diff --git a/src/submissions/backend/validators/omni_gui_objects.py b/src/submissions/backend/validators/omni_gui_objects.py index e14687a..02a3cec 100644 --- a/src/submissions/backend/validators/omni_gui_objects.py +++ b/src/submissions/backend/validators/omni_gui_objects.py @@ -1,813 +1,813 @@ -""" -Collection of pydantic objects to be used in the Gui system. -""" - -from __future__ import annotations -import logging -from pydantic import BaseModel, field_validator, Field -from typing import List, ClassVar -from backend.db.models import * -from sqlalchemy.orm.properties import ColumnProperty -from sqlalchemy.orm.relationships import _RelationshipDeclared - -logger = logging.getLogger(f"submissions.{__name__}") - - -class BaseOmni(BaseModel): - - instance_object: Any | None = Field(default=None) - - def __repr__(self): - try: - return f"<{self.__class__.__name__}({self.name})>" - except AttributeError: - return f"<{self.__class__.__name__}({self.__repr_name__})>" - - @classproperty - def aliases(cls) -> List[str]: - """ - Gets other names the sql object of this class might go by. - - Returns: - List[str]: List of names - """ - return cls.class_object.aliases - - def check_all_attributes(self, attributes: dict) -> bool: - """ - Compares this pobject to dictionary of attributes to determine equality. - - Args: - attributes (dict): - - Returns: - bool: result - """ - # logger.debug(f"Incoming attributes: {attributes}") - attributes = {k : v for k, v in attributes.items() if k in self.list_searchables.keys()} - for key, value in attributes.items(): - try: - # logger.debug(f"Check if {value.__class__} is subclass of {BaseOmni}") - check = issubclass(value.__class__, BaseOmni) - except TypeError as e: - logger.error(f"Couldn't check if {value.__class__} is subclass of {BaseOmni} due to {e}") - check = False - if check: - # logger.debug(f"Checking for subclass name.") - value = value.name - self_value = self.list_searchables[key] - if value != self_value: - # logger.debug(f"Value {key} is False, these are not the same object.") - return False - # logger.debug("Everything checks out, these are the same object.") - return True - - def __setattr__(self, key: str, value: Any): - """ - Overrides built in dunder method - - Args: - key (str): - value (Any): - """ - try: - class_value = getattr(self.class_object, key) - except AttributeError: - return super().__setattr__(key, value) - try: - new_key = class_value.impl.key - except AttributeError: - new_key = None - # logger.debug(f"Class value before new key: {class_value.property}") - if new_key and new_key != key: - class_value = getattr(self.class_object, new_key) - # logger.debug(f"Class value after new key: {class_value.property}") - if isinstance(class_value, InstrumentedAttribute): - # logger.debug(f"{key} is an InstrumentedAttribute with class_value.property: {class_value.property}.") - match class_value.property: - case ColumnProperty(): - # logger.debug(f"Setting ColumnProperty to {value}") - return super().__setattr__(key, value) - case _RelationshipDeclared(): - # logger.debug(f" {self.__class__.__name__} Setting _RelationshipDeclared for {key} to {value}") - if class_value.property.uselist: - # logger.debug(f"Setting {key} with uselist") - existing = self.__getattribute__(key) - if existing is not None: - # NOTE: Getting some really weird duplicates for OmniSubmissionTypeKitTypeAssociation here. - # logger.debug(f"Existing: {existing}, incoming: {value}") - if isinstance(value, list): - if value != existing: - value = existing + value - else: - value = existing - else: - if issubclass(value.__class__, self.__class__): - value = value.to_sql() - value = existing + [value] - else: - if issubclass(value.__class__, self.__class__): - value = value.to_sql() - value = [value] - # logger.debug(f"Final value for {key}: {value}") - return super().__setattr__(key, value) - else: - if isinstance(value, list): - if len(value) == 1: - value = value[0] - else: - raise ValueError("Object is too long to parse a single value.") - return super().__setattr__(key, value) - case _: - return super().__setattr__(key, value) - else: - return super().__setattr__(key, value) - - -class OmniSubmissionType(BaseOmni): - - class_object: ClassVar[Any] = SubmissionType - - name: str = Field(default="", description="property") - info_map: dict = Field(default={}, description="property") - defaults: dict = Field(default={}, description="property") - template_file: bytes = Field(default=bytes(), description="property") - sample_map: dict = Field(default={}, description="property") - - @field_validator("name", mode="before") - @classmethod - def rescue_name_none(cls, value): - if not value: - return "" - return value - - @field_validator("sample_map", mode="before") - @classmethod - def rescue_sample_map_none(cls, value): - if not value: - return {} - return value - - @field_validator("defaults", mode="before") - @classmethod - def rescue_defaults_none(cls, value): - if not value: - return {} - return value - - @field_validator("info_map", mode="before") - @classmethod - def rescue_info_map_none(cls, value): - if not value: - return {} - return value - - @field_validator("template_file", mode="before") - @classmethod - def provide_blank_template_file(cls, value): - if value is None: - value = bytes() - return value - - def __init__(self, instance_object: Any, **data): - super().__init__(**data) - self.instance_object = instance_object - - @property - def dataframe_dict(self) -> dict: - """ - Dictionary of gui relevant values. - - Returns: - dict: result - """ - return dict( - name=self.name - ) - - def to_sql(self): - """ - Convert this object to an instance of its class object. - """ - instance, new = self.class_object.query_or_create(name=self.name) - instance.info_map = self.info_map - instance.defaults = self.defaults - instance.sample_map = self.sample_map - if self.template_file: - instance.template_file = self.template_file - return instance - - -class OmniReagentRole(BaseOmni): - - class_object: ClassVar[Any] = ReagentRole - - name: str = Field(default="", description="property") - eol_ext: timedelta = Field(default=timedelta(days=0), description="property") - - @field_validator("name", mode="before") - @classmethod - def rescue_name_none(cls, value): - if not value: - return "" - return value - - @field_validator("eol_ext", mode="before") - @classmethod - def rescue_eol_ext(cls, value): - if not value: - value = timedelta(days=0) - return value - - def __init__(self, instance_object: Any, **data): - super().__init__(**data) - self.instance_object = instance_object - - @property - def dataframe_dict(self) -> dict: - """ - Dictionary of gui relevant values. - - Returns: - dict: result - """ - return dict( - name=self.name - ) - - def to_sql(self): - """ - Convert this object to an instance of its class object. - """ - - instance, new = self.class_object.query_or_create(name=self.name) - if new: - instance.eol_ext = self.eol_ext - return instance - - -class OmniSubmissionTypeKitTypeAssociation(BaseOmni): - - class_object: ClassVar[Any] = SubmissionTypeKitTypeAssociation - - submissiontype: str | OmniSubmissionType = Field(default="", description="relationship", title="SubmissionType") - kittype: str | OmniKitType = Field(default="", description="relationship", title="KitType") - mutable_cost_column: float = Field(default=0.0, description="property") - mutable_cost_sample: float = Field(default=0.0, description="property") - constant_cost: float = Field(default=0.0, description="property") - - def __repr__(self): - if isinstance(self.submissiontype, str): - submissiontype = self.submissiontype - else: - submissiontype = self.submissiontype.name - if isinstance(self.kittype, str): - kittype = self.kittype - else: - kittype = self.kittype.name - try: - return f"<{self.__class__.__name__}({submissiontype}&{kittype})>" - except AttributeError: - return f"<{self.__class__.__name__}(NO NAME)>" - - @field_validator("proceduretype", mode="before") - @classmethod - def rescue_submissiontype_none(cls, value): - if not value: - return "" - return value - - @field_validator("kittype", mode="before") - @classmethod - def rescue_kittype_none(cls, value): - if not value: - return "" - return value - - @field_validator("kittype") - @classmethod - def no_list_please(cls, value): - if isinstance(value, list): - raise ValueError("List is not allowed for kittype.") - return value - - def __init__(self, instance_object: Any, **data): - super().__init__(**data) - self.instance_object = instance_object - - @property - def dataframe_dict(self) -> dict: - """ - Dictionary of gui relevant values. - - Returns: - dict: result - """ - if isinstance(self.submissiontype, OmniSubmissionType): - submissiontype = self.submissiontype.name - else: - submissiontype = self.submissiontype - if isinstance(self.kittype, OmniKitType): - kittype = self.kittype.name - else: - kittype = self.kittype - return dict( - submissiontype=submissiontype, - kittype=kittype, - mutable_cost_column=self.mutable_cost_column, - mutable_cost_sample=self.mutable_cost_sample, - constant_cost=self.constant_cost - ) - - def to_sql(self): - """ - Convert this object to an instance of its class object. - """ - - # logger.debug(f"Self kittype: {self.proceduretype}") - if issubclass(self.submissiontype.__class__, BaseOmni): - submissiontype = SubmissionType.query(name=self.submissiontype.name) - else: - submissiontype = SubmissionType.query(name=self.submissiontype) - if issubclass(self.kittype.__class__, BaseOmni): - kittype = KitType.query(name=self.kittype.name) - else: - kittype = KitType.query(name=self.kittype) - # logger.debug(f"Self kittype: {self.kittype}") - # logger.debug(f"Query or create with {kittype}, {proceduretype}") - instance, is_new = self.class_object.query_or_create(kittype=kittype, submissiontype=submissiontype) - instance.mutable_cost_column = self.mutable_cost_column - instance.mutable_cost_sample = self.mutable_cost_sample - instance.constant_cost = self.constant_cost - return instance - - @property - def list_searchables(self) -> dict: - """ - Provides attributes for checking this object against a dictionary. - - Returns: - dict: result - """ - if isinstance(self.kittype, OmniKitType): - kit = self.kittype.name - else: - kit = self.kittype - if isinstance(self.submissiontype, OmniSubmissionType): - subtype = self.submissiontype.name - else: - subtype = self.submissiontype - return dict(kittype=kit, submissiontype=subtype) - - -class OmniKitTypeReagentRoleAssociation(BaseOmni): - - class_object: ClassVar[Any] = KitTypeReagentRoleAssociation - - reagent_role: str | OmniReagentRole = Field(default="", description="relationship", title="ReagentRole") - uses: dict = Field(default={}, description="property") - required: bool = Field(default=True, description="property") - submission_type: str | OmniSubmissionType = Field(default="", description="relationship", title="SubmissionType") - kit_type: str | OmniKitType = Field(default="", description="relationship", title="KitType") - - def __repr__(self): - try: - return f"" - except AttributeError: - return f"" - - @field_validator("uses", mode="before") - @classmethod - def rescue_uses_none(cls, value): - if not value: - return {} - return value - - @field_validator("required", mode="before") - @classmethod - def rescue_required_none(cls, value): - if not value: - value = 1 - return value - - def __init__(self, instance_object: Any, **data): - super().__init__(**data) - self.instance_object = instance_object - - @property - def dataframe_dict(self) -> dict: - """ - Dictionary of gui relevant values. - - Returns: - dict: result - """ - if isinstance(self.submission_type, OmniSubmissionType): - submission_type = self.submission_type.name - else: - submission_type = self.submission_type - if isinstance(self.kit_type, OmniKitType): - kit_type = self.kit_type.name - else: - kit_type = self.kit_type - # logger.debug(f"Using name: {name}") - if isinstance(self.reagent_role, OmniReagentRole): - reagent_role = self.reagent_role.name - else: - reagent_role = self.reagent_role - return dict( - reagentrole=reagent_role, - submissiontype=submission_type, - kittype=kit_type - ) - - def to_sql(self): - """ - Convert this object to an instance of its class object. - """ - - if isinstance(self.reagent_role, OmniReagentRole): - reagent_role = self.reagent_role.name - else: - reagent_role = self.reagent_role - if issubclass(self.submission_type.__class__, BaseOmni): - submissiontype = self.submission_type.name - else: - submissiontype = self.submission_type - if issubclass(self.kit_type.__class__, BaseOmni): - kittype = self.kit_type.name - else: - kittype = self.kit_type - instance, new = self.class_object.query_or_create( - reagentrole=reagent_role, - kittype=kittype, - submissiontype=submissiontype - ) - # logger.debug(f"KitTypeReagentRoleAssociation coming out of query_or_create: {instance.__dict__}\nnew: {new}") - if new: - logger.warning(f"This is a new instance: {instance.__dict__}") - try: - reagent_role = self.reagent_role.to_sql() - except AttributeError: - reagent_role = ReagentRole.query(name=self.reagent_role) - instance.reagent_role = reagent_role - # logger.debug(f"KTRRAssoc uses: {self.uses}") - instance.uses = self.uses - instance.required = int(self.required) - # logger.debug(f"KitTypeReagentRoleAssociation: {pformat(instance.__dict__)}") - return instance - - @property - def list_searchables(self) -> dict: - """ - Provides attributes for checking this object against a dictionary. - - Returns: - dict: result - """ - if isinstance(self.kit_type, OmniKitType): - kit = self.kit_type.name - else: - kit = self.kit_type - if isinstance(self.submission_type, OmniSubmissionType): - subtype = self.submission_type.name - else: - subtype = self.submission_type - if isinstance(self.reagent_role, OmniReagentRole): - reagentrole = self.reagent_role.name - else: - reagentrole = self.reagent_role - return dict(kit_type=kit, submission_type=subtype, reagent_role=reagentrole) - - -class OmniEquipmentRole(BaseOmni): - - class_object: ClassVar[Any] = EquipmentRole - - name: str = Field(default="", description="property") - - @field_validator("name", mode="before") - @classmethod - def rescue_name_none(cls, value): - if not value: - return "" - return value - - def __init__(self, instance_object: Any, **data): - super().__init__(**data) - self.instance_object = instance_object - - @property - def dataframe_dict(self) -> dict: - """ - Dictionary of gui relevant values. - - Returns: - dict: result - """ - return dict( - name=self.name - ) - - def to_sql(self): - """ - Convert this object to an instance of its class object. - """ - - instance, new = self.class_object.query_or_create(name=self.name) - return instance - - -class OmniTips(BaseOmni): - - class_object: ClassVar[Any] = Tips - - name: str = Field(default="", description="property") - - @field_validator("name", mode="before") - @classmethod - def rescue_name_none(cls, value): - if not value: - return "" - return value - - def __init__(self, instance_object: Any, **data): - super().__init__(**data) - self.instance_object = instance_object - - @property - def dataframe_dict(self) -> dict: - """ - Dictionary of gui relevant values. - - Returns: - dict: result - """ - return dict( - name=self.name - ) - - def to_sql(self): - """ - Convert this object to an instance of its class object. - """ - - instance, new = self.class_object.query_or_create(name=self.name) - return instance - - -class OmniTipRole(BaseOmni): - - class_object: ClassVar[Any] = TipRole - - name: str = Field(default="", description="property") - tips: List[OmniTips] = Field(default=[], description="relationship", title="Tips") - - @field_validator("name", mode="before") - @classmethod - def rescue_name_none(cls, value): - if not value: - return "" - return value - - def __init__(self, instance_object: Any, **data): - super().__init__(**data) - self.instance_object = instance_object - - @property - def dataframe_dict(self) -> dict: - """ - Dictionary of gui relevant values. - - Returns: - dict: result - """ - return dict( - name=self.name, - tips=[item.name for item in self.tips] - ) - - def to_sql(self): - """ - Convert this object to an instance of its class object. - """ - - instance, new = self.class_object.query_or_create(name=self.name) - for tips in self.tips: - tips.to_sql() - return instance - - -class OmniProcess(BaseOmni): - - class_object: ClassVar[Any] = Process - - # NOTE: How am I going to figure out relatioinships without getting into recursion issues? - name: str = Field(default="", description="property") #: Process name - submission_types: List[OmniSubmissionType] | List[str] = Field(default=[], description="relationship", - title="SubmissionType") - equipment_roles: List[OmniEquipmentRole] | List[str] = Field(default=[], description="relationship", - title="EquipmentRole") - tip_roles: List[OmniTipRole] | List[str] = Field(default=[], description="relationship", title="TipRole") - - def __init__(self, instance_object: Any, **data): - super().__init__(**data) - self.instance_object = instance_object - - @property - def dataframe_dict(self) -> dict: - """ - Dictionary of gui relevant values. - - Returns: - dict: result - """ - submissiontypes = [item if isinstance(item, str) else item.name for item in self.submission_types] - logger.debug(f"Submission Types: {submissiontypes}") - equipmentroles = [item if isinstance(item, str) else item.name for item in self.equipment_roles] - logger.debug(f"Equipment Roles: {equipmentroles}") - return dict( - name=self.name, - submission_types=submissiontypes, - equipment_roles=equipmentroles - ) - - @field_validator("name", mode="before") - @classmethod - def rescue_name_none(cls, value): - if not value: - return "" - return value - - def to_sql(self): - """ - Convert this object to an instance of its class object. - """ - - instance, new = self.class_object.query_or_create(name=self.name) - for st in self.submission_types: - try: - new_assoc = st.to_sql() - except AttributeError: - new_assoc = SubmissionType.query(name=st) - if new_assoc not in instance.proceduretype: - instance.proceduretype.append(new_assoc) - for er in self.equipment_roles: - try: - new_assoc = er.to_sql() - except AttributeError: - new_assoc = EquipmentRole.query(name=er) - if new_assoc not in instance.equipmentrole: - instance.equipmentrole.append(new_assoc) - for tr in self.tip_roles: - try: - new_assoc = tr.to_sql() - except AttributeError: - new_assoc = TipRole.query(name=tr) - if new_assoc not in instance.tiprole: - instance.tiprole.append(new_assoc) - return instance - - @property - def list_searchables(self) -> dict: - """ - Provides attributes for checking this object against a dictionary. - - Returns: - dict: result - """ - return dict(name=self.name) - - -class OmniKitType(BaseOmni): - - class_object: ClassVar[Any] = KitType - - name: str = Field(default="", description="property") - kit_submissiontype_associations: List[OmniSubmissionTypeKitTypeAssociation] | List[str] = Field(default=[], description="relationship", title="SubmissionTypeKitTypeAssociation") - kit_reagentrole_associations: List[OmniKitTypeReagentRoleAssociation] | List[str] = Field(default=[], description="relationship", title="KitTypeReagentRoleAssociation") - processes: List[OmniProcess] | List[str] = Field(default=[], description="relationship", title="Process") - - @field_validator("name", mode="before") - @classmethod - def rescue_name_none(cls, value): - if not value: - return "" - return value - - def __init__(self, instance_object: Any, **data): - super().__init__(**data) - self.instance_object = instance_object - - @property - def dataframe_dict(self) -> dict: - """ - Dictionary of gui relevant values. - - Returns: - dict: result - """ - return dict( - name=self.name - ) - - def to_sql(self) -> KitType: - """ - Convert this object to an instance of its class object. - """ - - kit, is_new = KitType.query_or_create(name=self.name) - new_rr = [] - for rr_assoc in self.kit_reagentrole_associations: - new_assoc = rr_assoc.to_sql() - if new_assoc not in new_rr: - # logger.debug(f"Adding {new_assoc} to kit_reagentrole_associations") - new_rr.append(new_assoc) - # logger.debug(f"Setting kit_reagentrole_associations to {pformat([item.__dict__ for item in new_rr])}") - kit.kit_reagentrole_associations = new_rr - new_st = [] - for st_assoc in self.kit_submissiontype_associations: - new_assoc = st_assoc.to_sql() - if new_assoc not in new_st: - new_st.append(new_assoc) - kit.kit_submissiontype_associations = new_st - new_processes = [] - for process in self.processes: - new_process = process.to_sql() - if new_process not in new_processes: - new_processes.append(new_process) - kit.processes = new_processes - return kit - - -class OmniOrganization(BaseOmni): - - class_object: ClassVar[Any] = Organization - - name: str = Field(default="", description="property") - cost_centre: str = Field(default="", description="property") - contact: List[str] | List[OmniContact] = Field(default=[], description="relationship", title="Contact") - - def __init__(self, instance_object: Any, **data): - # logger.debug(f"Incoming data: {data}") - super().__init__(**data) - self.instance_object = instance_object - - @property - def dataframe_dict(self) -> dict: - """ - Dictionary of gui relevant values. - - Returns: - dict: result - """ - return dict( - name=self.name, - cost_centre=self.cost_centre, - contacts=self.contact - ) - - -class OmniContact(BaseOmni): - - class_object: ClassVar[Any] = Contact - - name: str = Field(default="", description="property") - email: str = Field(default="", description="property") - phone: str = Field(default="", description="property") - - @property - def list_searchables(self) -> dict: - """ - Provides attributes for checking this object against a dictionary. - - Returns: - dict: result - """ - return dict(name=self.name, email=self.email) - - def __init__(self, instance_object: Any, **data): - super().__init__(**data) - self.instance_object = instance_object - - @property - def dataframe_dict(self) -> dict: - """ - Dictionary of gui relevant values. - - Returns: - dict: result - """ - return dict( - name=self.name, - email=self.email, - phone=self.phone - ) - - def to_sql(self): - """ - Convert this object to an instance of its class object. - """ - - contact, is_new = Contact.query_or_create(name=self.name, email=self.email, phone=self.phone) - return contact +# """ +# Collection of pydantic objects to be used in the Gui system. +# """ +# +# from __future__ import annotations +# import logging +# from pydantic import BaseModel, field_validator, Field +# from typing import List, ClassVar +# from backend.db.models import * +# from sqlalchemy.orm.properties import ColumnProperty +# from sqlalchemy.orm.relationships import _RelationshipDeclared +# +# logger = logging.getLogger(f"submissions.{__name__}") +# +# +# class BaseOmni(BaseModel): +# +# instance_object: Any | None = Field(default=None) +# +# def __repr__(self): +# try: +# return f"<{self.__class__.__name__}({self.name})>" +# except AttributeError: +# return f"<{self.__class__.__name__}({self.__repr_name__})>" +# +# @classproperty +# def aliases(cls) -> List[str]: +# """ +# Gets other names the sql object of this class might go by. +# +# Returns: +# List[str]: List of names +# """ +# return cls.class_object.aliases +# +# def check_all_attributes(self, attributes: dict) -> bool: +# """ +# Compares this pobject to dictionary of attributes to determine equality. +# +# Args: +# attributes (dict): +# +# Returns: +# bool: result +# """ +# # logger.debug(f"Incoming attributes: {attributes}") +# attributes = {k : v for k, v in attributes.items() if k in self.list_searchables.keys()} +# for key, value in attributes.items(): +# try: +# # logger.debug(f"Check if {value.__class__} is subclass of {BaseOmni}") +# check = issubclass(value.__class__, BaseOmni) +# except TypeError as e: +# logger.error(f"Couldn't check if {value.__class__} is subclass of {BaseOmni} due to {e}") +# check = False +# if check: +# # logger.debug(f"Checking for subclass name.") +# value = value.name +# self_value = self.list_searchables[key] +# if value != self_value: +# # logger.debug(f"Value {key} is False, these are not the same object.") +# return False +# # logger.debug("Everything checks out, these are the same object.") +# return True +# +# def __setattr__(self, key: str, value: Any): +# """ +# Overrides built in dunder method +# +# Args: +# key (str): +# value (Any): +# """ +# try: +# class_value = getattr(self.class_object, key) +# except AttributeError: +# return super().__setattr__(key, value) +# try: +# new_key = class_value.impl.key +# except AttributeError: +# new_key = None +# # logger.debug(f"Class value before new key: {class_value.property}") +# if new_key and new_key != key: +# class_value = getattr(self.class_object, new_key) +# # logger.debug(f"Class value after new key: {class_value.property}") +# if isinstance(class_value, InstrumentedAttribute): +# # logger.debug(f"{key} is an InstrumentedAttribute with class_value.property: {class_value.property}.") +# match class_value.property: +# case ColumnProperty(): +# # logger.debug(f"Setting ColumnProperty to {value}") +# return super().__setattr__(key, value) +# case _RelationshipDeclared(): +# # logger.debug(f" {self.__class__.__name__} Setting _RelationshipDeclared for {key} to {value}") +# if class_value.property.uselist: +# # logger.debug(f"Setting {key} with uselist") +# existing = self.__getattribute__(key) +# if existing is not None: +# # NOTE: Getting some really weird duplicates for OmniSubmissionTypeKitTypeAssociation here. +# # logger.debug(f"Existing: {existing}, incoming: {value}") +# if isinstance(value, list): +# if value != existing: +# value = existing + value +# else: +# value = existing +# else: +# if issubclass(value.__class__, self.__class__): +# value = value.to_sql() +# value = existing + [value] +# else: +# if issubclass(value.__class__, self.__class__): +# value = value.to_sql() +# value = [value] +# # logger.debug(f"Final value for {key}: {value}") +# return super().__setattr__(key, value) +# else: +# if isinstance(value, list): +# if len(value) == 1: +# value = value[0] +# else: +# raise ValueError("Object is too long to parse a single value.") +# return super().__setattr__(key, value) +# case _: +# return super().__setattr__(key, value) +# else: +# return super().__setattr__(key, value) +# +# +# class OmniSubmissionType(BaseOmni): +# +# class_object: ClassVar[Any] = SubmissionType +# +# name: str = Field(default="", description="property") +# info_map: dict = Field(default={}, description="property") +# defaults: dict = Field(default={}, description="property") +# template_file: bytes = Field(default=bytes(), description="property") +# sample_map: dict = Field(default={}, description="property") +# +# @field_validator("name", mode="before") +# @classmethod +# def rescue_name_none(cls, value): +# if not value: +# return "" +# return value +# +# @field_validator("sample_map", mode="before") +# @classmethod +# def rescue_sample_map_none(cls, value): +# if not value: +# return {} +# return value +# +# @field_validator("defaults", mode="before") +# @classmethod +# def rescue_defaults_none(cls, value): +# if not value: +# return {} +# return value +# +# @field_validator("info_map", mode="before") +# @classmethod +# def rescue_info_map_none(cls, value): +# if not value: +# return {} +# return value +# +# @field_validator("template_file", mode="before") +# @classmethod +# def provide_blank_template_file(cls, value): +# if value is None: +# value = bytes() +# return value +# +# def __init__(self, instance_object: Any, **data): +# super().__init__(**data) +# self.instance_object = instance_object +# +# @property +# def dataframe_dict(self) -> dict: +# """ +# Dictionary of gui relevant values. +# +# Returns: +# dict: result +# """ +# return dict( +# name=self.name +# ) +# +# def to_sql(self): +# """ +# Convert this object to an instance of its class object. +# """ +# instance, new = self.class_object.query_or_create(name=self.name) +# instance.info_map = self.info_map +# instance.defaults = self.defaults +# instance.sample_map = self.sample_map +# if self.template_file: +# instance.template_file = self.template_file +# return instance +# +# +# class OmniReagentRole(BaseOmni): +# +# class_object: ClassVar[Any] = ReagentRole +# +# name: str = Field(default="", description="property") +# eol_ext: timedelta = Field(default=timedelta(days=0), description="property") +# +# @field_validator("name", mode="before") +# @classmethod +# def rescue_name_none(cls, value): +# if not value: +# return "" +# return value +# +# @field_validator("eol_ext", mode="before") +# @classmethod +# def rescue_eol_ext(cls, value): +# if not value: +# value = timedelta(days=0) +# return value +# +# def __init__(self, instance_object: Any, **data): +# super().__init__(**data) +# self.instance_object = instance_object +# +# @property +# def dataframe_dict(self) -> dict: +# """ +# Dictionary of gui relevant values. +# +# Returns: +# dict: result +# """ +# return dict( +# name=self.name +# ) +# +# def to_sql(self): +# """ +# Convert this object to an instance of its class object. +# """ +# +# instance, new = self.class_object.query_or_create(name=self.name) +# if new: +# instance.eol_ext = self.eol_ext +# return instance +# +# +# class OmniSubmissionTypeKitTypeAssociation(BaseOmni): +# +# class_object: ClassVar[Any] = SubmissionTypeKitTypeAssociation +# +# submissiontype: str | OmniSubmissionType = Field(default="", description="relationship", title="SubmissionType") +# kittype: str | OmniKitType = Field(default="", description="relationship", title="KitType") +# mutable_cost_column: float = Field(default=0.0, description="property") +# mutable_cost_sample: float = Field(default=0.0, description="property") +# constant_cost: float = Field(default=0.0, description="property") +# +# def __repr__(self): +# if isinstance(self.submissiontype, str): +# submissiontype = self.submissiontype +# else: +# submissiontype = self.submissiontype.name +# if isinstance(self.kittype, str): +# kittype = self.kittype +# else: +# kittype = self.kittype.name +# try: +# return f"<{self.__class__.__name__}({submissiontype}&{kittype})>" +# except AttributeError: +# return f"<{self.__class__.__name__}(NO NAME)>" +# +# @field_validator("proceduretype", mode="before") +# @classmethod +# def rescue_submissiontype_none(cls, value): +# if not value: +# return "" +# return value +# +# @field_validator("kittype", mode="before") +# @classmethod +# def rescue_kittype_none(cls, value): +# if not value: +# return "" +# return value +# +# @field_validator("kittype") +# @classmethod +# def no_list_please(cls, value): +# if isinstance(value, list): +# raise ValueError("List is not allowed for kittype.") +# return value +# +# def __init__(self, instance_object: Any, **data): +# super().__init__(**data) +# self.instance_object = instance_object +# +# @property +# def dataframe_dict(self) -> dict: +# """ +# Dictionary of gui relevant values. +# +# Returns: +# dict: result +# """ +# if isinstance(self.submissiontype, OmniSubmissionType): +# submissiontype = self.submissiontype.name +# else: +# submissiontype = self.submissiontype +# if isinstance(self.kittype, OmniKitType): +# kittype = self.kittype.name +# else: +# kittype = self.kittype +# return dict( +# submissiontype=submissiontype, +# kittype=kittype, +# mutable_cost_column=self.mutable_cost_column, +# mutable_cost_sample=self.mutable_cost_sample, +# constant_cost=self.constant_cost +# ) +# +# def to_sql(self): +# """ +# Convert this object to an instance of its class object. +# """ +# +# # logger.debug(f"Self kittype: {self.proceduretype}") +# if issubclass(self.submissiontype.__class__, BaseOmni): +# submissiontype = SubmissionType.query(name=self.submissiontype.name) +# else: +# submissiontype = SubmissionType.query(name=self.submissiontype) +# if issubclass(self.kittype.__class__, BaseOmni): +# kittype = KitType.query(name=self.kittype.name) +# else: +# kittype = KitType.query(name=self.kittype) +# # logger.debug(f"Self kittype: {self.kittype}") +# # logger.debug(f"Query or create with {kittype}, {proceduretype}") +# instance, is_new = self.class_object.query_or_create(kittype=kittype, submissiontype=submissiontype) +# instance.mutable_cost_column = self.mutable_cost_column +# instance.mutable_cost_sample = self.mutable_cost_sample +# instance.constant_cost = self.constant_cost +# return instance +# +# @property +# def list_searchables(self) -> dict: +# """ +# Provides attributes for checking this object against a dictionary. +# +# Returns: +# dict: result +# """ +# if isinstance(self.kittype, OmniKitType): +# kit = self.kittype.name +# else: +# kit = self.kittype +# if isinstance(self.submissiontype, OmniSubmissionType): +# subtype = self.submissiontype.name +# else: +# subtype = self.submissiontype +# return dict(kittype=kit, submissiontype=subtype) +# +# +# class OmniKitTypeReagentRoleAssociation(BaseOmni): +# +# class_object: ClassVar[Any] = KitTypeReagentRoleAssociation +# +# reagent_role: str | OmniReagentRole = Field(default="", description="relationship", title="ReagentRole") +# uses: dict = Field(default={}, description="property") +# required: bool = Field(default=True, description="property") +# submission_type: str | OmniSubmissionType = Field(default="", description="relationship", title="SubmissionType") +# kit_type: str | OmniKitType = Field(default="", description="relationship", title="KitType") +# +# def __repr__(self): +# try: +# return f"" +# except AttributeError: +# return f"" +# +# @field_validator("uses", mode="before") +# @classmethod +# def rescue_uses_none(cls, value): +# if not value: +# return {} +# return value +# +# @field_validator("required", mode="before") +# @classmethod +# def rescue_required_none(cls, value): +# if not value: +# value = 1 +# return value +# +# def __init__(self, instance_object: Any, **data): +# super().__init__(**data) +# self.instance_object = instance_object +# +# @property +# def dataframe_dict(self) -> dict: +# """ +# Dictionary of gui relevant values. +# +# Returns: +# dict: result +# """ +# if isinstance(self.submission_type, OmniSubmissionType): +# submission_type = self.submission_type.name +# else: +# submission_type = self.submission_type +# if isinstance(self.kit_type, OmniKitType): +# kit_type = self.kit_type.name +# else: +# kit_type = self.kit_type +# # logger.debug(f"Using name: {name}") +# if isinstance(self.reagent_role, OmniReagentRole): +# reagent_role = self.reagent_role.name +# else: +# reagent_role = self.reagent_role +# return dict( +# reagentrole=reagent_role, +# submissiontype=submission_type, +# kittype=kit_type +# ) +# +# def to_sql(self): +# """ +# Convert this object to an instance of its class object. +# """ +# +# if isinstance(self.reagent_role, OmniReagentRole): +# reagent_role = self.reagent_role.name +# else: +# reagent_role = self.reagent_role +# if issubclass(self.submission_type.__class__, BaseOmni): +# submissiontype = self.submission_type.name +# else: +# submissiontype = self.submission_type +# if issubclass(self.kit_type.__class__, BaseOmni): +# kittype = self.kit_type.name +# else: +# kittype = self.kit_type +# instance, new = self.class_object.query_or_create( +# reagentrole=reagent_role, +# kittype=kittype, +# submissiontype=submissiontype +# ) +# # logger.debug(f"KitTypeReagentRoleAssociation coming out of query_or_create: {instance.__dict__}\nnew: {new}") +# if new: +# logger.warning(f"This is a new instance: {instance.__dict__}") +# try: +# reagent_role = self.reagent_role.to_sql() +# except AttributeError: +# reagent_role = ReagentRole.query(name=self.reagent_role) +# instance.reagent_role = reagent_role +# # logger.debug(f"KTRRAssoc uses: {self.uses}") +# instance.uses = self.uses +# instance.required = int(self.required) +# # logger.debug(f"KitTypeReagentRoleAssociation: {pformat(instance.__dict__)}") +# return instance +# +# @property +# def list_searchables(self) -> dict: +# """ +# Provides attributes for checking this object against a dictionary. +# +# Returns: +# dict: result +# """ +# if isinstance(self.kit_type, OmniKitType): +# kit = self.kit_type.name +# else: +# kit = self.kit_type +# if isinstance(self.submission_type, OmniSubmissionType): +# subtype = self.submission_type.name +# else: +# subtype = self.submission_type +# if isinstance(self.reagent_role, OmniReagentRole): +# reagentrole = self.reagent_role.name +# else: +# reagentrole = self.reagent_role +# return dict(kit_type=kit, submission_type=subtype, reagent_role=reagentrole) +# +# +# class OmniEquipmentRole(BaseOmni): +# +# class_object: ClassVar[Any] = EquipmentRole +# +# name: str = Field(default="", description="property") +# +# @field_validator("name", mode="before") +# @classmethod +# def rescue_name_none(cls, value): +# if not value: +# return "" +# return value +# +# def __init__(self, instance_object: Any, **data): +# super().__init__(**data) +# self.instance_object = instance_object +# +# @property +# def dataframe_dict(self) -> dict: +# """ +# Dictionary of gui relevant values. +# +# Returns: +# dict: result +# """ +# return dict( +# name=self.name +# ) +# +# def to_sql(self): +# """ +# Convert this object to an instance of its class object. +# """ +# +# instance, new = self.class_object.query_or_create(name=self.name) +# return instance +# +# +# class OmniTips(BaseOmni): +# +# class_object: ClassVar[Any] = Tips +# +# name: str = Field(default="", description="property") +# +# @field_validator("name", mode="before") +# @classmethod +# def rescue_name_none(cls, value): +# if not value: +# return "" +# return value +# +# def __init__(self, instance_object: Any, **data): +# super().__init__(**data) +# self.instance_object = instance_object +# +# @property +# def dataframe_dict(self) -> dict: +# """ +# Dictionary of gui relevant values. +# +# Returns: +# dict: result +# """ +# return dict( +# name=self.name +# ) +# +# def to_sql(self): +# """ +# Convert this object to an instance of its class object. +# """ +# +# instance, new = self.class_object.query_or_create(name=self.name) +# return instance +# +# +# class OmniTipRole(BaseOmni): +# +# class_object: ClassVar[Any] = TipRole +# +# name: str = Field(default="", description="property") +# tips: List[OmniTips] = Field(default=[], description="relationship", title="Tips") +# +# @field_validator("name", mode="before") +# @classmethod +# def rescue_name_none(cls, value): +# if not value: +# return "" +# return value +# +# def __init__(self, instance_object: Any, **data): +# super().__init__(**data) +# self.instance_object = instance_object +# +# @property +# def dataframe_dict(self) -> dict: +# """ +# Dictionary of gui relevant values. +# +# Returns: +# dict: result +# """ +# return dict( +# name=self.name, +# tips=[item.name for item in self.tips] +# ) +# +# def to_sql(self): +# """ +# Convert this object to an instance of its class object. +# """ +# +# instance, new = self.class_object.query_or_create(name=self.name) +# for tips in self.tips: +# tips.to_sql() +# return instance +# +# +# class OmniProcess(BaseOmni): +# +# class_object: ClassVar[Any] = Process +# +# # NOTE: How am I going to figure out relatioinships without getting into recursion issues? +# name: str = Field(default="", description="property") #: Process name +# submission_types: List[OmniSubmissionType] | List[str] = Field(default=[], description="relationship", +# title="SubmissionType") +# equipment_roles: List[OmniEquipmentRole] | List[str] = Field(default=[], description="relationship", +# title="EquipmentRole") +# tip_roles: List[OmniTipRole] | List[str] = Field(default=[], description="relationship", title="TipRole") +# +# def __init__(self, instance_object: Any, **data): +# super().__init__(**data) +# self.instance_object = instance_object +# +# @property +# def dataframe_dict(self) -> dict: +# """ +# Dictionary of gui relevant values. +# +# Returns: +# dict: result +# """ +# submissiontypes = [item if isinstance(item, str) else item.name for item in self.submission_types] +# logger.debug(f"Submission Types: {submissiontypes}") +# equipmentroles = [item if isinstance(item, str) else item.name for item in self.equipment_roles] +# logger.debug(f"Equipment Roles: {equipmentroles}") +# return dict( +# name=self.name, +# submission_types=submissiontypes, +# equipment_roles=equipmentroles +# ) +# +# @field_validator("name", mode="before") +# @classmethod +# def rescue_name_none(cls, value): +# if not value: +# return "" +# return value +# +# def to_sql(self): +# """ +# Convert this object to an instance of its class object. +# """ +# +# instance, new = self.class_object.query_or_create(name=self.name) +# for st in self.submission_types: +# try: +# new_assoc = st.to_sql() +# except AttributeError: +# new_assoc = SubmissionType.query(name=st) +# if new_assoc not in instance.proceduretype: +# instance.proceduretype.append(new_assoc) +# for er in self.equipment_roles: +# try: +# new_assoc = er.to_sql() +# except AttributeError: +# new_assoc = EquipmentRole.query(name=er) +# if new_assoc not in instance.equipmentrole: +# instance.equipmentrole.append(new_assoc) +# for tr in self.tip_roles: +# try: +# new_assoc = tr.to_sql() +# except AttributeError: +# new_assoc = TipRole.query(name=tr) +# if new_assoc not in instance.tiprole: +# instance.tiprole.append(new_assoc) +# return instance +# +# @property +# def list_searchables(self) -> dict: +# """ +# Provides attributes for checking this object against a dictionary. +# +# Returns: +# dict: result +# """ +# return dict(name=self.name) +# +# +# class OmniKitType(BaseOmni): +# +# class_object: ClassVar[Any] = KitType +# +# name: str = Field(default="", description="property") +# kit_submissiontype_associations: List[OmniSubmissionTypeKitTypeAssociation] | List[str] = Field(default=[], description="relationship", title="SubmissionTypeKitTypeAssociation") +# kit_reagentrole_associations: List[OmniKitTypeReagentRoleAssociation] | List[str] = Field(default=[], description="relationship", title="KitTypeReagentRoleAssociation") +# processes: List[OmniProcess] | List[str] = Field(default=[], description="relationship", title="Process") +# +# @field_validator("name", mode="before") +# @classmethod +# def rescue_name_none(cls, value): +# if not value: +# return "" +# return value +# +# def __init__(self, instance_object: Any, **data): +# super().__init__(**data) +# self.instance_object = instance_object +# +# @property +# def dataframe_dict(self) -> dict: +# """ +# Dictionary of gui relevant values. +# +# Returns: +# dict: result +# """ +# return dict( +# name=self.name +# ) +# +# def to_sql(self) -> KitType: +# """ +# Convert this object to an instance of its class object. +# """ +# +# kit, is_new = KitType.query_or_create(name=self.name) +# new_rr = [] +# for rr_assoc in self.kit_reagentrole_associations: +# new_assoc = rr_assoc.to_sql() +# if new_assoc not in new_rr: +# # logger.debug(f"Adding {new_assoc} to kit_reagentrole_associations") +# new_rr.append(new_assoc) +# # logger.debug(f"Setting kit_reagentrole_associations to {pformat([item.__dict__ for item in new_rr])}") +# kit.kit_reagentrole_associations = new_rr +# new_st = [] +# for st_assoc in self.kit_submissiontype_associations: +# new_assoc = st_assoc.to_sql() +# if new_assoc not in new_st: +# new_st.append(new_assoc) +# kit.kit_submissiontype_associations = new_st +# new_processes = [] +# for process in self.processes: +# new_process = process.to_sql() +# if new_process not in new_processes: +# new_processes.append(new_process) +# kit.processes = new_processes +# return kit +# +# +# class OmniOrganization(BaseOmni): +# +# class_object: ClassVar[Any] = Organization +# +# name: str = Field(default="", description="property") +# cost_centre: str = Field(default="", description="property") +# contact: List[str] | List[OmniContact] = Field(default=[], description="relationship", title="Contact") +# +# def __init__(self, instance_object: Any, **data): +# # logger.debug(f"Incoming data: {data}") +# super().__init__(**data) +# self.instance_object = instance_object +# +# @property +# def dataframe_dict(self) -> dict: +# """ +# Dictionary of gui relevant values. +# +# Returns: +# dict: result +# """ +# return dict( +# name=self.name, +# cost_centre=self.cost_centre, +# contacts=self.contact +# ) +# +# +# class OmniContact(BaseOmni): +# +# class_object: ClassVar[Any] = Contact +# +# name: str = Field(default="", description="property") +# email: str = Field(default="", description="property") +# phone: str = Field(default="", description="property") +# +# @property +# def list_searchables(self) -> dict: +# """ +# Provides attributes for checking this object against a dictionary. +# +# Returns: +# dict: result +# """ +# return dict(name=self.name, email=self.email) +# +# def __init__(self, instance_object: Any, **data): +# super().__init__(**data) +# self.instance_object = instance_object +# +# @property +# def dataframe_dict(self) -> dict: +# """ +# Dictionary of gui relevant values. +# +# Returns: +# dict: result +# """ +# return dict( +# name=self.name, +# email=self.email, +# phone=self.phone +# ) +# +# def to_sql(self): +# """ +# Convert this object to an instance of its class object. +# """ +# +# contact, is_new = Contact.query_or_create(name=self.name, email=self.email, phone=self.phone) +# return contact diff --git a/src/submissions/backend/validators/pydant.py b/src/submissions/backend/validators/pydant.py index 491701f..1cea0b5 100644 --- a/src/submissions/backend/validators/pydant.py +++ b/src/submissions/backend/validators/pydant.py @@ -321,7 +321,7 @@ class PydTips(PydBaseClass): SubmissionTipsAssociation: Association between queried tips and procedure """ report = Report() - tips = TipsLot.query(name=self.name, limit=1) + tips = TipsLot.query(lot=self.lot, limit=1) return tips, report @@ -329,7 +329,8 @@ class PydEquipment(PydBaseClass): asset_number: str name: str nickname: str | None - process: List[PydProcess] | PydProcess | None + processes: List[PydProcess] | PydProcess | None + processversion: PydProcess | None equipmentrole: str | PydEquipmentRole | None tips: List[PydTips] | PydTips | None = Field(default=[]) @@ -347,7 +348,7 @@ class PydEquipment(PydBaseClass): value = value.name return value - @field_validator('process', mode='before') + @field_validator('processes', mode='before') @classmethod def process_to_pydantic(cls, value, values): if isinstance(value, GeneratorType): @@ -355,16 +356,25 @@ class PydEquipment(PydBaseClass): value = convert_nans_to_nones(value) if not value: value = [] - if isinstance(value, ProcessVersion): - value = value.to_pydantic(pyd_model_name="PydProcess") - else: - try: - d: Process = next((process for process in value if values.data['name'] in [item.name for item in process.equipment]), None) - if d: - value = d.to_pydantic() - except AttributeError as e: - logger.error(f"Process Validation error due to {e}") - pass + match value: + case ProcessVersion(): + value = value.to_pydantic(pyd_model_name="PydProcess") + case _: + try: + # d: Process = next((process for process in value if values.data['name'] in [item.name for item in process.equipment]), None) + for process in value: + match process: + case Process(): + if values.data['name'] in [item.name for item in process.equipment]: + return process.to_pydantic() + return None + case str(): + return process + # else: + # value = [] + except AttributeError as e: + logger.error(f"Process Validation error due to {e}") + value = [] return value @field_validator('tips', mode='before') @@ -386,7 +396,7 @@ class PydEquipment(PydBaseClass): value = d.to_pydantic() except AttributeError as e: logger.error(f"Process Validation error due to {e}") - pass + value = [] return value @report_result @@ -1347,6 +1357,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): def to_sql(self, new: bool = False): from backend.db.models import RunSampleAssociation, ProcedureSampleAssociation + logger.debug(f"incoming pyd: {pformat([item.__dict__ for item in self.equipment])}") if new: sql = Procedure() else: @@ -1408,6 +1419,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): procedure_rank=sample.procedure_rank) for equipment in self.equipment: equip, _ = equipment.to_sql() + logger.debug(f"Equipment:\n{pformat(equip.__dict__)}") if isinstance(equipment.process, list): equipment.process = equipment.process[0] if isinstance(equipment.tips, list): @@ -1420,10 +1432,13 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): equipmentrole=equip.equipmentrole[0]) process = equipment.process.to_sql() equip_assoc.processversion = process + logger.debug(f"Tips: {type(equipment.tips)}") try: tipslot = equipment.tips.to_sql() + logger.debug(f"Tipslot: {tipslot.__dict__}") except AttributeError: tipslot = None + equip_assoc.tipslot = tipslot return sql, None @@ -1560,6 +1575,13 @@ class PydClientSubmission(PydBaseClass): value = int(value) return value + @field_validator("cost_centre", mode="before") + @classmethod + def str_to_dict(cls, value): + if isinstance(value, str): + value = dict(value=value) + return value + def to_form(self, parent: QWidget, samples: List = [], disable: list | None = None): """ Converts this instance into a frontend.widgets.submission_widget.SubmissionFormWidget diff --git a/src/submissions/frontend/visualizations/concentrations_chart.py b/src/submissions/frontend/visualizations/concentrations_chart.py index bf248e9..d539b1a 100644 --- a/src/submissions/frontend/visualizations/concentrations_chart.py +++ b/src/submissions/frontend/visualizations/concentrations_chart.py @@ -35,7 +35,7 @@ class ConcentrationsChart(CustomFigure): color="positive", color_discrete_map={"positive": "red", "negative": "green", "sample":"orange"} ) except (ValueError, AttributeError) as e: - logger.error(f"Error constructing chart: {e}") + # logger.error(f"Error constructing chart: {e}") scatter = px.scatter() # NOTE: For some reason if data is allowed to sort itself it leads to wrong ordering of x axis. traces = sorted(scatter.data, key=itemgetter("name")) diff --git a/src/submissions/frontend/widgets/app.py b/src/submissions/frontend/widgets/app.py index ad7960f..635681b 100644 --- a/src/submissions/frontend/widgets/app.py +++ b/src/submissions/frontend/widgets/app.py @@ -13,7 +13,7 @@ from PyQt6.QtGui import QAction from pathlib import Path from markdown import markdown from pandas import ExcelWriter -from backend.db.models import Reagent +from backend.db.models import ReagentLot from tools import ( check_if_app, Settings, Report, jinja_template_loading, check_authorization, page_size, is_power_user, under_development @@ -181,7 +181,7 @@ class App(QMainWindow): @check_authorization def edit_reagent(self, *args, **kwargs): - dlg = SearchBox(parent=self, object_type=Reagent, extras=[dict(name='Role', field="reagentrole")]) + dlg = SearchBox(parent=self, object_type=ReagentLot, extras=[dict(name='Role', field="reagentrole")]) dlg.exec() def update_data(self): diff --git a/src/submissions/frontend/widgets/omni_add_edit.py b/src/submissions/frontend/widgets/omni_add_edit.py index bbe1b4c..9940262 100644 --- a/src/submissions/frontend/widgets/omni_add_edit.py +++ b/src/submissions/frontend/widgets/omni_add_edit.py @@ -3,7 +3,7 @@ A widget to handle adding/updating any database object. """ from datetime import date from pprint import pformat -from typing import Any, Tuple +from typing import Any, Tuple, List from pydantic import BaseModel from PyQt6.QtWidgets import ( QLabel, QDialog, QWidget, QLineEdit, QGridLayout, QComboBox, QDialogButtonBox, QDateEdit, QSpinBox, QDoubleSpinBox, @@ -13,6 +13,8 @@ from sqlalchemy import String, TIMESTAMP, INTEGER, FLOAT, JSON, BLOB from sqlalchemy.orm import ColumnProperty import logging from sqlalchemy.orm.relationships import _RelationshipDeclared +from backend.db.models import BaseClass +from backend.validators.pydant import PydBaseClass from tools import Report, report_result logger = logging.getLogger(f"submissions.{__name__}") @@ -20,20 +22,19 @@ logger = logging.getLogger(f"submissions.{__name__}") class AddEdit(QDialog): - def __init__(self, parent, instance: Any | None = None, managers: set = set()): + def __init__(self, parent, instance: Any | None = None, managers: set = set(), disabled: List[str] = []): super().__init__(parent) - # logger.debug(f"Managers: {managers}") + logger.debug(f"Disable = {disabled}") self.instance = instance self.object_type = instance.__class__ self.managers = managers - # logger.debug(f"Managers: {managers}") self.layout = QGridLayout(self) QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel self.buttonBox = QDialogButtonBox(QBtn) self.buttonBox.accepted.connect(self.accept) self.buttonBox.rejected.connect(self.reject) - # logger.debug(f"Fields: {pformat(self.instance.omnigui_instance_dict)}") fields = {k: v for k, v in self.instance.omnigui_instance_dict.items() if "id" not in k} + logger.debug(f"Fields: {pformat(fields)}") # NOTE: Move 'name' to the front try: fields = {'name': fields.pop('name'), **fields} @@ -41,13 +42,13 @@ class AddEdit(QDialog): pass height_counter = 0 for key, field in fields.items(): + disable = key in disabled try: value = getattr(self.instance, key) except AttributeError: value = None try: - logger.debug(f"{key} property: {type(field['class_attr'].property)}") - widget = EditProperty(self, key=key, column_type=field, value=value) + widget = EditProperty(self, key=key, column_type=field, value=value, disable=disable) except AttributeError as e: logger.error(f"Problem setting widget {key}: {e}") continue @@ -55,7 +56,7 @@ class AddEdit(QDialog): self.layout.addWidget(widget, self.layout.rowCount(), 0) height_counter += 1 self.layout.addWidget(self.buttonBox) - self.setWindowTitle(f"Add/Edit {self.object_type.__name__} - Manager: {self.managers}") + self.setWindowTitle(f"Add/Edit {self.object_type.__name__}")# - Manager: {self.managers}") self.setMinimumSize(600, 50 * height_counter) self.setLayout(self.layout) @@ -64,11 +65,8 @@ class AddEdit(QDialog): report = Report() parsed = {result[0].strip(":"): result[1] for result in [item.parse_form() for item in self.findChildren(EditProperty)] if result[0]} - # logger.debug(f"Parsed form: {parsed}") model = self.object_type.pydantic_model - # logger.debug(f"Model type: {model.__name__}") if model.__name__ == "PydElastic": - # logger.debug(f"We have an elastic model.") parsed['instance'] = self.instance # NOTE: Hand-off to pydantic model for validation. # NOTE: Also, why am I not just using the toSQL method here. I could write one for contact. @@ -78,8 +76,9 @@ class AddEdit(QDialog): class EditProperty(QWidget): - def __init__(self, parent: AddEdit, key: str, column_type: Any, value): + def __init__(self, parent: AddEdit, key: str, column_type: Any, value, disable: bool): super().__init__(parent) + logger.debug(f"Widget column type for {key}: {column_type}") self.name = key self.label = QLabel(key.title().replace("_", " ")) self.layout = QGridLayout() @@ -88,6 +87,7 @@ class EditProperty(QWidget): self.property_class = column_type['class_attr'].property.entity.class_ except AttributeError: self.property_class = None + logger.debug(f"Property class: {self.property_class}") try: self.is_list = column_type['class_attr'].property.uselist except AttributeError: @@ -96,23 +96,26 @@ class EditProperty(QWidget): case ColumnProperty(): self.column_property_set(column_type, value=value) case _RelationshipDeclared(): - if not self.property_class.skip_on_edit: + try: + check = self.property_class.skip_on_edit + except AttributeError: + check = False + if not check: self.relationship_property_set(column_type, value=value) else: return case _: logger.error(f"{column_type} not a supported type.") return + self.widget.setDisabled(disable) self.layout.addWidget(self.label, 0, 0, 1, 1) self.layout.addWidget(self.widget, 0, 1, 1, 3) self.setLayout(self.layout) def relationship_property_set(self, relationship, value=None): self.widget = QComboBox() - # logger.debug(self.parent().managers) for manager in self.parent().managers: if self.name in manager.aliases: - # logger.debug(f"Name: {self.name} is in aliases: {manager.aliases}") choices = [manager.name] self.widget.setEnabled(False) break @@ -127,11 +130,17 @@ class EditProperty(QWidget): if isinstance(instance_value, list): instance_value = next((item.name for item in instance_value), None) if instance_value: + match instance_value: + case x if issubclass(instance_value.__class__, BaseClass): + instance_value = instance_value.name + case x if issubclass(instance_value.__class__, PydBaseClass): + instance_value = instance_value.name + case _: + pass choices.insert(0, choices.pop(choices.index(instance_value))) self.widget.addItems(choices) def column_property_set(self, column_property, value=None): - # logger.debug(f"Column Property: {column_property['class_attr'].expression} {column_property}, Value: {value}") match column_property['class_attr'].expression.type: case String(): if value is None: @@ -176,7 +185,6 @@ class EditProperty(QWidget): check = self.widget except AttributeError: return None, None - # match self.widget match check: case QLineEdit(): value = self.widget.text() diff --git a/src/submissions/frontend/widgets/omni_manager_pydant.py b/src/submissions/frontend/widgets/omni_manager_pydant.py index 033fba0..8165e79 100644 --- a/src/submissions/frontend/widgets/omni_manager_pydant.py +++ b/src/submissions/frontend/widgets/omni_manager_pydant.py @@ -1,763 +1,763 @@ -""" -Provides a screen for managing all attributes of a database object. -""" -import json, logging, sys -from json.decoder import JSONDecodeError -from datetime import datetime, timedelta -from pprint import pformat -from typing import Any, List, Literal -from PyQt6.QtCore import QSortFilterProxyModel, Qt, QModelIndex -from PyQt6.QtGui import QAction, QCursor -from PyQt6.QtWidgets import ( - QLabel, QDialog, - QTableView, QWidget, QLineEdit, QGridLayout, QComboBox, QPushButton, QDialogButtonBox, QDateEdit, QMenu, - QDoubleSpinBox, QSpinBox, QCheckBox, QTextEdit, QVBoxLayout, QHBoxLayout -) -from pandas import DataFrame -from backend import db -from tools import check_object_in_manager -from .omni_search import SearchBox -from frontend.widgets.submission_table import pandasModel - -logger = logging.getLogger(f"submissions.{__name__}") - - -class ManagerWindow(QDialog): - """ - Initially this is a window to manage Organization Contacts, but hope to abstract it more later. - """ - - def __init__(self, parent, - extras: List[str], - instance: Any | None = None, - object_type: Any | None = None, - manager: Any | None = None, - add_edit: Literal['add', 'edit'] = 'edit', - **kwargs): - super().__init__(parent) - # NOTE: Should I pass in an instance? - self.instance = instance - # logger.debug(f"Setting instance: {self.instance}") - if not self.instance: - self.class_object = self.original_type = object_type - else: - self.class_object = self.original_type = self.instance.__class__ - self.add_edit = add_edit - if manager is None: - try: - self.manager = self.parent().omni_object - except AttributeError: - self.manager = None - else: - self.manager = manager - # logger.debug(f"Manager: {manager}") - self.extras = extras - self.context = kwargs - self.layout = QGridLayout(self) - QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel - self.buttonBox = QDialogButtonBox(QBtn) - self.buttonBox.accepted.connect(self.accept) - self.buttonBox.rejected.connect(self.reject) - self.setMinimumSize(600, 600) - sub_classes = ["Any"] + [cls.__name__ for cls in self.class_object.__subclasses__()] - if len(sub_classes) > 1: - self.sub_class = QComboBox(self) - self.sub_class.setObjectName("sub_class") - self.sub_class.addItems(sub_classes) - self.sub_class.currentTextChanged.connect(self.update_options) - self.sub_class.setEditable(False) - self.sub_class.setMinimumWidth(self.minimumWidth()) - self.layout.addWidget(self.sub_class, 0, 0) - else: - self.sub_class = None - self.update_instance(initial=True) - if self.add_edit == "edit": - self.options = QComboBox(self) - self.options.setObjectName("options") - self.update_options() - else: - self.update_data() - self.setLayout(self.layout) - self.setWindowTitle(f"Manage {self.class_object.__name__} - Manager: {self.manager}") - - def update_options(self) -> None: - """ - Changes form inputs based on sample type - """ - # logger.debug(f"Instance: {self.instance}") - if self.sub_class: - self.class_object = getattr(db, self.sub_class.currentText()) - # logger.debug(f"From update options, managers: {self.managers}") - try: - query_kwargs = {self.parent().instance.query_alias: self.parent().instance} - except AttributeError as e: - # logger.debug(f"Couldn't set query kwargs due to: {e}") - query_kwargs = {} - # logger.debug(f"Query kwargs: {query_kwargs}") - # logger.debug(f"self.class_object: {self.class_object}") - options = [item.name for item in self.class_object.query(**query_kwargs)] - if self.instance: - try: - inserter = options.pop(options.index(self.instance.name)) - except ValueError: - inserter = self.instance.name - options.insert(0, inserter) - self.options.clear() - self.options.addItems(options) - self.options.setEditable(False) - self.options.setMinimumWidth(self.minimumWidth()) - self.layout.addWidget(self.options, 1, 0, 1, 1) - self.add_button = QPushButton("Add New") - self.layout.addWidget(self.add_button, 1, 1, 1, 1) - self.add_button.clicked.connect(self.add_new) - self.options.currentTextChanged.connect(self.update_instance) - # logger.debug(f"Instance: {self.instance}") - self.update_data() - - def update_instance(self, initial: bool = False) -> None: - """ - Gets the proper instance of this object's class object. - - Args: - initial (bool): Whether this is the initial creation of this object. - - Returns: - None - """ - if self.add_edit == "edit" or initial: - try: - # logger.debug(f"Querying with {self.options.currentText()}") - self.instance = self.class_object.query(name=self.options.currentText(), limit=1) - except AttributeError: - pass - # logger.debug(f"Instance: {self.instance}") - if not self.instance: - logger.warning(f"Instance not found, creating blank instance.") - self.instance = self.class_object() - # logger.debug(f"self.instance: {self.instance}") - if issubclass(self.instance.__class__, db.BaseClass): - self.omni_object = self.instance.to_omni(expand=True) - else: - self.omni_object = self.instance - # logger.debug(f"Created omni_object: {self.omni_object.__dict__}") - self.update_data() - - def update_data(self) -> None: - """ - Performs updating of widgets on first procedure and after options change. - - Returns: - None - """ - # NOTE: Remove all old widgets. - deletes = [item for item in self.findChildren(EditProperty)] + \ - [item for item in self.findChildren(EditRelationship)] + \ - [item for item in self.findChildren(QDialogButtonBox)] - for item in deletes: - item.setParent(None) - logger.debug(f"Self.omni_object: {self.omni_object}") - fields = self.omni_object.__class__.model_fields - for key, info in fields.items(): - # logger.debug(f"Attempting to set {key}, {info} widget") - try: - value = getattr(self.omni_object, key) - except AttributeError: - value = None - # logger.debug(f"Got value {value} for key {key}") - match info.description: - # NOTE: ColumnProperties will be directly edited. - case "property": - # NOTE: field.property.expression.type gives db column type eg. STRING or TIMESTAMP - # logger.debug(f"Creating property widget with value: {value}") - widget = EditProperty(self, key=key, column_type=info, value=value) - # NOTE: RelationshipDeclareds will be given a list of existing related objects. - case "relationship": - # NOTE: field.comparator.class_object.class_ gives the relationship class - widget = EditRelationship(self, key=key, class_object=info.title, value=value) - case _: - continue - if widget: - self.layout.addWidget(widget, self.layout.rowCount(), 0, 1, 2) - # NOTE: Add OK|Cancel to bottom of dialog. - self.layout.addWidget(self.buttonBox, self.layout.rowCount(), 0, 1, 2) - - def parse_form(self) -> Any: - """ - Returns the instance associated with this window. - - Returns: - Any: The instance with updated fields. - """ - # TODO: Need Relationship property here too? - results = [item.parse_form() for item in self.findChildren(EditProperty)] - for result in results: - # logger.debug(f"Incoming property result: {result}") - setattr(self.omni_object, result['field'], result['value']) - # NOTE: Getting 'None' back here. - # logger.debug(f"Set result: {getattr(self.instance, result['field'])}") - results = [item.parse_form() for item in self.findChildren(EditRelationship)] - for result in results: - # logger.debug(f"Incoming relationship result: {result}") - setattr(self.omni_object, result['field'], result['value']) - # logger.debug(f"Set result: {getattr(self.omni_object, result['field'])}") - # logger.debug(f"Instance coming from parsed form: {self.omni_object.__dict__}") - return self.omni_object - - def add_new(self) -> None: - """ - Creates a new instance of this object's class object. - - Returns: - None - """ - new_instance = self.class_object() - self.instance = new_instance - self.update_options() - - -class EditProperty(QWidget): - """ - Class to manage info items of SQL objects. - """ - def __init__(self, parent: ManagerWindow, key: str, column_type: Any, value): - super().__init__(parent) - self.label = QLabel(key.title().replace("_", " ")) - self.layout = QGridLayout() - self.layout.addWidget(self.label, 0, 0, 1, 1) - self.setObjectName(key) - # logger.debug(f"Column type for {key}: {type(column_type.default)}") - match column_type.default: - case str(): - self.widget = QLineEdit(self) - self.widget.setText(value) - case bool(): - if isinstance(column_type.default, bool): - self.widget = QCheckBox() - self.widget.setChecked(value) - else: - if value is None: - value = 0 - self.widget = QSpinBox() - self.widget.setMaximum(1) - self.widget.setValue(value) - case float(): - if not value: - value = 0.0 - self.widget = QDoubleSpinBox() - self.widget.setMaximum(999.99) - self.widget.setValue(value) - case datetime(): - self.widget = QDateEdit(self) - self.widget.setDate(value) - case timedelta(): - self.widget = QSpinBox() - self.widget.setMaximum(9999) - self.widget.setToolTip("This time interval is measured in days.") - self.widget.setValue(value.days) - case dict(): - self.widget = JsonEditButton(parent=self, key=key, value=value) - case bytes(): - self.widget = QLabel("BLOB Under construction") - case _: - self.widget = None - self.layout.addWidget(self.widget, 0, 1, 1, 3) - self.setLayout(self.layout) - - def parse_form(self) -> dict: - """ - Gets values from this EditProperty form. - - Returns: - dict: Dictionary of values. - """ - # logger.debug(f"Parsing widget {self.objectName()}: {type(self.widget)}") - match self.widget: - case QLineEdit(): - value = self.widget.text() - case QDateEdit(): - value = self.widget.date() - case QSpinBox() | QDoubleSpinBox(): - value = self.widget.value() - case QCheckBox(): - value = self.widget.isChecked() - case JsonEditButton(): - value = self.widget.data - case _: - value = None - return dict(field=self.objectName(), value=value) - - -class EditRelationship(QWidget): - - def __init__(self, parent, key: str, class_object: Any, value): - from backend.db import models - super().__init__(parent) - self.class_object = getattr(models, class_object) - # logger.debug(f"Attempt value: {value}") - # logger.debug(f"Class object: {self.class_object}") - self.setParent(parent) - # logger.debug(f"Edit relationship class_object: {self.class_object}") - self.label = QLabel(key.title().replace("_", " ")) - self.setObjectName(key) #: key is the name of the relationship this represents - # logger.debug(f"Checking relationship for {self.parent().class_object}: {key}") - self.relationship = getattr(self.parent().class_object, key) - self.widget = QTableView() - self.add_button = QPushButton("Add New") - self.add_button.clicked.connect(self.add_new) - self.existing_button = QPushButton("Add Existing") - self.existing_button.clicked.connect(self.add_existing) - if not isinstance(value, list): - if value not in [None, ""]: - value = [value] - else: - value = [] - self.data = value - # logger.debug(f"Set data: {self.data}") - # logger.debug(f"Parent manager: {self.parent().manager}") - checked_manager, is_primary = check_object_in_manager(self.parent().manager, self.objectName()) - if checked_manager: - if not self.data: - self.data = [checked_manager] - try: - # logger.debug(f"Relationship {key} uses list: {self.relationship.property.uselist}") - check = not self.relationship.property.uselist and len(self.data) >= 1 - except AttributeError: - check = True - if check: - self.add_button.setEnabled(False) - self.existing_button.setEnabled(False) - if is_primary: - self.widget.setEnabled(False) - else: - self.add_button.setEnabled(True) - self.existing_button.setEnabled(True) - if is_primary: - self.widget.setEnabled(True) - self.layout = QGridLayout() - self.layout.addWidget(self.label, 0, 0, 1, 5) - self.layout.addWidget(self.widget, 1, 0, 1, 8) - self.layout.addWidget(self.add_button, 0, 6, 1, 1, alignment=Qt.AlignmentFlag.AlignRight) - self.layout.addWidget(self.existing_button, 0, 7, 1, 1, alignment=Qt.AlignmentFlag.AlignRight) - self.setLayout(self.layout) - self.set_data() - - def update_buttons(self) -> None: - """ - Enables/disables buttons based on whether property is a list and has data. - - Returns: - None - """ - if not self.relationship.property.uselist and len(self.data) >= 1: - # logger.debug(f"Property {self.relationship} doesn't use list and data is of length: {len(self.data)}") - self.add_button.setEnabled(False) - self.existing_button.setEnabled(False) - else: - self.add_button.setEnabled(True) - self.existing_button.setEnabled(True) - - def parse_row(self, x: QModelIndex) -> None: - """ - Gets instance of class object based on gui row values. - - Args: - x (QModelIndex): Row object. - - Returns: - None - """ - context = {item: x.sibling(x.row(), self.df.columns.get_loc(item)).data() for item in self.df.columns} - # logger.debug(f"Context: {pformat(context)}") - try: - object = self.class_object.query(**context) - except KeyError: - object = None - self.widget.doubleClicked.disconnect() - self.add_new(instance=object) - - def add_new(self, instance: Any = None, add_edit: Literal["add", "edit"] = "add"): - """ - Allows addition or new instance or edit of existing one. - - Args: - instance (Any): instance to be added - add_edit (Literal["add", "edit"]): Whether this will be a new or existing instance. - - Returns: - - """ - if add_edit == "edit": - logger.info(f"\n\nEditing instance: {instance.__dict__}\n\n") - # NOTE: if an existing instance is not being edited, create a new instance - if not instance: - # logger.debug(f"Creating new instance of {self.class_object}") - instance = self.class_object() - # logger.debug(f"Creating manager window for {instance}") - manager = self.parent().manager - # logger.debug(f"Managers going into add new: {managers}") - dlg = ManagerWindow(self.parent(), instance=instance, extras=[], manager=manager, add_edit=add_edit) - if dlg.exec(): - new_instance = dlg.parse_form() - # logger.debug(f"New instance: {pformat(new_instance.__dict__)}") - # NOTE: Somewhere between this and the next logger, I'm losing the uses data. - if add_edit == "add": - # logger.debug("Setting as new object") - self.parent().omni_object.__setattr__(self.objectName(), new_instance) - else: - # logger.debug("Updating dictionary") - obj = getattr(self.parent().omni_object, self.objectName()) - if isinstance(obj, list): - # logger.debug(f"This is a list") - try: - # NOTE: Okay, this will not work for editing, since by definition not all attributes will line up. - # NOTE: Set items to search by in the Omni object itself? - obj = next((item for item in obj if item.check_all_attributes(new_instance.__dict__))) - except StopIteration: - logger.error(f"Couldn't find object in list.") - return - # logger.debug(f"Updating \n{pformat(obj)} with \n{pformat(new_instance.__dict__)}") - obj.__dict__.update(new_instance.__dict__) - # logger.debug(f"Final instance: {pformat(self.parent().omni_object.__dict__)}") - # NOTE: somewhere in the update_data I'm losing changes. - self.parent().update_data() - - def add_existing(self): - """ - Method to add association already existing in the database. - - Returns: - None - """ - dlg = SearchBox(self, object_type=self.class_object, returnable=True, extras=[]) - if dlg.exec(): - rows = dlg.return_selected_rows() - for row in rows: - # logger.debug(f"Querying with {row}") - instance = self.class_object.query(**row) - # NOTE: My custom __setattr__ should take care of any list problems. - if isinstance(instance, list): - instance = instance[0] - self.parent().omni_object.__setattr__(self.objectName(), instance.to_omni()) - self.parent().update_data() - - def set_data(self) -> None: - """ - sets data in model - """ - logger.debug(f"Self.data: {self.data}") - try: - records = [item.dataframe_dict for item in self.data] - except AttributeError as e: - logger.error(e) - records = [] - logger.debug(f"Records: {records}") - self.df = DataFrame.from_records(records) - try: - self.columns_of_interest = [dict(name=item, column=self.df.columns.get_loc(item)) for item in self.extras] - except (KeyError, AttributeError): - self.columns_of_interest = [] - try: - self.df['id'] = self.df['id'].apply(str) - self.df['id'] = self.df['id'].str.zfill(4) - except KeyError as e: - logger.error(f"Could not alter id to string due to KeyError: {e}") - proxy_model = QSortFilterProxyModel() - proxy_model.setSourceModel(pandasModel(self.df)) - self.widget.setModel(proxy_model) - self.widget.resizeColumnsToContents() - self.widget.resizeRowsToContents() - self.widget.setSortingEnabled(True) - self.widget.doubleClicked.connect(self.parse_row) - - def contextMenuEvent(self, event): - """ - Creates actions for right click menu events. - - Args: - event (_type_): the item of interest - """ - if not self.widget.isEnabled(): - logger.warning(f"{self.objectName()} is disabled.") - return - id = self.widget.selectionModel().currentIndex() - # logger.debug(f"Row id: {id.row()}") - # NOTE: the overly complicated {column_name: row_value} dictionary construction - row_data = {self.df.columns[column]: self.widget.model().index(id.row(), column).data() for column in - range(self.widget.model().columnCount())} - # logger.debug(f"Row data: {row_data}") - # logger.debug(f"Attempting to grab {self.objectName()} from {self.parent().omni_object}") - object = getattr(self.parent().omni_object, self.objectName()) - # logger.debug(f"Initial object: {object}") - if isinstance(object, list): - try: - object = next((item for item in object if item.check_all_attributes(attributes=row_data))) - except StopIteration: - logger.warning(f"Failed to find all attributes equal, getting row {id.row()}") - object = object[id.row()] - object.instance_object = object.to_sql() - # logger.debug(f"Object of interest: {pformat(object.__dict__)}") - self.menu = QMenu(self) - try: - remove_action = QAction(f"Remove {object.name}", self) - except AttributeError: - remove_action = QAction(f"Remove object", self) - remove_action.triggered.connect(lambda: self.remove_item(object=object)) - self.menu.addAction(remove_action) - try: - edit_action = QAction(f"Edit {object.name}", self) - except AttributeError: - edit_action = QAction(f"Edit object", self) - edit_action.triggered.connect( - lambda: self.add_new(instance=object.instance_object, add_edit="edit")) - self.menu.addAction(edit_action) - self.menu.popup(QCursor.pos()) - - def remove_item(self, object): - """ - Remove a relationship from a list. - - Args: - object (Any): Object to be removed. - - Returns: - None - """ - # logger.debug(f"Attempting to remove {object} from {self.parent().instance.__dict__}") - editor = getattr(self.parent().omni_object, self.objectName().lower()) - # logger.debug(f"Editor: {editor}") - try: - # logger.debug(f"Using remove technique") - editor.remove(object) - except AttributeError as e: - logger.error(f"Remove failed using set to None for {self.objectName().lower()}.") - setattr(self.parent().omni_object, self.objectName().lower(), None) - except ValueError as e: - logger.error(f"Remove failed for {self.objectName().lower()} due to {e}.") - # logger.debug(f"Setting {self.objectName()} to {editor}") - setattr(self.parent().omni_object, self.objectName().lower(), editor) - # logger.debug(f"After set: {getattr(self.parent().omni_object, self.objectName().lower())}") - self.set_data() - self.update_buttons() - - def parse_form(self) -> dict: - """ - Gets values from this EditRelationship form. - - Returns: - dict: Dictionary of values. - """ - # logger.debug(f"Returning parsed form data from {self.objectName()}: {self.data}") - try: - check = self.relationship.property.uselist - except AttributeError: - check = False - if check and isinstance(self.data, list): - try: - output_data = self.data[0] - except IndexError: - output_data = [] - else: - output_data = self.data - return dict(field=self.objectName(), value=output_data) - - -class JsonEditButton(QWidget): - - def __init__(self, parent, key: str, value: str = ""): - super().__init__(parent) - # logger.debug(f"Setting jsonedit data to: {value}") - self.data = value - self.setParent(parent) - self.setObjectName(key) - self.addButton = QPushButton("Add Entry", parent=self) - self.addButton.clicked.connect(self.add_to_json) - self.viewButton = QPushButton("View >>>", parent=self) - self.viewButton.clicked.connect(self.toggle_textedit) - self.layout = QGridLayout() - self.layout.addWidget(self.addButton, 0, 0) - self.layout.addWidget(self.viewButton, 0, 1) - self.setLayout(self.layout) - self.edit_box = LargeTextEdit(parent=self, key=key) - self.parent().parent().layout.addWidget(self.edit_box, 1, self.parent().parent().layout.columnCount(), - self.parent().parent().layout.rowCount() - 1, 1) - self.edit_box.setVisible(False) - self.edit_box.widget.textChanged.connect(self.set_json_to_text) - - def set_json_to_text(self): - """ - Sets this object's data to text. - - Returns: - None - """ - # logger.debug(self.edit_box.widget.toPlainText()) - text = self.edit_box.widget.toPlainText() - try: - jsoner = json.loads(text) - except JSONDecodeError: - jsoner = None - if jsoner: - self.data = jsoner - - def add_to_json(self): - """ - Sets data to jsonedit text. - - Returns: - None - """ - jsonedit = JsonEditScreen(parent=self, parameter=self.objectName()) - if jsonedit.exec(): - data = jsonedit.parse_form() - # logger.debug(f"Data: {pformat(data)}") - self.data = data - - def toggle_textedit(self): - """ - Shows/hides text box. - - Returns: - None - """ - self.edit_box.setVisible(not self.edit_box.isVisible()) - # logger.debug(f"Data: {data}") - data = json.dumps(self.data, indent=4) - self.edit_box.widget.setText(data) - - -class JsonEditScreen(QDialog): - - def __init__(self, parent, parameter: str): - super().__init__(parent) - self.class_obj = parent.parent().parent().class_object - self.layout = QGridLayout() - # logger.debug(f"Parameter: {parameter}") - self.setWindowTitle(parameter) - try: - self.json_field = getattr(self.class_obj, f"{parameter}_json_edit_fields") - except AttributeError: - try: - self.json_field = self.class_obj.json_edit_fields - except AttributeError: - logger.error(f"No json fields to edit.") - return - match self.json_field: - case dict(): - for key, value in self.json_field.items(): - # logger.debug(f"Key: {key}, Value: {value}") - row = self.layout.rowCount() - self.layout.addWidget(QLabel(key), row, 0) - match value: - case "int": - self.widget = QSpinBox() - case "str": - self.widget = QLineEdit() - case dict(): - self.widget = DictionaryJsonSubEdit(parent=self, key=key, dic=value) - case _: - continue - self.widget.setObjectName(key) - self.layout.addWidget(self.widget, row, 1) - QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel - self.buttonBox = QDialogButtonBox(QBtn) - self.buttonBox.accepted.connect(self.accept) - self.buttonBox.rejected.connect(self.reject) - self.layout.addWidget(self.buttonBox, self.layout.rowCount(), 0, 1, 2) - self.setLayout(self.layout) - - def parse_form(self) -> list: - """ - Gets values from this Jsonedit form. - - Returns: - list: List of values. - """ - widgets = [item for item in self.findChildren(QWidget) if item.objectName() in self.json_field.keys()] - # logger.debug(f"Widgets: {widgets}") - # logger.debug(type(self.json_field)) - if isinstance(self.json_field, dict): - output = {} - elif isinstance(self.json_field, list): - output = [] - else: - raise ValueError(f"Inappropriate data type: {type(self.json_field)}") - for widget in widgets: - # logger.debug(f"JsonEditScreen Widget: {widget}") - key = widget.objectName() - match widget: - case QSpinBox(): - value = widget.value() - case QLineEdit(): - value = widget.text() - case DictionaryJsonSubEdit(): - value = widget.parse_form() - case _: - continue - if isinstance(self.json_field, dict): - output[key] = value - elif isinstance(self.json_field, list): - if isinstance(value, list): - output += value - else: - output.append(value) - else: - raise ValueError(f"Inappropriate data type: {type(self.json_field)}") - return output - - -class DictionaryJsonSubEdit(QWidget): - - def __init__(self, parent, key, dic: dict): - super().__init__(parent) - self.layout = QHBoxLayout() - self.setObjectName(key) - self.data = dic - for key, value in self.data.items(): - self.layout.addWidget(QLabel(key)) - match value: - case "int": - self.widget = QSpinBox() - case "str": - self.widget = QLineEdit() - case dict(): - self.widget = DictionaryJsonSubEdit(parent, key=key, dic=value) - self.widget.setObjectName(key) - self.layout.addWidget(self.widget) - self.setLayout(self.layout) - - def parse_form(self) -> dict: - """ - Gets values from this Jsonedit form. - - Returns: - list: List of values. - """ - widgets = [item for item in self.findChildren(QWidget) if item.objectName() in self.data.keys()] - # logger.debug(f"Widgets: {widgets}") - output = {} - for widget in widgets: - # logger.debug(f"DictionaryJsonSubEdit Widget: {widget}") - key = widget.objectName() - match widget: - case QSpinBox(): - value = widget.value() - case QLineEdit(): - value = widget.text() - case DictionaryJsonSubEdit(): - value = widget.parse_form() - case _: - continue - output[key] = value - return output - - -class LargeTextEdit(QWidget): - - def __init__(self, parent, key: str): - super().__init__(parent) - self.setParent(parent) - self.setObjectName(key) - self.widget = QTextEdit() - self.layout = QVBoxLayout() - self.layout.addWidget(self.widget) - self.setLayout(self.layout) +# """ +# Provides a screen for managing all attributes of a database object. +# """ +# import json, logging, sys +# from json.decoder import JSONDecodeError +# from datetime import datetime, timedelta +# from pprint import pformat +# from typing import Any, List, Literal +# from PyQt6.QtCore import QSortFilterProxyModel, Qt, QModelIndex +# from PyQt6.QtGui import QAction, QCursor +# from PyQt6.QtWidgets import ( +# QLabel, QDialog, +# QTableView, QWidget, QLineEdit, QGridLayout, QComboBox, QPushButton, QDialogButtonBox, QDateEdit, QMenu, +# QDoubleSpinBox, QSpinBox, QCheckBox, QTextEdit, QVBoxLayout, QHBoxLayout +# ) +# from pandas import DataFrame +# from backend import db +# from tools import check_object_in_manager +# from .omni_search import SearchBox +# from frontend.widgets.submission_table import pandasModel +# +# logger = logging.getLogger(f"submissions.{__name__}") +# +# +# class ManagerWindow(QDialog): +# """ +# Initially this is a window to manage Organization Contacts, but hope to abstract it more later. +# """ +# +# def __init__(self, parent, +# extras: List[str], +# instance: Any | None = None, +# object_type: Any | None = None, +# manager: Any | None = None, +# add_edit: Literal['add', 'edit'] = 'edit', +# **kwargs): +# super().__init__(parent) +# # NOTE: Should I pass in an instance? +# self.instance = instance +# # logger.debug(f"Setting instance: {self.instance}") +# if not self.instance: +# self.class_object = self.original_type = object_type +# else: +# self.class_object = self.original_type = self.instance.__class__ +# self.add_edit = add_edit +# if manager is None: +# try: +# self.manager = self.parent().omni_object +# except AttributeError: +# self.manager = None +# else: +# self.manager = manager +# # logger.debug(f"Manager: {manager}") +# self.extras = extras +# self.context = kwargs +# self.layout = QGridLayout(self) +# QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel +# self.buttonBox = QDialogButtonBox(QBtn) +# self.buttonBox.accepted.connect(self.accept) +# self.buttonBox.rejected.connect(self.reject) +# self.setMinimumSize(600, 600) +# sub_classes = ["Any"] + [cls.__name__ for cls in self.class_object.__subclasses__()] +# if len(sub_classes) > 1: +# self.sub_class = QComboBox(self) +# self.sub_class.setObjectName("sub_class") +# self.sub_class.addItems(sub_classes) +# self.sub_class.currentTextChanged.connect(self.update_options) +# self.sub_class.setEditable(False) +# self.sub_class.setMinimumWidth(self.minimumWidth()) +# self.layout.addWidget(self.sub_class, 0, 0) +# else: +# self.sub_class = None +# self.update_instance(initial=True) +# if self.add_edit == "edit": +# self.options = QComboBox(self) +# self.options.setObjectName("options") +# self.update_options() +# else: +# self.update_data() +# self.setLayout(self.layout) +# self.setWindowTitle(f"Manage {self.class_object.__name__} - Manager: {self.manager}") +# +# def update_options(self) -> None: +# """ +# Changes form inputs based on sample type +# """ +# # logger.debug(f"Instance: {self.instance}") +# if self.sub_class: +# self.class_object = getattr(db, self.sub_class.currentText()) +# # logger.debug(f"From update options, managers: {self.managers}") +# try: +# query_kwargs = {self.parent().instance.query_alias: self.parent().instance} +# except AttributeError as e: +# # logger.debug(f"Couldn't set query kwargs due to: {e}") +# query_kwargs = {} +# # logger.debug(f"Query kwargs: {query_kwargs}") +# # logger.debug(f"self.class_object: {self.class_object}") +# options = [item.name for item in self.class_object.query(**query_kwargs)] +# if self.instance: +# try: +# inserter = options.pop(options.index(self.instance.name)) +# except ValueError: +# inserter = self.instance.name +# options.insert(0, inserter) +# self.options.clear() +# self.options.addItems(options) +# self.options.setEditable(False) +# self.options.setMinimumWidth(self.minimumWidth()) +# self.layout.addWidget(self.options, 1, 0, 1, 1) +# self.add_button = QPushButton("Add New") +# self.layout.addWidget(self.add_button, 1, 1, 1, 1) +# self.add_button.clicked.connect(self.add_new) +# self.options.currentTextChanged.connect(self.update_instance) +# # logger.debug(f"Instance: {self.instance}") +# self.update_data() +# +# def update_instance(self, initial: bool = False) -> None: +# """ +# Gets the proper instance of this object's class object. +# +# Args: +# initial (bool): Whether this is the initial creation of this object. +# +# Returns: +# None +# """ +# if self.add_edit == "edit" or initial: +# try: +# # logger.debug(f"Querying with {self.options.currentText()}") +# self.instance = self.class_object.query(name=self.options.currentText(), limit=1) +# except AttributeError: +# pass +# # logger.debug(f"Instance: {self.instance}") +# if not self.instance: +# logger.warning(f"Instance not found, creating blank instance.") +# self.instance = self.class_object() +# # logger.debug(f"self.instance: {self.instance}") +# if issubclass(self.instance.__class__, db.BaseClass): +# self.omni_object = self.instance.to_omni(expand=True) +# else: +# self.omni_object = self.instance +# # logger.debug(f"Created omni_object: {self.omni_object.__dict__}") +# self.update_data() +# +# def update_data(self) -> None: +# """ +# Performs updating of widgets on first procedure and after options change. +# +# Returns: +# None +# """ +# # NOTE: Remove all old widgets. +# deletes = [item for item in self.findChildren(EditProperty)] + \ +# [item for item in self.findChildren(EditRelationship)] + \ +# [item for item in self.findChildren(QDialogButtonBox)] +# for item in deletes: +# item.setParent(None) +# logger.debug(f"Self.omni_object: {self.omni_object}") +# fields = self.omni_object.__class__.model_fields +# for key, info in fields.items(): +# # logger.debug(f"Attempting to set {key}, {info} widget") +# try: +# value = getattr(self.omni_object, key) +# except AttributeError: +# value = None +# # logger.debug(f"Got value {value} for key {key}") +# match info.description: +# # NOTE: ColumnProperties will be directly edited. +# case "property": +# # NOTE: field.property.expression.type gives db column type eg. STRING or TIMESTAMP +# # logger.debug(f"Creating property widget with value: {value}") +# widget = EditProperty(self, key=key, column_type=info, value=value) +# # NOTE: RelationshipDeclareds will be given a list of existing related objects. +# case "relationship": +# # NOTE: field.comparator.class_object.class_ gives the relationship class +# widget = EditRelationship(self, key=key, class_object=info.title, value=value) +# case _: +# continue +# if widget: +# self.layout.addWidget(widget, self.layout.rowCount(), 0, 1, 2) +# # NOTE: Add OK|Cancel to bottom of dialog. +# self.layout.addWidget(self.buttonBox, self.layout.rowCount(), 0, 1, 2) +# +# def parse_form(self) -> Any: +# """ +# Returns the instance associated with this window. +# +# Returns: +# Any: The instance with updated fields. +# """ +# # TODO: Need Relationship property here too? +# results = [item.parse_form() for item in self.findChildren(EditProperty)] +# for result in results: +# # logger.debug(f"Incoming property result: {result}") +# setattr(self.omni_object, result['field'], result['value']) +# # NOTE: Getting 'None' back here. +# # logger.debug(f"Set result: {getattr(self.instance, result['field'])}") +# results = [item.parse_form() for item in self.findChildren(EditRelationship)] +# for result in results: +# # logger.debug(f"Incoming relationship result: {result}") +# setattr(self.omni_object, result['field'], result['value']) +# # logger.debug(f"Set result: {getattr(self.omni_object, result['field'])}") +# # logger.debug(f"Instance coming from parsed form: {self.omni_object.__dict__}") +# return self.omni_object +# +# def add_new(self) -> None: +# """ +# Creates a new instance of this object's class object. +# +# Returns: +# None +# """ +# new_instance = self.class_object() +# self.instance = new_instance +# self.update_options() +# +# +# class EditProperty(QWidget): +# """ +# Class to manage info items of SQL objects. +# """ +# def __init__(self, parent: ManagerWindow, key: str, column_type: Any, value): +# super().__init__(parent) +# self.label = QLabel(key.title().replace("_", " ")) +# self.layout = QGridLayout() +# self.layout.addWidget(self.label, 0, 0, 1, 1) +# self.setObjectName(key) +# # logger.debug(f"Column type for {key}: {type(column_type.default)}") +# match column_type.default: +# case str(): +# self.widget = QLineEdit(self) +# self.widget.setText(value) +# case bool(): +# if isinstance(column_type.default, bool): +# self.widget = QCheckBox() +# self.widget.setChecked(value) +# else: +# if value is None: +# value = 0 +# self.widget = QSpinBox() +# self.widget.setMaximum(1) +# self.widget.setValue(value) +# case float(): +# if not value: +# value = 0.0 +# self.widget = QDoubleSpinBox() +# self.widget.setMaximum(999.99) +# self.widget.setValue(value) +# case datetime(): +# self.widget = QDateEdit(self) +# self.widget.setDate(value) +# case timedelta(): +# self.widget = QSpinBox() +# self.widget.setMaximum(9999) +# self.widget.setToolTip("This time interval is measured in days.") +# self.widget.setValue(value.days) +# case dict(): +# self.widget = JsonEditButton(parent=self, key=key, value=value) +# case bytes(): +# self.widget = QLabel("BLOB Under construction") +# case _: +# self.widget = None +# self.layout.addWidget(self.widget, 0, 1, 1, 3) +# self.setLayout(self.layout) +# +# def parse_form(self) -> dict: +# """ +# Gets values from this EditProperty form. +# +# Returns: +# dict: Dictionary of values. +# """ +# # logger.debug(f"Parsing widget {self.objectName()}: {type(self.widget)}") +# match self.widget: +# case QLineEdit(): +# value = self.widget.text() +# case QDateEdit(): +# value = self.widget.date() +# case QSpinBox() | QDoubleSpinBox(): +# value = self.widget.value() +# case QCheckBox(): +# value = self.widget.isChecked() +# case JsonEditButton(): +# value = self.widget.data +# case _: +# value = None +# return dict(field=self.objectName(), value=value) +# +# +# class EditRelationship(QWidget): +# +# def __init__(self, parent, key: str, class_object: Any, value): +# from backend.db import models +# super().__init__(parent) +# self.class_object = getattr(models, class_object) +# # logger.debug(f"Attempt value: {value}") +# # logger.debug(f"Class object: {self.class_object}") +# self.setParent(parent) +# # logger.debug(f"Edit relationship class_object: {self.class_object}") +# self.label = QLabel(key.title().replace("_", " ")) +# self.setObjectName(key) #: key is the name of the relationship this represents +# # logger.debug(f"Checking relationship for {self.parent().class_object}: {key}") +# self.relationship = getattr(self.parent().class_object, key) +# self.widget = QTableView() +# self.add_button = QPushButton("Add New") +# self.add_button.clicked.connect(self.add_new) +# self.existing_button = QPushButton("Add Existing") +# self.existing_button.clicked.connect(self.add_existing) +# if not isinstance(value, list): +# if value not in [None, ""]: +# value = [value] +# else: +# value = [] +# self.data = value +# # logger.debug(f"Set data: {self.data}") +# # logger.debug(f"Parent manager: {self.parent().manager}") +# checked_manager, is_primary = check_object_in_manager(self.parent().manager, self.objectName()) +# if checked_manager: +# if not self.data: +# self.data = [checked_manager] +# try: +# # logger.debug(f"Relationship {key} uses list: {self.relationship.property.uselist}") +# check = not self.relationship.property.uselist and len(self.data) >= 1 +# except AttributeError: +# check = True +# if check: +# self.add_button.setEnabled(False) +# self.existing_button.setEnabled(False) +# if is_primary: +# self.widget.setEnabled(False) +# else: +# self.add_button.setEnabled(True) +# self.existing_button.setEnabled(True) +# if is_primary: +# self.widget.setEnabled(True) +# self.layout = QGridLayout() +# self.layout.addWidget(self.label, 0, 0, 1, 5) +# self.layout.addWidget(self.widget, 1, 0, 1, 8) +# self.layout.addWidget(self.add_button, 0, 6, 1, 1, alignment=Qt.AlignmentFlag.AlignRight) +# self.layout.addWidget(self.existing_button, 0, 7, 1, 1, alignment=Qt.AlignmentFlag.AlignRight) +# self.setLayout(self.layout) +# self.set_data() +# +# def update_buttons(self) -> None: +# """ +# Enables/disables buttons based on whether property is a list and has data. +# +# Returns: +# None +# """ +# if not self.relationship.property.uselist and len(self.data) >= 1: +# # logger.debug(f"Property {self.relationship} doesn't use list and data is of length: {len(self.data)}") +# self.add_button.setEnabled(False) +# self.existing_button.setEnabled(False) +# else: +# self.add_button.setEnabled(True) +# self.existing_button.setEnabled(True) +# +# def parse_row(self, x: QModelIndex) -> None: +# """ +# Gets instance of class object based on gui row values. +# +# Args: +# x (QModelIndex): Row object. +# +# Returns: +# None +# """ +# context = {item: x.sibling(x.row(), self.df.columns.get_loc(item)).data() for item in self.df.columns} +# # logger.debug(f"Context: {pformat(context)}") +# try: +# object = self.class_object.query(**context) +# except KeyError: +# object = None +# self.widget.doubleClicked.disconnect() +# self.add_new(instance=object) +# +# def add_new(self, instance: Any = None, add_edit: Literal["add", "edit"] = "add"): +# """ +# Allows addition or new instance or edit of existing one. +# +# Args: +# instance (Any): instance to be added +# add_edit (Literal["add", "edit"]): Whether this will be a new or existing instance. +# +# Returns: +# +# """ +# if add_edit == "edit": +# logger.info(f"\n\nEditing instance: {instance.__dict__}\n\n") +# # NOTE: if an existing instance is not being edited, create a new instance +# if not instance: +# # logger.debug(f"Creating new instance of {self.class_object}") +# instance = self.class_object() +# # logger.debug(f"Creating manager window for {instance}") +# manager = self.parent().manager +# # logger.debug(f"Managers going into add new: {managers}") +# dlg = ManagerWindow(self.parent(), instance=instance, extras=[], manager=manager, add_edit=add_edit) +# if dlg.exec(): +# new_instance = dlg.parse_form() +# # logger.debug(f"New instance: {pformat(new_instance.__dict__)}") +# # NOTE: Somewhere between this and the next logger, I'm losing the uses data. +# if add_edit == "add": +# # logger.debug("Setting as new object") +# self.parent().omni_object.__setattr__(self.objectName(), new_instance) +# else: +# # logger.debug("Updating dictionary") +# obj = getattr(self.parent().omni_object, self.objectName()) +# if isinstance(obj, list): +# # logger.debug(f"This is a list") +# try: +# # NOTE: Okay, this will not work for editing, since by definition not all attributes will line up. +# # NOTE: Set items to search by in the Omni object itself? +# obj = next((item for item in obj if item.check_all_attributes(new_instance.__dict__))) +# except StopIteration: +# logger.error(f"Couldn't find object in list.") +# return +# # logger.debug(f"Updating \n{pformat(obj)} with \n{pformat(new_instance.__dict__)}") +# obj.__dict__.update(new_instance.__dict__) +# # logger.debug(f"Final instance: {pformat(self.parent().omni_object.__dict__)}") +# # NOTE: somewhere in the update_data I'm losing changes. +# self.parent().update_data() +# +# def add_existing(self): +# """ +# Method to add association already existing in the database. +# +# Returns: +# None +# """ +# dlg = SearchBox(self, object_type=self.class_object, returnable=True, extras=[]) +# if dlg.exec(): +# rows = dlg.return_selected_rows() +# for row in rows: +# # logger.debug(f"Querying with {row}") +# instance = self.class_object.query(**row) +# # NOTE: My custom __setattr__ should take care of any list problems. +# if isinstance(instance, list): +# instance = instance[0] +# self.parent().omni_object.__setattr__(self.objectName(), instance.to_omni()) +# self.parent().update_data() +# +# def set_data(self) -> None: +# """ +# sets data in model +# """ +# logger.debug(f"Self.data: {self.data}") +# try: +# records = [item.dataframe_dict for item in self.data] +# except AttributeError as e: +# logger.error(e) +# records = [] +# logger.debug(f"Records: {records}") +# self.df = DataFrame.from_records(records) +# try: +# self.columns_of_interest = [dict(name=item, column=self.df.columns.get_loc(item)) for item in self.extras] +# except (KeyError, AttributeError): +# self.columns_of_interest = [] +# try: +# self.df['id'] = self.df['id'].apply(str) +# self.df['id'] = self.df['id'].str.zfill(4) +# except KeyError as e: +# logger.error(f"Could not alter id to string due to KeyError: {e}") +# proxy_model = QSortFilterProxyModel() +# proxy_model.setSourceModel(pandasModel(self.df)) +# self.widget.setModel(proxy_model) +# self.widget.resizeColumnsToContents() +# self.widget.resizeRowsToContents() +# self.widget.setSortingEnabled(True) +# self.widget.doubleClicked.connect(self.parse_row) +# +# def contextMenuEvent(self, event): +# """ +# Creates actions for right click menu events. +# +# Args: +# event (_type_): the item of interest +# """ +# if not self.widget.isEnabled(): +# logger.warning(f"{self.objectName()} is disabled.") +# return +# id = self.widget.selectionModel().currentIndex() +# # logger.debug(f"Row id: {id.row()}") +# # NOTE: the overly complicated {column_name: row_value} dictionary construction +# row_data = {self.df.columns[column]: self.widget.model().index(id.row(), column).data() for column in +# range(self.widget.model().columnCount())} +# # logger.debug(f"Row data: {row_data}") +# # logger.debug(f"Attempting to grab {self.objectName()} from {self.parent().omni_object}") +# object = getattr(self.parent().omni_object, self.objectName()) +# # logger.debug(f"Initial object: {object}") +# if isinstance(object, list): +# try: +# object = next((item for item in object if item.check_all_attributes(attributes=row_data))) +# except StopIteration: +# logger.warning(f"Failed to find all attributes equal, getting row {id.row()}") +# object = object[id.row()] +# object.instance_object = object.to_sql() +# # logger.debug(f"Object of interest: {pformat(object.__dict__)}") +# self.menu = QMenu(self) +# try: +# remove_action = QAction(f"Remove {object.name}", self) +# except AttributeError: +# remove_action = QAction(f"Remove object", self) +# remove_action.triggered.connect(lambda: self.remove_item(object=object)) +# self.menu.addAction(remove_action) +# try: +# edit_action = QAction(f"Edit {object.name}", self) +# except AttributeError: +# edit_action = QAction(f"Edit object", self) +# edit_action.triggered.connect( +# lambda: self.add_new(instance=object.instance_object, add_edit="edit")) +# self.menu.addAction(edit_action) +# self.menu.popup(QCursor.pos()) +# +# def remove_item(self, object): +# """ +# Remove a relationship from a list. +# +# Args: +# object (Any): Object to be removed. +# +# Returns: +# None +# """ +# # logger.debug(f"Attempting to remove {object} from {self.parent().instance.__dict__}") +# editor = getattr(self.parent().omni_object, self.objectName().lower()) +# # logger.debug(f"Editor: {editor}") +# try: +# # logger.debug(f"Using remove technique") +# editor.remove(object) +# except AttributeError as e: +# logger.error(f"Remove failed using set to None for {self.objectName().lower()}.") +# setattr(self.parent().omni_object, self.objectName().lower(), None) +# except ValueError as e: +# logger.error(f"Remove failed for {self.objectName().lower()} due to {e}.") +# # logger.debug(f"Setting {self.objectName()} to {editor}") +# setattr(self.parent().omni_object, self.objectName().lower(), editor) +# # logger.debug(f"After set: {getattr(self.parent().omni_object, self.objectName().lower())}") +# self.set_data() +# self.update_buttons() +# +# def parse_form(self) -> dict: +# """ +# Gets values from this EditRelationship form. +# +# Returns: +# dict: Dictionary of values. +# """ +# # logger.debug(f"Returning parsed form data from {self.objectName()}: {self.data}") +# try: +# check = self.relationship.property.uselist +# except AttributeError: +# check = False +# if check and isinstance(self.data, list): +# try: +# output_data = self.data[0] +# except IndexError: +# output_data = [] +# else: +# output_data = self.data +# return dict(field=self.objectName(), value=output_data) +# +# +# class JsonEditButton(QWidget): +# +# def __init__(self, parent, key: str, value: str = ""): +# super().__init__(parent) +# # logger.debug(f"Setting jsonedit data to: {value}") +# self.data = value +# self.setParent(parent) +# self.setObjectName(key) +# self.addButton = QPushButton("Add Entry", parent=self) +# self.addButton.clicked.connect(self.add_to_json) +# self.viewButton = QPushButton("View >>>", parent=self) +# self.viewButton.clicked.connect(self.toggle_textedit) +# self.layout = QGridLayout() +# self.layout.addWidget(self.addButton, 0, 0) +# self.layout.addWidget(self.viewButton, 0, 1) +# self.setLayout(self.layout) +# self.edit_box = LargeTextEdit(parent=self, key=key) +# self.parent().parent().layout.addWidget(self.edit_box, 1, self.parent().parent().layout.columnCount(), +# self.parent().parent().layout.rowCount() - 1, 1) +# self.edit_box.setVisible(False) +# self.edit_box.widget.textChanged.connect(self.set_json_to_text) +# +# def set_json_to_text(self): +# """ +# Sets this object's data to text. +# +# Returns: +# None +# """ +# # logger.debug(self.edit_box.widget.toPlainText()) +# text = self.edit_box.widget.toPlainText() +# try: +# jsoner = json.loads(text) +# except JSONDecodeError: +# jsoner = None +# if jsoner: +# self.data = jsoner +# +# def add_to_json(self): +# """ +# Sets data to jsonedit text. +# +# Returns: +# None +# """ +# jsonedit = JsonEditScreen(parent=self, parameter=self.objectName()) +# if jsonedit.exec(): +# data = jsonedit.parse_form() +# # logger.debug(f"Data: {pformat(data)}") +# self.data = data +# +# def toggle_textedit(self): +# """ +# Shows/hides text box. +# +# Returns: +# None +# """ +# self.edit_box.setVisible(not self.edit_box.isVisible()) +# # logger.debug(f"Data: {data}") +# data = json.dumps(self.data, indent=4) +# self.edit_box.widget.setText(data) +# +# +# class JsonEditScreen(QDialog): +# +# def __init__(self, parent, parameter: str): +# super().__init__(parent) +# self.class_obj = parent.parent().parent().class_object +# self.layout = QGridLayout() +# # logger.debug(f"Parameter: {parameter}") +# self.setWindowTitle(parameter) +# try: +# self.json_field = getattr(self.class_obj, f"{parameter}_json_edit_fields") +# except AttributeError: +# try: +# self.json_field = self.class_obj.json_edit_fields +# except AttributeError: +# logger.error(f"No json fields to edit.") +# return +# match self.json_field: +# case dict(): +# for key, value in self.json_field.items(): +# # logger.debug(f"Key: {key}, Value: {value}") +# row = self.layout.rowCount() +# self.layout.addWidget(QLabel(key), row, 0) +# match value: +# case "int": +# self.widget = QSpinBox() +# case "str": +# self.widget = QLineEdit() +# case dict(): +# self.widget = DictionaryJsonSubEdit(parent=self, key=key, dic=value) +# case _: +# continue +# self.widget.setObjectName(key) +# self.layout.addWidget(self.widget, row, 1) +# QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel +# self.buttonBox = QDialogButtonBox(QBtn) +# self.buttonBox.accepted.connect(self.accept) +# self.buttonBox.rejected.connect(self.reject) +# self.layout.addWidget(self.buttonBox, self.layout.rowCount(), 0, 1, 2) +# self.setLayout(self.layout) +# +# def parse_form(self) -> list: +# """ +# Gets values from this Jsonedit form. +# +# Returns: +# list: List of values. +# """ +# widgets = [item for item in self.findChildren(QWidget) if item.objectName() in self.json_field.keys()] +# # logger.debug(f"Widgets: {widgets}") +# # logger.debug(type(self.json_field)) +# if isinstance(self.json_field, dict): +# output = {} +# elif isinstance(self.json_field, list): +# output = [] +# else: +# raise ValueError(f"Inappropriate data type: {type(self.json_field)}") +# for widget in widgets: +# # logger.debug(f"JsonEditScreen Widget: {widget}") +# key = widget.objectName() +# match widget: +# case QSpinBox(): +# value = widget.value() +# case QLineEdit(): +# value = widget.text() +# case DictionaryJsonSubEdit(): +# value = widget.parse_form() +# case _: +# continue +# if isinstance(self.json_field, dict): +# output[key] = value +# elif isinstance(self.json_field, list): +# if isinstance(value, list): +# output += value +# else: +# output.append(value) +# else: +# raise ValueError(f"Inappropriate data type: {type(self.json_field)}") +# return output +# +# +# class DictionaryJsonSubEdit(QWidget): +# +# def __init__(self, parent, key, dic: dict): +# super().__init__(parent) +# self.layout = QHBoxLayout() +# self.setObjectName(key) +# self.data = dic +# for key, value in self.data.items(): +# self.layout.addWidget(QLabel(key)) +# match value: +# case "int": +# self.widget = QSpinBox() +# case "str": +# self.widget = QLineEdit() +# case dict(): +# self.widget = DictionaryJsonSubEdit(parent, key=key, dic=value) +# self.widget.setObjectName(key) +# self.layout.addWidget(self.widget) +# self.setLayout(self.layout) +# +# def parse_form(self) -> dict: +# """ +# Gets values from this Jsonedit form. +# +# Returns: +# list: List of values. +# """ +# widgets = [item for item in self.findChildren(QWidget) if item.objectName() in self.data.keys()] +# # logger.debug(f"Widgets: {widgets}") +# output = {} +# for widget in widgets: +# # logger.debug(f"DictionaryJsonSubEdit Widget: {widget}") +# key = widget.objectName() +# match widget: +# case QSpinBox(): +# value = widget.value() +# case QLineEdit(): +# value = widget.text() +# case DictionaryJsonSubEdit(): +# value = widget.parse_form() +# case _: +# continue +# output[key] = value +# return output +# +# +# class LargeTextEdit(QWidget): +# +# def __init__(self, parent, key: str): +# super().__init__(parent) +# self.setParent(parent) +# self.setObjectName(key) +# self.widget = QTextEdit() +# self.layout = QVBoxLayout() +# self.layout.addWidget(self.widget) +# self.setLayout(self.layout) diff --git a/src/submissions/frontend/widgets/omni_search.py b/src/submissions/frontend/widgets/omni_search.py index 3a62b24..d556e56 100644 --- a/src/submissions/frontend/widgets/omni_search.py +++ b/src/submissions/frontend/widgets/omni_search.py @@ -72,18 +72,14 @@ class SearchBox(QDialog): self.object_type = self.original_type else: self.object_type = self.original_type.find_regular_subclass(self.sub_class.currentText()) - # logger.debug(f"Object type: {self.object_type} - {self.object_type.searchables}") - # logger.debug(f"Original type: {self.original_type} - {self.original_type.searchables}") - for item in self.object_type.searchables: - if item['field'] in [item['field'] for item in search_fields]: - logger.debug(f"Already have {item['field']}") + for item in self.object_type.get_searchables(): + if item in [thing for thing in search_fields]: continue else: search_fields.append(item) - logger.debug(f"Search fields: {search_fields}") for iii, searchable in enumerate(search_fields): - widget = FieldSearch(parent=self, label=searchable['label'], field_name=searchable['field']) - widget.setObjectName(searchable['field']) + widget = FieldSearch(parent=self, label=searchable, field_name=searchable) + widget.setObjectName(searchable) self.layout.addWidget(widget, 1 + iii, 0) widget.search_widget.textChanged.connect(self.update_data) self.update_data() @@ -172,7 +168,6 @@ class SearchResults(QTableView): self.extras = extras + [item for item in deepcopy(self.object_type.searchables)] except AttributeError: self.extras = extras - # logger.debug(f"Extras: {self.extras}") def setData(self, df: DataFrame) -> None: """ diff --git a/src/submissions/frontend/widgets/procedure_creation.py b/src/submissions/frontend/widgets/procedure_creation.py index 3ec2a32..ec017cf 100644 --- a/src/submissions/frontend/widgets/procedure_creation.py +++ b/src/submissions/frontend/widgets/procedure_creation.py @@ -163,4 +163,5 @@ class ProcedureCreation(QDialog): self.procedure.update_reagents(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry) def return_sql(self, new: bool = False): - return self.procedure.to_sql(new=new) + output = self.procedure.to_sql(new=new) + return output diff --git a/src/submissions/frontend/widgets/submission_details.py b/src/submissions/frontend/widgets/submission_details.py index ebbc890..f14887f 100644 --- a/src/submissions/frontend/widgets/submission_details.py +++ b/src/submissions/frontend/widgets/submission_details.py @@ -264,7 +264,6 @@ class SubmissionComment(QDialog): """ def __init__(self, parent, submission: Run) -> None: - logger.debug(parent) super().__init__(parent) self.app = get_application_from_parent(parent) self.submission = submission @@ -284,7 +283,7 @@ class SubmissionComment(QDialog): self.layout.addWidget(self.buttonBox, alignment=Qt.AlignmentFlag.AlignBottom) self.setLayout(self.layout) - def parse_form(self) -> List[dict]: + def parse_form(self) -> dict: """ Adds comment to procedure object. """ diff --git a/src/submissions/frontend/widgets/submission_widget.py b/src/submissions/frontend/widgets/submission_widget.py index 5843526..27305c2 100644 --- a/src/submissions/frontend/widgets/submission_widget.py +++ b/src/submissions/frontend/widgets/submission_widget.py @@ -12,7 +12,7 @@ from pathlib import Path from tools import Report, Result, check_not_nan, main_form_style, report_result, get_application_from_parent from backend.validators import PydReagent, PydClientSubmission, PydSample from backend.db import ( - ClientLab, SubmissionType, Reagent, + ClientLab, SubmissionType, Reagent, ReagentLot, ReagentRole, ProcedureTypeReagentRoleAssociation, Run, ClientSubmission ) from pprint import pformat @@ -137,7 +137,7 @@ class SubmissionFormContainer(QWidget): return report @report_result - def add_reagent(self, instance: Reagent | None = None): + def add_reagent(self, instance: ReagentLot | None = None): """ Action to create new reagent in DB. @@ -149,7 +149,7 @@ class SubmissionFormContainer(QWidget): """ report = Report() if not instance: - instance = Reagent() + instance = ReagentLot() dlg = AddEdit(parent=self, instance=instance) if dlg.exec(): reagent = dlg.parse_form() diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py index 2e50faf..92748e1 100644 --- a/src/submissions/tools/__init__.py +++ b/src/submissions/tools/__init__.py @@ -1,18 +1,16 @@ -''' +""" Contains miscellaenous functions used by both frontend and backend. -''' +""" from __future__ import annotations -import builtins, importlib, time, logging, re, yaml, sys, os, stat, platform, getpass, json, numpy as np, pandas as pd -import itertools +import builtins, importlib, time, logging, re, yaml, sys, os, stat, platform, getpass, json, numpy as np, pandas as pd, \ + itertools, openpyxl +from copy import copy from collections import OrderedDict from datetime import date, datetime, timedelta from json import JSONDecodeError from threading import Thread from inspect import getmembers, isfunction, stack -from types import NoneType - from dateutil.easter import easter -from dateutil.parser import parse from jinja2 import Environment, FileSystemLoader from logging import handlers, Logger from pathlib import Path @@ -60,7 +58,6 @@ main_form_style = ''' page_size = 250 -# micro_char = uni_char = "\u03BC" def divide_chunks(input_list: list, chunk_count: int) -> Generator[Any, Any, None]: """ @@ -447,16 +444,18 @@ def jinja_template_loading() -> Environment: return env -def render_details_template(template_name:str, css_in:List[str]|str=[], js_in:List[str]|str=[], **kwargs) -> str: +def render_details_template(template_name: str, css_in: List[str] | str = [], js_in: List[str] | str = [], + **kwargs) -> str: if isinstance(css_in, str): css_in = [css_in] + env = jinja_template_loading() + html_folder = Path(env.loader.__getattribute__("searchpath")[0]) css_in = ["styles"] + css_in - css_in = [project_path.joinpath("src", "submissions", "templates", "css", f"{c}.css") for c in css_in] + css_in = [html_folder.joinpath("css", f"{c}.css") for c in css_in] if isinstance(js_in, str): js_in = [js_in] js_in = ["details"] + js_in - js_in = [project_path.joinpath("src", "submissions", "templates", "js", f"{j}.js") for j in js_in] - env = jinja_template_loading() + js_in = [html_folder.joinpath("js", f"{j}.js") for j in js_in] template = env.get_template(f"{template_name}.html") # template_path = Path(template.environment.loader.__getattribute__("searchpath")[0]) css_out = [] @@ -467,7 +466,6 @@ def render_details_template(template_name:str, css_in:List[str]|str=[], js_in:Li for js in js_in: with open(js, "r") as f: js_out.append(f.read()) - # logger.debug(f"Kwargs: {kwargs}") return template.render(css=css_out, js=js_out, **kwargs) @@ -489,10 +487,10 @@ def convert_well_to_row_column(input_str: str) -> Tuple[int, int]: return None, None return row, column + # Copy a sheet with style, format, layout, ect. from one Excel file to another Excel file # Please add the ..path\\+\\file.. and ..sheet_name.. according to your desire. -import openpyxl -from copy import copy + def copy_xl_sheet(source_sheet, target_sheet): @@ -509,8 +507,8 @@ def copy_sheet_attributes(source_sheet, target_sheet): target_sheet.page_margins = copy(source_sheet.page_margins) target_sheet.freeze_panes = copy(source_sheet.freeze_panes) - # set row dimensions - # So you cannot copy the row_dimensions attribute. Does not work (because of meta data in the attribute I think). So we copy every row's row_dimensions. That seems to work. + # NOTE: set row dimensions + # NOTE: So you cannot copy the row_dimensions attribute. Does not work (because of meta data in the attribute I think). So we copy every row's row_dimensions. That seems to work. for rn in range(len(source_sheet.row_dimensions)): target_sheet.row_dimensions[rn] = copy(source_sheet.row_dimensions[rn]) @@ -519,12 +517,15 @@ def copy_sheet_attributes(source_sheet, target_sheet): else: target_sheet.sheet_format.defaultColWidth = copy(source_sheet.sheet_format.defaultColWidth) - # set specific column width and hidden property - # we cannot copy the entire column_dimensions attribute so we copy selected attributes + # NOTE: set specific column width and hidden property + # NOTE: we cannot copy the entire column_dimensions attribute so we copy selected attributes for key, value in source_sheet.column_dimensions.items(): - target_sheet.column_dimensions[key].min = copy(source_sheet.column_dimensions[key].min) # Excel actually groups multiple columns under 1 key. Use the min max attribute to also group the columns in the targetSheet - target_sheet.column_dimensions[key].max = copy(source_sheet.column_dimensions[key].max) # https://stackoverflow.com/questions/36417278/openpyxl-can-not-read-consecutive-hidden-columns discussed the issue. Note that this is also the case for the width, not onl;y the hidden property - target_sheet.column_dimensions[key].width = copy(source_sheet.column_dimensions[key].width) # set width for every column + target_sheet.column_dimensions[key].min = copy(source_sheet.column_dimensions[ + key].min) # Excel actually groups multiple columns under 1 key. Use the min max attribute to also group the columns in the targetSheet + target_sheet.column_dimensions[key].max = copy(source_sheet.column_dimensions[ + key].max) # https://stackoverflow.com/questions/36417278/openpyxl-can-not-read-consecutive-hidden-columns discussed the issue. Note that this is also the case for the width, not onl;y the hidden property + target_sheet.column_dimensions[key].width = copy( + source_sheet.column_dimensions[key].width) # set width for every column target_sheet.column_dimensions[key].hidden = copy(source_sheet.column_dimensions[key].hidden) @@ -534,11 +535,9 @@ def copy_cells(source_sheet, target_sheet): source_cell = cell if isinstance(source_cell, openpyxl.cell.read_only.EmptyCell): continue - target_cell = target_sheet.cell(column=c+1, row=r+1) - + target_cell = target_sheet.cell(column=c + 1, row=r + 1) target_cell._value = source_cell._value target_cell.data_type = source_cell.data_type - if source_cell.has_style: target_cell.font = copy(source_cell.font) target_cell.border = copy(source_cell.border) @@ -546,15 +545,13 @@ def copy_cells(source_sheet, target_sheet): target_cell.number_format = copy(source_cell.number_format) target_cell.protection = copy(source_cell.protection) target_cell.alignment = copy(source_cell.alignment) - if not isinstance(source_cell, openpyxl.cell.ReadOnlyCell) and source_cell.hyperlink: target_cell._hyperlink = copy(source_cell.hyperlink) - if not isinstance(source_cell, openpyxl.cell.ReadOnlyCell) and source_cell.comment: target_cell.comment = copy(source_cell.comment) -def list_str_comparator(input_str:str, listy: List[str], mode: Literal["starts_with", "contains"]) -> bool: +def list_str_comparator(input_str: str, listy: List[str], mode: Literal["starts_with", "contains"]) -> bool: match mode: case "starts_with": if any([input_str.startswith(item) for item in listy]): @@ -567,6 +564,7 @@ def list_str_comparator(input_str:str, listy: List[str], mode: Literal["starts_w else: return False + def sort_dict_by_list(dictionary: dict, order_list: list) -> dict: output = OrderedDict() for item in order_list: @@ -602,14 +600,12 @@ def setup_lookup(func): elif v is not None: sanitized_kwargs[k] = v return func(*args, **sanitized_kwargs) - return wrapper def check_object_in_manager(manager: list, object_name: object) -> Tuple[Any, bool]: if manager is None: return None, False - # logger.debug(f"Manager: {manager}, aliases: {manager.aliases}, Key: {object_name}") if object_name in manager.aliases: return manager, True relationships = [getattr(manager.__class__, item) for item in dir(manager.__class__) @@ -617,21 +613,17 @@ def check_object_in_manager(manager: list, object_name: object) -> Tuple[Any, bo relationships = [item for item in relationships if isinstance(item.property, _RelationshipDeclared)] for relationship in relationships: if relationship.key == object_name and "association" not in relationship.key: - logger.debug(f"Checking {relationship.key}") try: rel_obj = getattr(manager, relationship.key) if rel_obj is not None: - logger.debug(f"Returning {rel_obj}") return rel_obj, False except AttributeError: pass if "association" in relationship.key: try: - logger.debug(f"Checking association {relationship.key}") rel_obj = next((getattr(item, object_name) for item in getattr(manager, relationship.key) if getattr(item, object_name) is not None), None) if rel_obj is not None: - logger.debug(f"Returning {rel_obj}") return rel_obj, False except AttributeError: pass @@ -862,7 +854,6 @@ def check_authorization(func): report.add_result( Result(owner=func.__str__(), code=1, msg=error_msg, status="warning")) return report, kwargs - return wrapper @@ -888,7 +879,6 @@ def under_development(func): Result(owner=func.__str__(), code=1, msg=error_msg, status="warning")) return report - return wrapper @@ -906,7 +896,6 @@ def report_result(func): @wraps(func) def wrapper(*args, **kwargs): - # logger.info(f"Report result being called by {func.__name__}") output = func(*args, **kwargs) match output: case Report(): @@ -931,6 +920,7 @@ def report_result(func): logger.error(f"Problem reporting due to {e}") logger.error(result.msg) if output: + logger.info(f"Report result being called by {func.__name__}") if is_list_etc(output): true_output = tuple(item for item in output if not isinstance(item, Report)) if len(true_output) == 1: @@ -943,7 +933,6 @@ def report_result(func): else: true_output = None return true_output - return wrapper @@ -962,11 +951,32 @@ def is_list_etc(object): def create_holidays_for_year(year: int | None = None) -> List[date]: - def find_nth_monday(year, month, occurence: int | None = None, day: int | None = None): - if not occurence: - occurence = 1 + """ + Gives stat holidays for the input year. + + Args: + year (int | None, optional): The input year as an integer. Defaults to None. + + Returns: + List[date] + """ + def find_nth_monday(year, month, occurrence: int | None = None, day: int | None = None) -> date: + """ + Gets the nth (eg 2nd) monday of the given month. + + Args: + year (int): The year the month occurs in. + month (int): The month of interest. + occurrence (int): The n in nth. + day (int): The day of the month to start after. + + Returns: + date + """ + if not occurrence: + occurrence = 1 if not day: - day = occurence * 7 + day = occurrence * 7 max_days = (date(2012, month + 1, 1) - date(2012, month, 1)).days if day > max_days: day = max_days @@ -977,17 +987,16 @@ def create_holidays_for_year(year: int | None = None) -> List[date]: offset = -d.weekday() # weekday == 0 means Monday output = d + timedelta(offset) return output.date() - if not year: year = date.today().year - # NOTE: Includes New Year's day for next year. + # NOTE: Static holidays. Includes New Year's day for next year. holidays = [date(year, 1, 1), date(year, 7, 1), date(year, 9, 30), date(year, 11, 11), date(year, 12, 25), date(year, 12, 26), date(year + 1, 1, 1)] # NOTE: Labour Day holidays.append(find_nth_monday(year, 9)) # NOTE: Thanksgiving - holidays.append(find_nth_monday(year, 10, occurence=2)) + holidays.append(find_nth_monday(year, 10, occurrence=2)) # NOTE: Victoria Day holidays.append(find_nth_monday(year, 5, day=25)) # NOTE: Easter, etc @@ -1007,7 +1016,6 @@ def check_dictionary_inclusion_equality(listo: List[dict] | dict, dicto: dict) - Returns: bool: True if dicto is equal to any dictionary in the list. """ - # logger.debug(f"Comparing: {listo} and {dicto}") if isinstance(dicto, list) and isinstance(listo, list): return listo == dicto elif isinstance(dicto, dict) and isinstance(listo, dict): @@ -1018,22 +1026,39 @@ def check_dictionary_inclusion_equality(listo: List[dict] | dict, dicto: dict) - raise TypeError(f"Unsupported variable: {type(listo)}") -def flatten_list(input_list: list): +def flatten_list(input_list: list) -> list: + """ + Takes nested lists and returns a single flat list. + + Args: + input_list (list): input nested list. + + Returns: + list: + """ return list(itertools.chain.from_iterable(input_list)) def sanitize_object_for_json(input_dict: dict) -> dict: + """ + Takes an object and makes sure its components can be converted to JSON + + Args: + input_dict (dict): Dictionary of interest + + Returns: + dict: + """ if not isinstance(input_dict, dict): match input_dict: case int() | float() | bool(): pass case _: try: - js = json.dumps(input_dict) + input_dict = json.dumps(input_dict) except TypeError: input_dict = str(input_dict) return input_dict - # return input_dict output = {} for key, value in input_dict.items(): match value: @@ -1041,22 +1066,37 @@ def sanitize_object_for_json(input_dict: dict) -> dict: value = [sanitize_object_for_json(object) for object in value] case dict(): value = sanitize_object_for_json(value) - case _: try: - js = json.dumps(value) + value = json.dumps(value) except TypeError: value = str(value) output[key] = value return output -def create_plate_grid(rows: int, columns: int): - matrix = np.array([[0 for yyy in range(1, columns + 1)] for xxx in range(1, rows + 1)]) - return {iii: (item[0][1]+1, item[0][0]+1) for iii, item in enumerate(np.ndenumerate(matrix), start=1)} +def create_plate_grid(rows: int, columns: int) -> dict: + """ + Makes an x by y array to represent a plate. + + Args: + rows (int): Number of rows. + columns (int): Number of columns + + Returns: + dict: cell number : (row, column) + """ + # NOTE: columns/rows + # matrix = np.array([[0 for yyy in range(1, columns + 1)] for xxx in range(1, rows + 1)]) + # NOTE: rows/columns + matrix = np.array([[0 for xxx in range(1, rows + 1)] for yyy in range(1, columns + 1)]) + return {iii: (item[0][1] + 1, item[0][0] + 1) for iii, item in enumerate(np.ndenumerate(matrix), start=1)} class classproperty(property): + """ + Allows for properties on classes as well as objects. + """ def __get__(self, owner_self, owner_cls): return self.fget(owner_cls) @@ -1396,6 +1436,16 @@ class Settings(BaseSettings, extra="allow"): @classmethod def get_alembic_db_path(cls, alembic_path, mode=Literal['path', 'schema', 'user', 'pass']) -> Path | str: + """ + Retrieves database variables from alembic.ini file. + + Args: + alembic_path (Any): Path of the alembic.ini file. + mode (Literal['path', 'schema', 'user', 'pass']): Variable of interest. + + Returns: + Path | str + """ c = ConfigParser() c.read(alembic_path) url = c['alembic']['sqlalchemy.url'] diff --git a/submissions.spec b/submissions.spec index e1a8694..cbae940 100644 --- a/submissions.spec +++ b/submissions.spec @@ -32,8 +32,10 @@ a = Analysis( binaries=[], datas=[ ("src\\config.yml", "files"), - ("src\\submissions\\templates\\*", "files\\templates"), - ("src\\submissions\\templates\\css\\*", "files\\templates\\css"), + ("src\\submissions\\templates\\*.html", "files\\templates"), + ("src\\submissions\\templates\\css\\*.css", "files\\templates\\css"), + ("src\\submissions\\templates\\js\\*.js", "files\\templates\\js"), + ("src\\submissions\\templates\\support\\*", "files\\templates\\support"), ("docs\\build", "files\\docs"), ("src\\submissions\\resources\\*", "files\\resources"), ("alembic.ini", "files"), @@ -51,12 +53,32 @@ a = Analysis( ) pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher) +#exe = EXE( +# pyz, +# a.scripts, +# a.binaries, +# a.datas, +# [], +# name=f"{__project__}_{__version__}", +# debug=True, +# bootloader_ignore_signals=False, +# strip=False, +# upx=True, +# upx_exclude=[], +# runtime_tmpdir=None, +# console=True, +# disable_windowed_traceback=False, +# argv_emulation=False, +# target_arch=None, +# codesign_identity=None, +# entitlements_file=None, +#) exe = EXE( pyz, a.scripts, [], exclude_binaries=True, - name=f"{__project__}_{__version__}", + name=f"{__project__}_{__version__}_2", debug=True, bootloader_ignore_signals=False, strip=False,