mid refactor for improved robustness and readability

This commit is contained in:
Landon Wark
2023-03-15 15:38:02 -05:00
parent fc334155ff
commit c645d3a9cf
15 changed files with 337 additions and 468 deletions

View File

@@ -21,19 +21,13 @@ from pathlib import Path
logger = logging.getLogger(f"submissions.{__name__}")
# The below should allow automatic creation of foreign keys in the database
# The below _should_ allow automatic creation of foreign keys in the database
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
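# Minimal sketch of the intended effect (assuming the app runs on SQLite): every new
# DBAPI connection executes "PRAGMA foreign_keys=ON", so foreign key constraints
# declared on the models (e.g. ON DELETE behaviour) are actually enforced at runtime.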
def get_kits_by_use( ctx:dict, kittype_str:str|None) -> list:
pass
# ctx dict should contain the database session
def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|dict:
"""
Upserts submissions into database
@@ -73,21 +67,22 @@ def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|d
def store_reagent(ctx:dict, reagent:models.Reagent) -> None|dict:
"""
_summary_
Inserts a reagent into the database.
Args:
ctx (dict): settings passed down from gui
reagent (models.Reagent): Reagent object to be added to db
Returns:
None|dict: obejct indicating issue to be reported in the gui
None|dict: object indicating issue to be reported in the gui
"""
logger.debug(reagent.__dict__)
logger.debug(f"Reagent dictionary: {reagent.__dict__}")
ctx['database_session'].add(reagent)
try:
ctx['database_session'].commit()
except (sqlite3.OperationalError, sqlalchemy.exc.OperationalError):
return {"message":"The database is locked for editing."}
return None
def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmission:
@@ -103,12 +98,12 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio
"""
# convert submission type into model name
query = info_dict['submission_type'].replace(" ", "")
# check database for existing object
# Ensure an rsl plate number exists for the plate
if info_dict["rsl_plate_num"] == 'nan' or info_dict["rsl_plate_num"] == None or not check_not_nan(info_dict["rsl_plate_num"]):
code = 2
instance = None
msg = "A proper RSL plate number is required."
return instance, {'code': 2, 'message': "A proper RSL plate number is required."}
# check database for existing object
instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first()
# get model based on submission type converted above
logger.debug(f"Looking at models for submission type: {query}")
@@ -142,7 +137,8 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio
field_value = lookup_org_by_name(ctx=ctx, name=q_str)
logger.debug(f"Got {field_value} for organization {q_str}")
case "submitter_plate_num":
# Because of unique constraint, the submitter plate number cannot be None, so...
# Because of unique constraint, there will be problems with
# multiple submissions named 'None', so...
logger.debug(f"Submitter plate id: {info_dict[item]}")
if info_dict[item] == None or info_dict[item] == "None":
logger.debug(f"Got None as a submitter plate number, inserting random string to preserve database unique constraint.")
@@ -156,7 +152,8 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio
except AttributeError:
logger.debug(f"Could not set attribute: {item} to {info_dict[item]}")
continue
# calculate cost of the run: immutable cost + mutable times number of columns
# calculate cost of the run: immutable cost + mutable times number of columns
# This is now attached to submission upon creation to preserve at-run costs in case of a cost increase in the future.
try:
instance.run_cost = instance.extraction_kit.immutable_cost + (instance.extraction_kit.mutable_cost * ((instance.sample_count / 8)/12))
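# Worked example with hypothetical kit costs: immutable_cost=100.00, mutable_cost=600.00
# and sample_count=48 gives 100.00 + 600.00 * ((48 / 8) / 12) = 100.00 + 300.00 = 400.00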
except (TypeError, AttributeError):
@@ -167,7 +164,7 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio
logger.debug(f"Constructed instance: {instance.to_string()}")
except AttributeError as e:
logger.debug(f"Something went wrong constructing instance {info_dict['rsl_plate_num']}: {e}")
logger.debug(msg)
logger.debug(f"Constructed submissions message: {msg}")
return instance, {'code':code, 'message':msg}
@@ -194,7 +191,7 @@ def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent:
case "type":
reagent.type = lookup_reagenttype_by_name(ctx=ctx, rt_name=info_dict[item].replace(" ", "_").lower())
# add end-of-life extension from reagent type to expiry date
# Edit: this will now be done only in the reporting phase to account for potential changes in end-of-life extensions
# NOTE: this will now be done only in the reporting phase to account for potential changes in end-of-life extensions
# try:
# reagent.expiry = reagent.expiry + reagent.type.eol_ext
# except TypeError as e:
@@ -204,7 +201,6 @@ def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent:
return reagent
def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent:
"""
Query db for reagent based on lot number
@@ -219,6 +215,7 @@ def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent:
lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
return lookedup
def get_all_reagenttype_names(ctx:dict) -> list[str]:
"""
Lookup all reagent types and get names
@@ -232,6 +229,7 @@ def get_all_reagenttype_names(ctx:dict) -> list[str]:
lookedup = [item.__str__() for item in ctx['database_session'].query(models.ReagentType).all()]
return lookedup
def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType:
"""
Lookup a single reagent type by name
@@ -251,7 +249,7 @@ def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType:
def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]:
"""
Lookup a kit by an sample type its used for
Lookup kits by a sample type they are used for
Args:
ctx (dict): settings passed from gui
@@ -262,6 +260,7 @@ def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]:
"""
return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all()
def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType:
"""
Lookup a kit type by name
@@ -288,7 +287,6 @@ def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.Reagent]:
Returns:
list[models.Reagent]: list of retrieved reagents
"""
# return [item for item in ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all()]
return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all()
@@ -308,8 +306,7 @@ def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:st
# Hang on, this is going to be a long one.
# by_type = ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name.endswith(type_name)).all()
rt_types = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name.endswith(type_name))
# add filter for kit name... which I can not get to work.
# add_in = by_type.join(models.ReagentType.kits).filter(models.KitType.name==kit_name)
# add filter for kit name...
try:
check = not np.isnan(kit_name)
except TypeError:
@@ -317,12 +314,10 @@ def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:st
if check:
kit_type = lookup_kittype_by_name(ctx=ctx, name=kit_name)
logger.debug(f"reagenttypes: {[item.name for item in rt_types.all()]}, kit: {kit_type.name}")
# add in lookup for related kit_id
rt_types = rt_types.join(reagenttypes_kittypes).filter(reagenttypes_kittypes.c.kits_id==kit_type.id).first()
# for item in by_type:
# logger.debug([thing.name for thing in item.type.kits])
# output = [item for item in by_type if kit_name in [thing.name for thing in item.type.kits]]
# else:
else:
rt_types = rt_types.first()
output = rt_types.instances
return output
@@ -336,7 +331,7 @@ def lookup_all_submissions_by_type(ctx:dict, sub_type:str|None=None) -> list[mod
type (str | None, optional): submission type (should be string in D3 of excel sheet). Defaults to None.
Returns:
_type_: list of retrieved submissions
list[models.BasicSubmission]: list of retrieved submissions
"""
if sub_type == None:
subs = ctx['database_session'].query(models.BasicSubmission).all()
@@ -358,7 +353,7 @@ def lookup_all_orgs(ctx:dict) -> list[models.Organization]:
def lookup_org_by_name(ctx:dict, name:str|None) -> models.Organization:
"""
Lookup organization (lab) by name.
Lookup organization (lab) by (startswith) name.
Args:
ctx (dict): settings passed from gui
@@ -368,7 +363,6 @@ def lookup_org_by_name(ctx:dict, name:str|None) -> models.Organization:
models.Organization: retrieved organization
"""
logger.debug(f"Querying organization: {name}")
# return ctx['database_session'].query(models.Organization).filter(models.Organization.name==name).first()
return ctx['database_session'].query(models.Organization).filter(models.Organization.name.startswith(name)).first()
def submissions_to_df(ctx:dict, sub_type:str|None=None) -> pd.DataFrame:
@@ -383,10 +377,11 @@ def submissions_to_df(ctx:dict, sub_type:str|None=None) -> pd.DataFrame:
pd.DataFrame: dataframe constructed from retrieved submissions
"""
logger.debug(f"Type: {sub_type}")
# pass to lookup function
# use lookup function to create list of dicts
subs = [item.to_dict() for item in lookup_all_submissions_by_type(ctx=ctx, sub_type=sub_type)]
# make df from dicts (records) in list
df = pd.DataFrame.from_records(subs)
# logger.debug(f"Pre: {df['Technician']}")
# Exclude sub information
try:
df = df.drop("controls", axis=1)
except:
@@ -395,7 +390,6 @@ def submissions_to_df(ctx:dict, sub_type:str|None=None) -> pd.DataFrame:
df = df.drop("ext_info", axis=1)
except:
logger.warning(f"Couldn't drop 'controls' column from submissionsheet df.")
# logger.debug(f"Post: {df['Technician']}")
return df
@@ -413,13 +407,9 @@ def lookup_submission_by_id(ctx:dict, id:int) -> models.BasicSubmission:
return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first()
def create_submission_details(ctx:dict, sub_id:int) -> dict:
pass
def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_date:datetime.date) -> list[models.BasicSubmission]:
"""
Lookup submissions by range of submitted dates
Lookup submissions greater than start_date and less than end_date
Args:
ctx (dict): settings passed from gui
@@ -429,18 +419,21 @@ def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_dat
Returns:
list[models.BasicSubmission]: list of retrieved submissions
"""
return ctx['database_session'].query(models.BasicSubmission).filter(and_(models.BasicSubmission.submitted_date > start_date, models.BasicSubmission.submitted_date < end_date)).all()
# return ctx['database_session'].query(models.BasicSubmission).filter(and_(models.BasicSubmission.submitted_date > start_date, models.BasicSubmission.submitted_date < end_date)).all()
start_date = start_date.strftime("%Y-%m-%d")
end_date = end_date.strftime("%Y-%m-%d")
return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.submitted_date.between(start_date, end_date)).all()
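# Usage sketch with hypothetical dates:
# lookup_submissions_by_date_range(ctx, start_date=date(2023, 1, 1), end_date=date(2023, 3, 31))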
def get_all_Control_Types_names(ctx:dict) -> list[models.ControlType]:
def get_all_Control_Types_names(ctx:dict) -> list[str]:
"""
Grabs all control type names from db.
Args:
settings (dict): settings passed down from click. Defaults to {}.
ctx (dict): settings passed down from gui.
Returns:
list: names list
list: list of controltype names
"""
conTypes = ctx['database_session'].query(models.ControlType).all()
conTypes = [conType.name for conType in conTypes]
@@ -451,6 +444,7 @@ def get_all_Control_Types_names(ctx:dict) -> list[models.ControlType]:
def create_kit_from_yaml(ctx:dict, exp:dict) -> dict:
"""
Create and store a new kit in the database based on a .yml file
TODO: split into create and store functions
Args:
ctx (dict): Context dictionary passed down from frontend
@@ -459,18 +453,20 @@ def create_kit_from_yaml(ctx:dict, exp:dict) -> dict:
Returns:
dict: a dictionary containing results of db addition
"""
# try:
# power_users = ctx['power_users']
# except KeyError:
# Don't want just anyone adding kits
if not check_is_power_user(ctx=ctx):
logger.debug(f"{getuser()} does not have permission to add kits.")
return {'code':1, 'message':"This user does not have permission to add kits.", "status":"warning"}
# iterate through keys in dict
for type in exp:
if type == "password":
continue
# A submission type may use multiple kits.
for kt in exp[type]['kits']:
kit = models.KitType(name=kt, used_for=[type.replace("_", " ").title()], cost_per_run=exp[type]["kits"][kt]["cost"])
kit = models.KitType(name=kt, used_for=[type.replace("_", " ").title()], constant_cost=exp[type]["kits"][kt]["constant_cost"], mutable_cost=exp[type]["kits"][kt]["mutable_cost"])
# A kit contains multiple reagent types.
for r in exp[type]['kits'][kt]['reagenttypes']:
# check if reagent type already exists.
look_up = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==r).first()
if look_up == None:
rt = models.ReagentType(name=r.replace(" ", "_").lower(), eol_ext=timedelta(30*exp[type]['kits'][kt]['reagenttypes'][r]['eol_ext']), kits=[kit])
@@ -478,15 +474,15 @@ def create_kit_from_yaml(ctx:dict, exp:dict) -> dict:
rt = look_up
rt.kits.append(kit)
# add this because I think it's necessary to get proper back population
# rt.kit_id.append(kit.id)
kit.reagent_types_id.append(rt.id)
ctx['database_session'].add(rt)
logger.debug(rt.__dict__)
logger.debug(kit.__dict__)
logger.debug(f"Kit construction reagent type: {rt.__dict__}")
logger.debug(f"Kit construction kit: {kit.__dict__}")
ctx['database_session'].add(kit)
ctx['database_session'].commit()
return {'code':0, 'message':'Kit has been added', 'status': 'information'}
def create_org_from_yaml(ctx:dict, org:dict) -> dict:
"""
Create and store a new organization based on a .yml file
@@ -498,30 +494,26 @@ def create_org_from_yaml(ctx:dict, org:dict) -> dict:
Returns:
dict: dictionary containing results of db addition
"""
# try:
# power_users = ctx['power_users']
# except KeyError:
# logger.debug("This user does not have permission to add kits.")
# return {'code':1,'message':"This user does not have permission to add organizations."}
# logger.debug(f"Adding organization for user: {getuser()}")
# if getuser() not in power_users:
# Don't want just anyone adding in clients
if not check_is_power_user(ctx=ctx):
logger.debug(f"{getuser()} does not have permission to add kits.")
return {'code':1, 'message':"This user does not have permission to add organizations."}
# the yml can contain multiple clients
for client in org:
cli_org = models.Organization(name=client.replace(" ", "_").lower(), cost_centre=org[client]['cost centre'])
# a client can contain multiple contacts
for contact in org[client]['contacts']:
cont_name = list(contact.keys())[0]
# check if contact already exists
look_up = ctx['database_session'].query(models.Contact).filter(models.Contact.name==cont_name).first()
if look_up == None:
cli_cont = models.Contact(name=cont_name, phone=contact[cont_name]['phone'], email=contact[cont_name]['email'], organization=[cli_org])
else:
cli_cont = look_up
cli_cont.organization.append(cli_org)
# cli_org.contacts.append(cli_cont)
# cli_org.contact_ids.append_foreign_key(cli_cont.id)
ctx['database_session'].add(cli_cont)
logger.debug(cli_cont.__dict__)
logger.debug(f"Client creation contact: {cli_cont.__dict__}")
logger.debug(f"Client creation client: {cli_org.__dict__}")
ctx['database_session'].add(cli_org)
ctx["database_session"].commit()
return {"code":0, "message":"Organization has been added."}
@@ -538,11 +530,11 @@ def lookup_all_sample_types(ctx:dict) -> list[str]:
list[str]: list of sample type names
"""
uses = [item.used_for for item in ctx['database_session'].query(models.KitType).all()]
# flatten the list of lists and remove duplicates
uses = list(set([item for sublist in uses for item in sublist]))
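# e.g. [["Bacterial Culture"], ["Wastewater", "Bacterial Culture"]] -> ["Bacterial Culture", "Wastewater"]
# (hypothetical values; ordering is not guaranteed after set())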
return uses
def get_all_available_modes(ctx:dict) -> list[str]:
"""
Get types of analysis for controls
@@ -553,6 +545,7 @@ def get_all_available_modes(ctx:dict) -> list[str]:
Returns:
list[str]: list of analysis types
"""
# Only one control is necessary since they all share the same control types.
rel = ctx['database_session'].query(models.Control).first()
try:
cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)]
@@ -562,54 +555,49 @@ def get_all_available_modes(ctx:dict) -> list[str]:
return cols
def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None, end_date:date|None=None) -> list[models.Control]:
"""
Returns a list of control objects that are instances of the input controltype.
Between dates if supplied.
Args:
con_type (str): Name of the control type.
ctx (dict): Settings passed down from gui.
ctx (dict): Settings passed down from gui
con_type (str): Name of control type.
start_date (date | None, optional): Start date of query. Defaults to None.
end_date (date | None, optional): End date of query. Defaults to None.
Returns:
list: Control instances.
"""
list[models.Control]: list of control samples.
"""
logger.debug(f"Using dates: {start_date} to {end_date}")
if start_date != None and end_date != None:
output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).filter(models.Control.submitted_date.between(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))).all()
start_date = start_date.strftime("%Y-%m-%d")
end_date = end_date.strftime("%Y-%m-%d")
output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).filter(models.Control.submitted_date.between(start_date, end_date)).all()
else:
output = ctx['database_session'].query(models.Control).join(models.ControlType).filter_by(name=con_type).all()
logger.debug(f"Returned controls between dates: {output}")
return output
# query = ctx['database_session'].query(models.ControlType).filter_by(name=con_type)
# try:
# output = query.first().instances
# except AttributeError:
# output = None
# # Hacky solution to my not being able to get the sql query to work.
# if start_date != None and end_date != None:
# output = [item for item in output if item.submitted_date.date() > start_date and item.submitted_date.date() < end_date]
# # logger.debug(f"Type {con_type}: {query.first()}")
# return output
def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]:
"""
Get subtypes for a control analysis type
Get subtypes for a control analysis mode
Args:
ctx (dict): settings passed from gui
type (str): control type name
mode (str): analysis type name
mode (str): analysis mode name
Returns:
list[str]: list of subtype names
"""
# Only the first control of type is necessary since they all share subtypes
try:
outs = get_all_controls_by_type(ctx=ctx, con_type=type)[0]
except TypeError:
return []
# Get analysis mode data as dict
jsoner = json.loads(getattr(outs, mode))
logger.debug(f"JSON out: {jsoner}")
try:
@@ -620,11 +608,30 @@ def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]:
return subtypes
def get_all_controls(ctx:dict):
def get_all_controls(ctx:dict) -> list[models.Control]:
"""
Retrieve a list of all controls from the database
Args:
ctx (dict): settings passed down from the gui.
Returns:
list[models.Control]: list of all control objects
"""
return ctx['database_session'].query(models.Control).all()
def lookup_submission_by_rsl_num(ctx:dict, rsl_num:str):
def lookup_submission_by_rsl_num(ctx:dict, rsl_num:str) -> models.BasicSubmission:
"""
Retrieve a submission from the database based on rsl plate number
Args:
ctx (dict): settings passed down from gui
rsl_num (str): rsl plate number
Returns:
models.BasicSubmission: Submissions object retrieved from database
"""
return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num.startswith(rsl_num)).first()
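# Usage sketch with a hypothetical plate number:
# lookup_submission_by_rsl_num(ctx, "RSL-WW-20230315") returns the first submission
# whose rsl_plate_num starts with that string.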
@@ -641,10 +648,15 @@ def delete_submission_by_id(ctx:dict, id:int) -> None:
id (int): id of submission to be deleted.
"""
# In order to properly do this I'm going to have to delete all of the secondary table stuff as well.
# Retrieve submission
sub = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first()
# Convert to dict for storing backup as a yml
backup = sub.to_dict()
with open(Path(ctx['backup_path']).joinpath(f"{sub.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f:
yaml.dump(backup, f)
try:
with open(Path(ctx['backup_path']).joinpath(f"{sub.rsl_plate_num}-backup({date.today().strftime('%Y%m%d')}).yml"), "w") as f:
yaml.dump(backup, f)
except KeyError:
pass
sub.reagents = []
for sample in sub.samples:
ctx['database_session'].delete(sample)

View File

@@ -21,7 +21,7 @@ class KitType(Base):
name = Column(String(64), unique=True) #: name of kit
submissions = relationship("BasicSubmission", back_populates="extraction_kit") #: submissions this kit was used for
used_for = Column(JSON) #: list of names of sample types this kit can process
cost_per_run = Column(FLOAT(2)) #: dollar amount for each full run of this kit
cost_per_run = Column(FLOAT(2)) #: dollar amount for each full run of this kit NOTE: deprecated, use the constant and mutable costs instead
mutable_cost = Column(FLOAT(2)) #: dollar amount that can change with number of columns (reagents, tips, etc)
constant_cost = Column(FLOAT(2)) #: dollar amount that will remain constant (plates, man hours, etc)
reagent_types = relationship("ReagentType", back_populates="kits", uselist=True, secondary=reagenttypes_kittypes) #: reagent types this kit contains
@@ -81,9 +81,7 @@ class Reagent(Base):
Returns:
str: string representing this object's type and lot number
"""
lot = str(self.lot)
r_type = str(self.type)
return f"{r_type} - {lot}"
return str(self.lot)
def to_sub_dict(self) -> dict:
"""

View File

@@ -16,6 +16,9 @@ class WWSample(Base):
rsl_plate = relationship("Wastewater", back_populates="samples") #: relationship to parent plate
rsl_plate_id = Column(INTEGER, ForeignKey("_submissions.id", ondelete="SET NULL", name="fk_WWS_submission_id"))
collection_date = Column(TIMESTAMP) #: Date submission received
well_number = Column(String(8)) #: location on plate
# The following are fields from the sample tracking excel sheet Ruth put together.
# I have no idea when they will be implemented or how.
testing_type = Column(String(64))
site_status = Column(String(64))
notes = Column(String(2000))
@@ -24,7 +27,7 @@ class WWSample(Base):
seq_submitted = Column(BOOLEAN())
ww_seq_run_id = Column(String(64))
sample_type = Column(String(8))
well_number = Column(String(8)) #: location on plate
def to_string(self) -> str:
"""

View File

@@ -35,6 +35,7 @@ class BasicSubmission(Base):
run_cost = Column(FLOAT(2)) #: total cost of running the plate. Set from kit costs at time of creation.
uploaded_by = Column(String(32)) #: user name of person who submitted the submission to the database.
# Allows for subclassing into ex. BacterialCulture, Wastewater, etc.
__mapper_args__ = {
"polymorphic_identity": "basic_submission",
"polymorphic_on": submission_type,
@@ -148,23 +149,25 @@ class BasicSubmission(Base):
}
return output
# Below are the custom submission
# Below are the custom submission types
class BacterialCulture(BasicSubmission):
"""
derivative submission type from BasicSubmission
"""
# control_id = Column(INTEGER, ForeignKey("_control_samples.id", ondelete="SET NULL", name="fk_BC_control_id"))
controls = relationship("Control", back_populates="submission", uselist=True) #: A control sample added to submission
samples = relationship("BCSample", back_populates="rsl_plate", uselist=True)
# bc_sample_id = Column(INTEGER, ForeignKey("_bc_samples.id", ondelete="SET NULL", name="fk_BC_sample_id"))
__mapper_args__ = {"polymorphic_identity": "bacterial_culture", "polymorphic_load": "inline"}
def to_dict(self) -> dict:
"""
Extends parent class method to add controls to dict
Returns:
dict: dictionary used in submissions summary
"""
output = super().to_dict()
output['controls'] = [item.to_sub_dict() for item in self.controls]
# logger.debug(f"{self.rsl_plate_num} technician: {output}")
return output

View File

@@ -2,7 +2,6 @@ from pandas import DataFrame
import re
def get_unique_values_in_df_column(df: DataFrame, column_name: str) -> list:
"""
get all unique values in a dataframe column by name
@@ -40,3 +39,5 @@ def drop_reruns_from_df(ctx:dict, df: DataFrame) -> DataFrame:
# logger.debug(f"First run: {first_run}")
df = df.drop(df[df.name == first_run].index)
return df
else:
return None

View File

@@ -74,16 +74,15 @@ class SheetParser(object):
Returns:
pd.DataFrame: relevant dataframe from excel sheet
"""
"""
# self.xl is a pd.ExcelFile so we need to parse it into a df
submission_info = self.xl.parse(sheet_name=sheet_name, dtype=object)
self.sub['submitter_plate_num'] = submission_info.iloc[0][1]
self.sub['rsl_plate_num'] = submission_info.iloc[10][1]
self.sub['submitted_date'] = submission_info.iloc[1][1]
self.sub['submitting_lab'] = submission_info.iloc[0][3]
self.sub['sample_count'] = submission_info.iloc[2][3]
self.sub['extraction_kit'] = submission_info.iloc[3][3]
return submission_info
@@ -104,10 +103,6 @@ class SheetParser(object):
if ii == 11:
continue
logger.debug(f"Running reagent parse for {row[1]} with type {type(row[1])} and value: {row[2]} with type {type(row[2])}")
# try:
# check = not np.isnan(row[1])
# except TypeError:
# check = True
if not isinstance(row[2], float) and check_not_nan(row[1]):
# must be prefixed with 'lot_' to be recognized by gui
try:
@@ -122,13 +117,7 @@ class SheetParser(object):
logger.debug(f"Couldn't upperize {row[2]}, must be a number")
output_var = row[2]
logger.debug(f"Output variable is {output_var}")
# self.sub[f"lot_{reagent_type}"] = output_var
# update 2023-02-10 to above allowing generation of expiry date in adding reagent to db.
logger.debug(f"Expiry date for imported reagent: {row[3]}")
# try:
# check = not np.isnan(row[3])
# except TypeError:
# check = True
if check_not_nan(row[3]):
expiry = row[3].date()
else:
@@ -146,19 +135,8 @@ class SheetParser(object):
# reagents
# must be prefixed with 'lot_' to be recognized by gui
# Todo: find a more adaptable way to read reagents.
reagent_range = submission_info.iloc[1:13, 4:8]
_parse_reagents(reagent_range)
# self.sub['lot_wash_1'] = submission_info.iloc[1][6] #if pd.isnull(submission_info.iloc[1][6]) else string_formatter(submission_info.iloc[1][6])
# self.sub['lot_wash_2'] = submission_info.iloc[2][6] #if pd.isnull(submission_info.iloc[2][6]) else string_formatter(submission_info.iloc[2][6])
# self.sub['lot_binding_buffer'] = submission_info.iloc[3][6] #if pd.isnull(submission_info.iloc[3][6]) else string_formatter(submission_info.iloc[3][6])
# self.sub['lot_magnetic_beads'] = submission_info.iloc[4][6] #if pd.isnull(submission_info.iloc[4][6]) else string_formatter(submission_info.iloc[4][6])
# self.sub['lot_lysis_buffer'] = submission_info.iloc[5][6] #if np.nan(submission_info.iloc[5][6]) else string_formatter(submission_info.iloc[5][6])
# self.sub['lot_elution_buffer'] = submission_info.iloc[6][6] #if pd.isnull(submission_info.iloc[6][6]) else string_formatter(submission_info.iloc[6][6])
# self.sub['lot_isopropanol'] = submission_info.iloc[9][6] #if pd.isnull(submission_info.iloc[9][6]) else string_formatter(submission_info.iloc[9][6])
# self.sub['lot_ethanol'] = submission_info.iloc[10][6] #if pd.isnull(submission_info.iloc[10][6]) else string_formatter(submission_info.iloc[10][6])
# self.sub['lot_positive_control'] = submission_info.iloc[103][1] #if pd.isnull(submission_info.iloc[103][1]) else string_formatter(submission_info.iloc[103][1])
# self.sub['lot_plate'] = submission_info.iloc[12][6] #if pd.isnull(submission_info.iloc[12][6]) else string_formatter(submission_info.iloc[12][6])
# get individual sample info
sample_parser = SampleParser(submission_info.iloc[15:111])
sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type'].lower()}_samples")
@@ -178,12 +156,8 @@ class SheetParser(object):
Args:
df (pd.DataFrame): input sub dataframe
"""
# logger.debug(df)
# iterate through sub-df rows
for ii, row in df.iterrows():
# try:
# check = not np.isnan(row[5])
# except TypeError:
# check = True
if not isinstance(row[5], float) and check_not_nan(row[5]):
# must be prefixed with 'lot_' to be recognized by gui
# regex below will remove 80% from 80% ethanol in the Wastewater kit.
@@ -202,34 +176,26 @@ class SheetParser(object):
else:
expiry = date.today()
self.sub[f"lot_{output_key}"] = {'lot':output_var, 'exp':expiry}
# parse submission sheet
submission_info = self._parse_generic("WW Submissions (ENTER HERE)")
# parse enrichment sheet
enrichment_info = self.xl.parse("Enrichment Worksheet", dtype=object)
# set enrichment reagent range
enr_reagent_range = enrichment_info.iloc[0:4, 9:20]
# parse extraction sheet
extraction_info = self.xl.parse("Extraction Worksheet", dtype=object)
# set extraction reagent range
ext_reagent_range = extraction_info.iloc[0:5, 9:20]
# parse qpcr sheet
qprc_info = self.xl.parse("qPCR Worksheet", dtype=object)
# set qpcr reagent range
pcr_reagent_range = qprc_info.iloc[0:5, 9:20]
# compile technician info
self.sub['technician'] = f"Enr: {enrichment_info.columns[2]}, Ext: {extraction_info.columns[2]}, PCR: {qprc_info.columns[2]}"
_parse_reagents(enr_reagent_range)
_parse_reagents(ext_reagent_range)
_parse_reagents(pcr_reagent_range)
# reagents
# logger.debug(qprc_info)
# self.sub['lot_lysis_buffer'] = enrichment_info.iloc[0][14] #if pd.isnull(enrichment_info.iloc[0][14]) else string_formatter(enrichment_info.iloc[0][14])
# self.sub['lot_proteinase_K'] = enrichment_info.iloc[1][14] #if pd.isnull(enrichment_info.iloc[1][14]) else string_formatter(enrichment_info.iloc[1][14])
# self.sub['lot_magnetic_virus_particles'] = enrichment_info.iloc[2][14] #if pd.isnull(enrichment_info.iloc[2][14]) else string_formatter(enrichment_info.iloc[2][14])
# self.sub['lot_enrichment_reagent_1'] = enrichment_info.iloc[3][14] #if pd.isnull(enrichment_info.iloc[3][14]) else string_formatter(enrichment_info.iloc[3][14])
# self.sub['lot_binding_buffer'] = extraction_info.iloc[0][14] #if pd.isnull(extraction_info.iloc[0][14]) else string_formatter(extraction_info.iloc[0][14])
# self.sub['lot_magnetic_beads'] = extraction_info.iloc[1][14] #if pd.isnull(extraction_info.iloc[1][14]) else string_formatter(extraction_info.iloc[1][14])
# self.sub['lot_wash'] = extraction_info.iloc[2][14] #if pd.isnull(extraction_info.iloc[2][14]) else string_formatter(extraction_info.iloc[2][14])
# self.sub['lot_ethanol'] = extraction_info.iloc[3][14] #if pd.isnull(extraction_info.iloc[3][14]) else string_formatter(extraction_info.iloc[3][14])
# self.sub['lot_elution_buffer'] = extraction_info.iloc[4][14] #if pd.isnull(extraction_info.iloc[4][14]) else string_formatter(extraction_info.iloc[4][14])
# self.sub['lot_master_mix'] = qprc_info.iloc[0][14] #if pd.isnull(qprc_info.iloc[0][14]) else string_formatter(qprc_info.iloc[0][14])
# self.sub['lot_pre_mix_1'] = qprc_info.iloc[1][14] #if pd.isnull(qprc_info.iloc[1][14]) else string_formatter(qprc_info.iloc[1][14])
# self.sub['lot_pre_mix_2'] = qprc_info.iloc[2][14] #if pd.isnull(qprc_info.iloc[2][14]) else string_formatter(qprc_info.iloc[2][14])
# self.sub['lot_positive_control'] = qprc_info.iloc[3][14] #if pd.isnull(qprc_info.iloc[3][14]) else string_formatter(qprc_info.iloc[3][14])
# self.sub['lot_ddh2o'] = qprc_info.iloc[4][14] #if pd.isnull(qprc_info.iloc[4][14]) else string_formatter(qprc_info.iloc[4][14])
# get individual sample info
# parse samples
sample_parser = SampleParser(submission_info.iloc[16:40])
sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type'].lower()}_samples")
self.sub['samples'] = sample_parse()
@@ -241,6 +207,12 @@ class SampleParser(object):
"""
def __init__(self, df:pd.DataFrame) -> None:
"""
convert sample sub-dataframe to dictionary of records
Args:
df (pd.DataFrame): input sample dataframe
"""
self.samples = df.to_dict("records")
@@ -287,6 +259,7 @@ class SampleParser(object):
not_a_nan = not np.isnan(sample['Unnamed: 3'])
except TypeError:
not_a_nan = True
# if we don't have a sample full id, make one up
if not_a_nan:
new.ww_sample_full_id = sample['Unnamed: 3']
else:

View File

@@ -1,8 +1,6 @@
from pandas import DataFrame, concat
from operator import itemgetter
from pandas import DataFrame
# from backend.db import models
import json
import logging
from jinja2 import Environment, FileSystemLoader
from datetime import date, timedelta
@@ -38,13 +36,8 @@ def make_report_xlsx(records:list[dict]) -> DataFrame:
df2 = df.groupby(["Submitting Lab", "Extraction Kit"]).agg({'Extraction Kit':'count', 'Cost': 'sum', 'Sample Count':'sum'})
df2 = df2.rename(columns={"Extraction Kit": 'Kit Count'})
logger.debug(f"Output daftaframe for xlsx: {df2.columns}")
# apply formatting to cost column
# df2.iloc[:, (df2.columns.get_level_values(1)=='sum') & (df2.columns.get_level_values(0)=='Cost')] = df2.iloc[:, (df2.columns.get_level_values(1)=='sum') & (df2.columns.get_level_values(0)=='Cost')].applymap('${:,.2f}'.format)
return df2
# def split_row_item(item:str) -> float:
# return item.split(" ")[-1]
def make_report_html(df:DataFrame, start_date:date, end_date:date) -> str:
@@ -63,23 +56,20 @@ def make_report_html(df:DataFrame, start_date:date, end_date:date) -> str:
output = []
logger.debug(f"Report DataFrame: {df}")
for ii, row in enumerate(df.iterrows()):
# row = [item for item in row]
logger.debug(f"Row {ii}: {row}")
lab = row[0][0]
logger.debug(type(row))
logger.debug(f"Old lab: {old_lab}, Current lab: {lab}")
logger.debug(f"Name: {row[0][1]}")
data = [item for item in row[1]]
# logger.debug(data)
# logger.debug(f"Cost: {split_row_item(data[1])}")
# logger.debug(f"Kit count: {split_row_item(data[0])}")
# logger.debug(f"Sample Count: {split_row_item(data[2])}")
kit = dict(name=row[0][1], cost=data[1], plate_count=int(data[0]), sample_count=int(data[2]))
# if this is the same lab as before add together
if lab == old_lab:
output[-1]['kits'].append(kit)
output[-1]['total_cost'] += kit['cost']
output[-1]['total_samples'] += kit['sample_count']
output[-1]['total_plates'] += kit['plate_count']
# if not the same lab, make a new one
else:
adder = dict(lab=lab, kits=[kit], total_cost=kit['cost'], total_samples=kit['sample_count'], total_plates=kit['plate_count'])
output.append(adder)
@@ -91,83 +81,6 @@ def make_report_html(df:DataFrame, start_date:date, end_date:date) -> str:
return html
# def split_controls_dictionary(ctx:dict, input_dict) -> list[dict]:
# # this will be the date in string form
# dict_name = list(input_dict.keys())[0]
# # the data associated with the date key
# sub_dict = input_dict[dict_name]
# # How many "count", "Percent", etc are in the dictionary
# data_size = get_dict_size(sub_dict)
# output = []
# for ii in range(data_size):
# new_dict = {}
# for genus in sub_dict:
# logger.debug(genus)
# sub_name = list(sub_dict[genus].keys())[ii]
# new_dict[genus] = sub_dict[genus][sub_name]
# output.append({"date":dict_name, "name": sub_name, "data": new_dict})
# return output
# def get_dict_size(input:dict):
# return max(len(input[item]) for item in input)
# def convert_all_controls(ctx:dict, data:list) -> dict:
# dfs = {}
# dict_list = [split_controls_dictionary(ctx, datum) for datum in data]
# dict_list = [item for sublist in dict_list for item in sublist]
# names = list(set([datum['name'] for datum in dict_list]))
# for name in names:
# # df = DataFrame()
# # entries = [{item['date']:item['data']} for item in dict_list if item['name']==name]
# # series_list = []
# # df = pd.json_normalize(entries)
# # for entry in entries:
# # col_name = list(entry.keys())[0]
# # col_dict = entry[col_name]
# # series = pd.Series(data=col_dict.values(), index=col_dict.keys(), name=col_name)
# # # df[col_name] = series.values
# # # logger.debug(df.index)
# # series_list.append(series)
# # df = DataFrame(series_list).T.fillna(0)
# # logger.debug(df)
# dfs['name'] = df
# return dfs
# def convert_control_by_mode(ctx:dict, control:models.Control, mode:str) -> list[dict]:
# """
# split control object into analysis types... can I move this into the class itself?
# turns out I can
# Args:
# ctx (dict): settings passed from gui
# control (models.Control): control to be parsed into list
# mode (str): analysis type
# Returns:
# list[dict]: list of records
# """
# output = []
# data = json.loads(getattr(control, mode))
# for genus in data:
# _dict = {}
# _dict['name'] = control.name
# _dict['submitted_date'] = control.submitted_date
# _dict['genus'] = genus
# _dict['target'] = 'Target' if genus.strip("*") in control.controltype.targets else "Off-target"
# for key in data[genus]:
# _dict[key] = data[genus][key]
# output.append(_dict)
# # logger.debug(output)
# return output
def convert_data_list_to_df(ctx:dict, input:list[dict], subtype:str|None=None) -> DataFrame:
"""
Convert list of control records to dataframe