Addition of autofilling excel forms. Improved pydantic validation.

This commit is contained in:
Landon Wark
2023-07-19 14:33:15 -05:00
parent 1c804bfc6a
commit ba35696055
21 changed files with 774 additions and 961 deletions

View File

@@ -5,14 +5,12 @@ Convenience functions for interacting with the database.
from . import models
from .models.kits import reagenttypes_kittypes
from .models.submissions import reagents_submissions
# from .models.samples import WWSample
import pandas as pd
import sqlalchemy.exc
import sqlite3
import logging
from datetime import date, datetime, timedelta
from sqlalchemy import and_
import uuid
from sqlalchemy import JSON, event
from sqlalchemy.engine import Engine
import json
@@ -22,6 +20,7 @@ import yaml
from pathlib import Path
logger = logging.getLogger(f"submissions.{__name__}")
# The below _should_ allow automatic creation of foreign keys in the database
@@ -111,12 +110,12 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio
# convert submission type into model name
query = info_dict['submission_type'].replace(" ", "")
# Ensure an rsl plate number exists for the plate
# if info_dict["rsl_plate_num"] == 'nan' or info_dict["rsl_plate_num"] == None or not check_not_nan(info_dict["rsl_plate_num"]):
if not check_regex_match("^RSL", info_dict["rsl_plate_num"]):
instance = None
msg = "A proper RSL plate number is required."
return instance, {'code': 2, 'message': "A proper RSL plate number is required."}
else:
# enforce conventions on the rsl plate number from the form
info_dict['rsl_plate_num'] = RSLNamer(info_dict["rsl_plate_num"]).parsed_name
# check database for existing object
instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first()
@@ -160,10 +159,11 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio
case "submitter_plate_num":
# Because of unique constraint, there will be problems with
# multiple submissions named 'None', so...
# Should be depreciated with use of pydantic validator
logger.debug(f"Submitter plate id: {info_dict[item]}")
if info_dict[item] == None or info_dict[item] == "None" or info_dict[item] == "":
logger.debug(f"Got None as a submitter plate number, inserting random string to preserve database unique constraint.")
info_dict[item] = uuid.uuid4().hex.upper()
# if info_dict[item] == None or info_dict[item] == "None" or info_dict[item] == "":
# logger.debug(f"Got None as a submitter plate number, inserting random string to preserve database unique constraint.")
# info_dict[item] = uuid.uuid4().hex.upper()
field_value = info_dict[item]
case _:
field_value = info_dict[item]
@@ -233,20 +233,6 @@ def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent:
# pass
return reagent
# def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent:
# """
# Query db for reagent based on lot number
# Args:
# ctx (dict): settings passed down from gui
# reagent_lot (str): lot number to query
# Returns:
# models.Reagent: looked up reagent
# """
# lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
# return lookedup
def get_all_reagenttype_names(ctx:dict) -> list[str]:
"""
Lookup all reagent types and get names
@@ -276,7 +262,7 @@ def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType:
logger.debug(f"Found ReagentType: {lookedup}")
return lookedup
def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]:
def lookup_kittype_by_use(ctx:dict, used_by:str|None=None) -> list[models.KitType]:
"""
Lookup kits by a sample type its used for
@@ -287,7 +273,10 @@ def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]:
Returns:
list[models.KitType]: list of kittypes that have that sample type in their uses
"""
return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all()
if used_by != None:
return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all()
else:
return ctx['database_session'].query(models.KitType).all()
def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType:
"""
@@ -872,19 +861,34 @@ def platemap_plate(submission:models.BasicSubmission) -> list:
# image = make_plate_map(plate_dicto)
return plate_dicto
def lookup_reagent(ctx:dict, reagent_lot:str|None=None, type_name:str|None=None) -> models.Reagent:
def lookup_reagent(ctx:dict, reagent_lot:str, type_name:str|None=None) -> models.Reagent:
"""
Query db for reagent based on lot number
Query db for reagent based on lot number, with optional reagent type to enforce
Args:
ctx (dict): settings passed down from gui
reagent_lot (str): lot number to query
type_name (str | None, optional): name of reagent type. Defaults to None.
Returns:
models.Reagent: looked up reagent
"""
if reagent_lot != None and type_name != None:
return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).filter(models.Reagent.lot==reagent_lot).all()
return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).filter(models.Reagent.lot==reagent_lot).first()
elif type_name == None:
return ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
return ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
def lookup_last_used_reagenttype_lot(ctx:dict, type_name:str) -> models.Reagent:
"""
Look up the last used reagent of the reagent type
Args:
ctx (dict): Settings passed down from gui
type_name (str): Name of reagent type
Returns:
models.Reagent: Reagent object with last used lot.
"""
rt = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==type_name).first()
logger.debug(f"Reagent type looked up for {type_name}: {rt.__str__()}")
return lookup_reagent(ctx=ctx, reagent_lot=rt.last_used, type_name=type_name)

