Mid clean-up.

This commit is contained in:
lwark
2025-01-10 13:49:24 -06:00
parent d93da3c90c
commit 5cded949ed
12 changed files with 178 additions and 90 deletions

View File

@@ -4,6 +4,7 @@ Contains all models for sqlalchemy
from __future__ import annotations
import sys, logging
from pandas import DataFrame
from pydantic import BaseModel
from sqlalchemy import Column, INTEGER, String, JSON
from sqlalchemy.orm import DeclarativeMeta, declarative_base, Query, Session
from sqlalchemy.ext.declarative import declared_attr
@@ -17,6 +18,7 @@ from tools import report_result
if 'pytest' in sys.modules:
sys.path.append(Path(__file__).parents[4].absolute().joinpath("tests").__str__())
# NOTE: For inheriting in LogMixin
Base: DeclarativeMeta = declarative_base()
logger = logging.getLogger(f"submissions.{__name__}")
@@ -31,6 +33,7 @@ class LogMixin(Base):
if len(name) > 64:
name = name.replace("<", "").replace(">", "")
if len(name) > 64:
# NOTE: As if re'agent'
name = name.replace("agent", "")
if len(name) > 64:
name = f"...{name[-61:]}"
@@ -116,7 +119,7 @@ class BaseClass(Base):
return dict(singles=singles)
@classmethod
def find_regular_subclass(cls, name: str | None = None) -> Any:
def find_regular_subclass(cls, name: str = "") -> Any:
"""
Args:
@@ -126,8 +129,9 @@ class BaseClass(Base):
Any: Subclass of this object
"""
if not name:
return cls
# if not name:
# logger.warning("You need to include a name of what you're looking for.")
# return cls
if " " in name:
search = name.title().replace(" ", "")
else:
@@ -171,7 +175,7 @@ class BaseClass(Base):
try:
records = [obj.to_sub_dict(**kwargs) for obj in objects]
except AttributeError:
records = [obj.to_dict() for obj in objects]
records = [obj.to_omnigui_dict() for obj in objects]
return DataFrame.from_records(records)
@classmethod
@@ -190,7 +194,7 @@ class BaseClass(Base):
Execute sqlalchemy query with relevant defaults.
Args:
model (Any, optional): model to be queried. Defaults to None
model (Any, optional): model to be queried, allows for plugging in. Defaults to None
query (Query, optional): input query object. Defaults to None
limit (int): Maximum number of results. (0 = all). Defaults to 0
@@ -237,13 +241,28 @@ class BaseClass(Base):
report.add_result(Result(msg=e, status="Critical"))
return report
def to_dict(self):
def to_omnigui_dict(self) -> dict:
"""
For getting any object in an omni-thing friendly output.
Returns:
dict: Dictionary of object minus _sa_instance_state with id at the front.
"""
dicto = {k: v for k, v in self.__dict__.items() if k not in ["_sa_instance_state"]}
dicto = {'id': dicto.pop('id'), **dicto}
try:
dicto = {'id': dicto.pop('id'), **dicto}
except KeyError:
pass
return dicto
@classmethod
def get_pydantic_model(cls):
def get_pydantic_model(cls) -> BaseModel:
"""
Gets the pydantic model corresponding to this object.
Returns:
Pydantic model with name "Pyd{cls.__name__}"
"""
from backend.validators import pydant
try:
model = getattr(pydant, f"Pyd{cls.__name__}")
@@ -252,7 +271,13 @@ class BaseClass(Base):
return model
@classproperty
def add_edit_tooltips(self):
def add_edit_tooltips(self) -> dict:
"""
Gets tooltips for Omni-add-edit
Returns:
dict: custom dictionary for this class.
"""
return dict()
@@ -270,7 +295,7 @@ class ConfigItem(BaseClass):
@classmethod
def get_config_items(cls, *args) -> ConfigItem | List[ConfigItem]:
"""
Get desired config items from database
Get desired config items, or all from database
Returns:
ConfigItem|List[ConfigItem]: Config item(s)
@@ -283,6 +308,7 @@ class ConfigItem(BaseClass):
case 1:
config_items = query.filter(cls.key == args[0]).first()
case _:
# NOTE: All items whose key field is in args.
config_items = query.filter(cls.key.in_(args)).all()
return config_items

View File

@@ -24,7 +24,7 @@ class AuditLog(Base):
changes = Column(JSON)
def __repr__(self):
return f"<{self.user} @ {self.time}>"
return f"<{self.object}: {self.user} @ {self.time}>"
@classmethod
def query(cls, start_date: date | str | int | None = None, end_date: date | str | int | None = None) -> List["AuditLog"]:

View File

@@ -255,6 +255,7 @@ class Control(BaseClass):
f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}, falling back to BasicSubmission")
case _:
pass
# NOTE: if attrs passed in and this cls doesn't have all attributes in attr
if attrs and any([not hasattr(cls, attr) for attr in attrs.keys()]):
# NOTE: looks for first model that has all included kwargs
try:
@@ -272,6 +273,9 @@ class Control(BaseClass):
Args:
parent (QWidget): chart holding widget to add buttons to.
Returns:
None: Child methods will return things.
"""
return None
@@ -284,7 +288,7 @@ class Control(BaseClass):
chart_settings (dict): settings passed down from chart widget
ctx (Settings): settings passed down from gui
"""
return None
return Report(), None
def delete(self):
self.__database_session__.delete(self)
@@ -315,8 +319,14 @@ class PCRControl(Control):
Returns:
dict: Output dict of name, ct, subtype, target, reagent_lot and submitted_date
"""
return dict(name=self.name, ct=self.ct, subtype=self.subtype, target=self.target, reagent_lot=self.reagent_lot,
submitted_date=self.submitted_date.date())
return dict(
name=self.name,
ct=self.ct,
subtype=self.subtype,
target=self.target,
reagent_lot=self.reagent_lot,
submitted_date=self.submitted_date.date()
)
@classmethod
@report_result
@@ -403,9 +413,9 @@ class IridaControl(Control):
kraken = self.kraken
except TypeError:
kraken = {}
kraken_cnt_total = sum([kraken[item]['kraken_count'] for item in kraken])
kraken_cnt_total = sum([item['kraken_count'] for item in kraken.values()])
new_kraken = [dict(name=item, kraken_count=kraken[item]['kraken_count'],
kraken_percent="{0:.0%}".format(kraken[item]['kraken_count'] / kraken_cnt_total),
kraken_percent=f"{kraken[item]['kraken_count'] / kraken_cnt_total:0.2%}",
target=item in self.controltype.targets)
for item in kraken]
new_kraken = sorted(new_kraken, key=itemgetter('kraken_count'), reverse=True)
@@ -479,6 +489,7 @@ class IridaControl(Control):
@classmethod
def make_parent_buttons(cls, parent: QWidget) -> None:
"""
Creates buttons for controlling
Args:
parent (QWidget): chart holding widget to add buttons to.
@@ -486,6 +497,7 @@ class IridaControl(Control):
"""
super().make_parent_buttons(parent=parent)
rows = parent.layout.rowCount() - 2
# NOTE: check box for consolidating off-target items
checker = QCheckBox(parent)
checker.setChecked(True)
checker.setObjectName("irida_check")
@@ -703,6 +715,12 @@ class IridaControl(Control):
df = df[df.name not in exclude]
return df
def to_pydantic(self):
def to_pydantic(self) -> "PydIridaControl":
"""
Constructs a pydantic version of this object.
Returns:
PydIridaControl: This object as a pydantic model.
"""
from backend.validators import PydIridaControl
return PydIridaControl(**self.__dict__)

