Prior to merging omni_search.py and sample_search.py
This commit is contained in:
@@ -3,6 +3,8 @@ Contains all models for sqlalchemy
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import sys, logging
|
||||
|
||||
import pandas as pd
|
||||
from sqlalchemy import Column, INTEGER, String, JSON
|
||||
from sqlalchemy.orm import DeclarativeMeta, declarative_base, Query, Session
|
||||
from sqlalchemy.ext.declarative import declared_attr
|
||||
@@ -97,6 +99,26 @@ class BaseClass(Base):
|
||||
singles = list(set(cls.singles + BaseClass.singles))
|
||||
return dict(singles=singles)
|
||||
|
||||
@classmethod
|
||||
def fuzzy_search(cls, **kwargs):
|
||||
query: Query = cls.__database_session__.query(cls)
|
||||
# logger.debug(f"Queried model. Now running searches in {kwargs}")
|
||||
for k, v in kwargs.items():
|
||||
# logger.debug(f"Running fuzzy search for attribute: {k} with value {v}")
|
||||
search = f"%{v}%"
|
||||
try:
|
||||
attr = getattr(cls, k)
|
||||
# NOTE: the secret sauce is in attr.like
|
||||
query = query.filter(attr.like(search))
|
||||
except (ArgumentError, AttributeError) as e:
|
||||
logger.error(f"Attribute {k} unavailable due to:\n\t{e}\nSkipping.")
|
||||
return query.limit(50).all()
|
||||
|
||||
@classmethod
|
||||
def results_to_df(cls, objects: list, **kwargs):
|
||||
records = [object.to_sub_dict(**kwargs) for object in objects]
|
||||
return pd.DataFrame.from_records(records)
|
||||
|
||||
@classmethod
|
||||
def query(cls, **kwargs) -> Any | List[Any]:
|
||||
"""
|
||||
|
||||
@@ -7,8 +7,8 @@ from pprint import pformat
|
||||
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB
|
||||
from sqlalchemy.orm import relationship, validates, Query
|
||||
from sqlalchemy.ext.associationproxy import association_proxy
|
||||
from datetime import date
|
||||
from tools import check_authorization, setup_lookup, Report, Result, check_regex_match, yaml_regex_creator
|
||||
from datetime import date, datetime
|
||||
from tools import check_authorization, setup_lookup, Report, Result, check_regex_match, yaml_regex_creator, timezone
|
||||
from typing import List, Literal, Generator, Any
|
||||
from pandas import ExcelFile
|
||||
from pathlib import Path
|
||||
@@ -355,7 +355,7 @@ class ReagentRole(BaseClass):
|
||||
match reagent:
|
||||
case str():
|
||||
# logger.debug(f"Lookup ReagentType by reagent str {reagent}")
|
||||
reagent = Reagent.query(lot_number=reagent)
|
||||
reagent = Reagent.query(lot=reagent)
|
||||
case _:
|
||||
pass
|
||||
assert reagent.role
|
||||
@@ -405,6 +405,8 @@ class Reagent(BaseClass):
|
||||
Concrete reagent instance
|
||||
"""
|
||||
|
||||
searchables = ["lot"]
|
||||
|
||||
id = Column(INTEGER, primary_key=True) #: primary key
|
||||
role = relationship("ReagentRole", back_populates="instances",
|
||||
secondary=reagentroles_reagents) #: joined parent reagent type
|
||||
@@ -430,7 +432,7 @@ class Reagent(BaseClass):
|
||||
else:
|
||||
return f"<Reagent({self.role.name}-{self.lot})>"
|
||||
|
||||
def to_sub_dict(self, extraction_kit: KitType = None, full_data: bool = False) -> dict:
|
||||
def to_sub_dict(self, extraction_kit: KitType = None, full_data: bool = False, **kwargs) -> dict:
|
||||
"""
|
||||
dictionary containing values necessary for gui
|
||||
|
||||
@@ -441,6 +443,7 @@ class Reagent(BaseClass):
|
||||
Returns:
|
||||
dict: representation of the reagent's attributes
|
||||
"""
|
||||
|
||||
if extraction_kit is not None:
|
||||
# NOTE: Get the intersection of this reagent's ReagentType and all ReagentTypes in KitType
|
||||
try:
|
||||
@@ -449,7 +452,10 @@ class Reagent(BaseClass):
|
||||
except:
|
||||
reagent_role = self.role[0]
|
||||
else:
|
||||
reagent_role = self.role[0]
|
||||
try:
|
||||
reagent_role = self.role[0]
|
||||
except IndexError:
|
||||
reagent_role = None
|
||||
try:
|
||||
rtype = reagent_role.name.replace("_", " ")
|
||||
except AttributeError:
|
||||
@@ -475,7 +481,8 @@ class Reagent(BaseClass):
|
||||
)
|
||||
if full_data:
|
||||
output['submissions'] = [sub.rsl_plate_num for sub in self.submissions]
|
||||
output['excluded'] = ['missing', 'submissions', 'excluded']
|
||||
output['excluded'] = ['missing', 'submissions', 'excluded', 'editable']
|
||||
output['editable'] = ['lot', 'expiry']
|
||||
return output
|
||||
|
||||
def update_last_used(self, kit: KitType) -> Report:
|
||||
@@ -508,8 +515,8 @@ class Reagent(BaseClass):
|
||||
@setup_lookup
|
||||
def query(cls,
|
||||
id: int | None = None,
|
||||
reagent_role: str | ReagentRole | None = None,
|
||||
lot_number: str | None = None,
|
||||
role: str | ReagentRole | None = None,
|
||||
lot: str | None = None,
|
||||
name: str | None = None,
|
||||
limit: int = 0
|
||||
) -> Reagent | List[Reagent]:
|
||||
@@ -533,13 +540,13 @@ class Reagent(BaseClass):
|
||||
limit = 1
|
||||
case _:
|
||||
pass
|
||||
match reagent_role:
|
||||
match role:
|
||||
case str():
|
||||
# logger.debug(f"Looking up reagents by reagent type str: {reagent_type}")
|
||||
query = query.join(cls.role).filter(ReagentRole.name == reagent_role)
|
||||
query = query.join(cls.role).filter(ReagentRole.name == role)
|
||||
case ReagentRole():
|
||||
# logger.debug(f"Looking up reagents by reagent type ReagentType: {reagent_type}")
|
||||
query = query.filter(cls.role.contains(reagent_role))
|
||||
query = query.filter(cls.role.contains(role))
|
||||
case _:
|
||||
pass
|
||||
match name:
|
||||
@@ -549,16 +556,43 @@ class Reagent(BaseClass):
|
||||
query = query.filter(cls.name == name)
|
||||
case _:
|
||||
pass
|
||||
match lot_number:
|
||||
match lot:
|
||||
case str():
|
||||
# logger.debug(f"Looking up reagent by lot number str: {lot_number}")
|
||||
query = query.filter(cls.lot == lot_number)
|
||||
# logger.debug(f"Looking up reagent by lot number str: {lot}")
|
||||
query = query.filter(cls.lot == lot)
|
||||
# NOTE: In this case limit number returned.
|
||||
limit = 1
|
||||
case _:
|
||||
pass
|
||||
return cls.execute_query(query=query, limit=limit)
|
||||
|
||||
@check_authorization
|
||||
def edit_from_search(self, **kwargs):
|
||||
from frontend.widgets.misc import AddReagentForm
|
||||
role = ReagentRole.query(kwargs['role'])
|
||||
if role:
|
||||
role_name = role.name
|
||||
else:
|
||||
role_name = None
|
||||
dlg = AddReagentForm(reagent_lot=self.lot, reagent_role=role_name, expiry=self.expiry, reagent_name=self.name)
|
||||
if dlg.exec():
|
||||
vars = dlg.parse_form()
|
||||
for key, value in vars.items():
|
||||
match key:
|
||||
case "expiry":
|
||||
if not isinstance(value, date):
|
||||
field_value = datetime.strptime(value, "%Y-%m-%d").date
|
||||
field_value.replace(tzinfo=timezone)
|
||||
else:
|
||||
field_value = value
|
||||
case "role":
|
||||
continue
|
||||
case _:
|
||||
field_value = value
|
||||
logger.debug(f"Setting reagent {key} to {field_value}")
|
||||
self.__setattr__(key, field_value)
|
||||
self.save()
|
||||
|
||||
|
||||
class Discount(BaseClass):
|
||||
"""
|
||||
@@ -1278,7 +1312,7 @@ class SubmissionReagentAssociation(BaseClass):
|
||||
case Reagent() | str():
|
||||
# logger.debug(f"Lookup SubmissionReagentAssociation by reagent Reagent {reagent}")
|
||||
if isinstance(reagent, str):
|
||||
reagent = Reagent.query(lot_number=reagent)
|
||||
reagent = Reagent.query(lot=reagent)
|
||||
query = query.filter(cls.reagent == reagent)
|
||||
case _:
|
||||
pass
|
||||
|
||||
@@ -844,7 +844,8 @@ class BasicSubmission(BaseClass):
|
||||
ws.cell(row=item['row'], column=item['column'], value=item['value'])
|
||||
return input_excel
|
||||
|
||||
def custom_sample_writer(self, sample:dict) -> dict:
|
||||
@classmethod
|
||||
def custom_sample_writer(self, sample: dict) -> dict:
|
||||
return sample
|
||||
|
||||
@classmethod
|
||||
@@ -2091,7 +2092,7 @@ class WastewaterArtic(BasicSubmission):
|
||||
return input_excel
|
||||
|
||||
@classmethod
|
||||
def custom_sample_writer(self, sample:dict) -> dict:
|
||||
def custom_sample_writer(self, sample: dict) -> dict:
|
||||
logger.debug("Wastewater Artic custom sample writer")
|
||||
if sample['source_plate_number'] in [0, "0"]:
|
||||
sample['source_plate_number'] = "control"
|
||||
|
||||
@@ -48,7 +48,7 @@ class PydReagent(BaseModel):
|
||||
def rescue_type_with_lookup(cls, value, values):
|
||||
if value is None and values.data['lot'] is not None:
|
||||
try:
|
||||
return Reagent.query(lot_number=values.data['lot'].name)
|
||||
return Reagent.query(lot=values.data['lot'].name)
|
||||
except AttributeError:
|
||||
return value
|
||||
return value
|
||||
@@ -127,7 +127,7 @@ class PydReagent(BaseModel):
|
||||
if self.model_extra is not None:
|
||||
self.__dict__.update(self.model_extra)
|
||||
# logger.debug(f"Reagent SQL constructor is looking up type: {self.type}, lot: {self.lot}")
|
||||
reagent = Reagent.query(lot_number=self.lot, name=self.name)
|
||||
reagent = Reagent.query(lot=self.lot, name=self.name)
|
||||
# logger.debug(f"Result: {reagent}")
|
||||
if reagent is None:
|
||||
reagent = Reagent()
|
||||
|
||||
Reference in New Issue
Block a user