View File

@@ -55,11 +55,8 @@ class ReagentType(Base):
instances = relationship("Reagent", back_populates="type") #: concrete instances of this reagent type
eol_ext = Column(Interval()) #: extension of life interval
required = Column(INTEGER, server_default="1") #: sqlite boolean to determine if reagent type is essential for the kit
# __table_args__ = (
# CheckConstraint(required >= 0, name='check_required_positive'),
# CheckConstraint(required < 2, name='check_required_less_2'),
# {})
last_used = Column(String(32)) #: last used lot number of this type of reagent
@validates('required')
def validate_age(self, key, value):
if not 0 <= value < 2:
@@ -125,6 +122,13 @@ class Reagent(Base):
"expiry": place_holder.strftime("%Y-%m-%d")
}
def to_reagent_dict(self) -> dict:
return {
"type": self.type.name,
"lot": self.lot,
"expiry": self.expiry.strftime("%Y-%m-%d")
}
class Discount(Base):
"""

View File

@@ -6,6 +6,7 @@ from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, FLOAT, BO
from sqlalchemy.orm import relationship
import logging
logger = logging.getLogger(f"submissions.{__name__}")
@@ -22,7 +23,7 @@ class WWSample(Base):
rsl_plate = relationship("Wastewater", back_populates="samples") #: relationship to parent plate
rsl_plate_id = Column(INTEGER, ForeignKey("_submissions.id", ondelete="SET NULL", name="fk_WWS_submission_id"))
collection_date = Column(TIMESTAMP) #: Date submission received
well_number = Column(String(8)) #: location on 24 well plate
well_number = Column(String(8)) #: location on 96 well plate
# The following are fields from the sample tracking excel sheet Ruth put together.
# I have no idea when they will be implemented or how.
testing_type = Column(String(64))
@@ -36,7 +37,7 @@ class WWSample(Base):
ww_seq_run_id = Column(String(64))
sample_type = Column(String(8))
pcr_results = Column(JSON)
elution_well = Column(String(8)) #: location on 96 well plate
well_24 = Column(String(8)) #: location on 24 well plate
artic_rsl_plate = relationship("WastewaterArtic", back_populates="samples")
artic_well_number = Column(String(8))
@@ -57,10 +58,6 @@ class WWSample(Base):
Returns:
dict: well location and id NOTE: keys must sync with BCSample to_sub_dict below
"""
# well_col = self.well_number[1:]
# well_row = self.well_number[0]
# if well_col > 4:
# well
if self.ct_n1 != None and self.ct_n2 != None:
# logger.debug(f"Using well info in name.")
name = f"{self.ww_sample_full_id}\n\t- ct N1: {'{:.2f}'.format(self.ct_n1)} ({self.n1_status})\n\t- ct N2: {'{:.2f}'.format(self.ct_n2)} ({self.n2_status})"
@@ -87,8 +84,8 @@ class WWSample(Base):
except TypeError as e:
logger.error(f"Couldn't check positives for {self.rsl_number}. Looks like there isn't PCR data.")
return None
well_row = row_dict[self.elution_well[0]]
well_col = self.elution_well[1:]
well_row = row_dict[self.well_number[0]]
well_col = self.well_number[1:]
# if positive:
# try:
# # The first character of the elution well is the row

View File

@@ -5,7 +5,6 @@ import math
from . import Base
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, Table, JSON, FLOAT
from sqlalchemy.orm import relationship
from datetime import datetime as dt
import logging
import json
from json.decoder import JSONDecodeError
@@ -164,7 +163,8 @@ class BasicSubmission(Base):
def calculate_base_cost(self):
try:
cols_count_96 = ceil(int(self.sample_count) / 8)
# cols_count_96 = ceil(int(self.sample_count) / 8)
cols_count_96 = self.calculate_column_count()
except Exception as e:
logger.error(f"Column count error: {e}")
# cols_count_24 = ceil(int(self.sample_count) / 3)
@@ -173,6 +173,11 @@ class BasicSubmission(Base):
except Exception as e:
logger.error(f"Calculation error: {e}")
def calculate_column_count(self):
columns = [int(sample.well_number[-2:]) for sample in self.samples]
logger.debug(f"Here are the columns for {self.rsl_plate_num}: {columns}")
return max(columns)
# Below are the custom submission types
class BacterialCulture(BasicSubmission):