View File

@@ -129,8 +129,10 @@ class KitType(BaseClass):
"""
return f"<KitType({self.name})>"
def get_reagents(self, required: bool = False, submission_type: str | SubmissionType | None = None) -> Generator[
ReagentRole, None, None]:
def get_reagents(self,
required: bool = False,
submission_type: str | SubmissionType | None = None
) -> Generator[ReagentRole, None, None]:
"""
Return ReagentTypes linked to kit through KitTypeReagentTypeAssociation.
@@ -192,13 +194,13 @@ class KitType(BaseClass):
Lookup a list of or single KitType.
Args:
name (str, optional): Name of desired kit (returns single instance). Defaults to None.
used_for (str | Submissiontype | None, optional): Submission type the kit is used for. Defaults to None.
id (int | None, optional): Kit id in the database. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
name (str, optional): Name of desired kit (returns single instance). Defaults to None.
used_for (str | Submissiontype | None, optional): Submission type the kit is used for. Defaults to None.
id (int | None, optional): Kit id in the database. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
KitType|List[KitType]: KitType(s) of interest.
KitType|List[KitType]: KitType(s) of interest.
"""
query: Query = cls.__database_session__.query(cls)
match used_for:
@@ -240,23 +242,23 @@ class KitType(BaseClass):
dict: Dictionary containing relevant info for SubmissionType construction
"""
base_dict = dict(name=self.name, reagent_roles=[], equipment_roles=[])
for k, v in self.construct_xl_map_for_use(submission_type=submission_type):
for key, value in self.construct_xl_map_for_use(submission_type=submission_type):
try:
assoc = next(item for item in self.kit_reagentrole_associations if item.reagent_role.name == k)
assoc = next(item for item in self.kit_reagentrole_associations if item.reagent_role.name == key)
except StopIteration as e:
continue
for kk, vv in assoc.to_export_dict().items():
v[kk] = vv
base_dict['reagent_roles'].append(v)
for k, v in submission_type.construct_field_map("equipment"):
value[kk] = vv
base_dict['reagent_roles'].append(value)
for key, value in submission_type.construct_field_map("equipment"):
try:
assoc = next(item for item in submission_type.submissiontype_equipmentrole_associations if
item.equipment_role.name == k)
item.equipment_role.name == key)
except StopIteration:
continue
for kk, vv in assoc.to_export_dict(extraction_kit=self).items():
v[kk] = vv
base_dict['equipment_roles'].append(v)
value[kk] = vv
base_dict['equipment_roles'].append(value)
return base_dict
@classmethod
@@ -402,6 +404,7 @@ class ReagentRole(BaseClass):
case _:
pass
assert reagent.role
# NOTE: Get all roles common to the reagent and the kit.
result = set(kit_type.reagent_roles).intersection(reagent.role)
return next((item for item in result), None)
match name:
@@ -500,7 +503,7 @@ class Reagent(BaseClass, LogMixin):
except (TypeError, AttributeError) as e:
place_holder = date.today()
logger.error(f"We got a type error setting {self.lot} expiry: {e}. setting to today for testing")
# NOTE: The notation for not having an expiry is 1970.1.1
# NOTE: The notation for not having an expiry is 1970.01.01
if self.expiry.year == 1970:
place_holder = "NA"
else:
@@ -555,7 +558,7 @@ class Reagent(BaseClass, LogMixin):
instance = PydReagent(**kwargs)
new = True
instance, _ = instance.toSQL()
logger.debug(f"Instance: {instance}")
logger.info(f"Instance from query or create: {instance}")
return instance, new
@classmethod
@@ -609,33 +612,70 @@ class Reagent(BaseClass, LogMixin):
pass
return cls.execute_query(query=query, limit=limit)
def set_attribute(self, key, value):
match key:
case "lot":
value = value.upper()
case "role":
match value:
case ReagentRole():
role = value
case str():
role = ReagentRole.query(name=value, limit=1)
case _:
return
if role and role not in self.role:
self.role.append(role)
return
case "comment":
return
case "expiry":
if isinstance(value, str):
value = date(year=1970, month=1, day=1)
# NOTE: if min time is used, any reagent set to expire today (Bac postive control, eg) will have expired at midnight and therefore be flagged.
# NOTE: Make expiry at date given, plus maximum time = end of day
value = datetime.combine(value, datetime.max.time())
value = value.replace(tzinfo=timezone)
case _:
pass
logger.debug(f"Role to be set to: {value}")
try:
self.__setattr__(key, value)
except AttributeError as e:
logger.error(f"Could not set {key} due to {e}")
@check_authorization
def edit_from_search(self, obj, **kwargs):
from frontend.widgets.misc import AddReagentForm
from frontend.widgets.omni_add_edit import AddEdit
role = ReagentRole.query(kwargs['role'])
if role:
role_name = role.name
else:
role_name = None
dlg = AddReagentForm(reagent_lot=self.lot, reagent_role=role_name, expiry=self.expiry, reagent_name=self.name)
# dlg = AddReagentForm(reagent_lot=self.lot, reagent_role=role_name, expiry=self.expiry, reagent_name=self.name)
dlg = AddEdit(parent=None, instance=self)
if dlg.exec():
vars = dlg.parse_form()
for key, value in vars.items():
match key:
case "expiry":
if isinstance(value, str):
field_value = datetime.strptime(value, "%Y-%m-%d")
elif isinstance(value, date):
field_value = datetime.combine(value, datetime.max.time())
else:
field_value = value
field_value.replace(tzinfo=timezone)
case "role":
continue
case _:
field_value = value
self.__setattr__(key, field_value)
pyd = dlg.parse_form()
for field in pyd.model_fields:
self.set_attribute(field, pyd.__getattribute__(field))
# for key, value in vars.items():
# match key:
# case "expiry":
# if isinstance(value, str):
# field_value = datetime.strptime(value, "%Y-%m-%d")
# elif isinstance(value, date):
# field_value = datetime.combine(value, datetime.max.time())
# else:
# field_value = value
# field_value.replace(tzinfo=timezone)
# case "role":
# continue
# case _:
# field_value = value
# self.__setattr__(key, field_value)
self.save()
# print(self.__dict__)
@classproperty
def add_edit_tooltips(self):
@@ -767,7 +807,7 @@ class SubmissionType(BaseClass):
Grabs the default excel template file.
Returns:
bytes: The excel sheet.
bytes: The Excel sheet.
"""
submission_type = cls.query(name="Bacterial Culture")
return submission_type.template_file
@@ -787,13 +827,13 @@ class SubmissionType(BaseClass):
def set_template_file(self, filepath: Path | str):
"""
Sets the binary store to an excel file.
Sets the binary store to an Excel file.
Args:
filepath (Path | str): Path to the template file.
Raises:
ValueError: Raised if file is not excel file.
ValueError: Raised if file is not Excel file.
"""
if isinstance(filepath, str):
filepath = Path(filepath)

View File

@@ -375,7 +375,10 @@ class BasicSubmission(BaseClass, LogMixin):
output["contact_phone"] = contact_phone
output["custom"] = custom
output["controls"] = controls
output["completed_date"] = self.completed_date
try:
output["completed_date"] = self.completed_date.strftime("%Y-%m-%d")
except AttributeError:
output["completed_date"] = self.completed_date
return output
def calculate_column_count(self) -> int: