documentation and converted to username based exclusion of adding new kits
This commit is contained in:
@@ -12,6 +12,7 @@ import base64
|
||||
from sqlalchemy import JSON
|
||||
import json
|
||||
from dateutil.relativedelta import relativedelta
|
||||
from getpass import getuser
|
||||
|
||||
logger = logging.getLogger(f"submissions.{__name__}")
|
||||
|
||||
@@ -20,8 +21,19 @@ def get_kits_by_use( ctx:dict, kittype_str:str|None) -> list:
|
||||
# ctx dict should contain the database session
|
||||
|
||||
|
||||
def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None:
|
||||
def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|dict:
|
||||
"""
|
||||
Upserts submissions into database
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed down from gui
|
||||
base_submission (models.BasicSubmission): submission to be add to db
|
||||
|
||||
Returns:
|
||||
None|dict : object that indicates issue raised for reporting in gui
|
||||
"""
|
||||
logger.debug(f"Hello from store_submission")
|
||||
# Add all samples to sample table
|
||||
for sample in base_submission.samples:
|
||||
sample.rsl_plate = base_submission
|
||||
logger.debug(f"Attempting to add sample: {sample.to_string()}")
|
||||
@@ -30,6 +42,7 @@ def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None:
|
||||
except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e:
|
||||
logger.debug(f"Hit an integrity error : {e}")
|
||||
continue
|
||||
# Add submission to submission table
|
||||
ctx['database_session'].add(base_submission)
|
||||
logger.debug(f"Attempting to add submission: {base_submission.rsl_plate_num}")
|
||||
try:
|
||||
@@ -45,26 +58,51 @@ def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None:
|
||||
return None
|
||||
|
||||
|
||||
def store_reagent(ctx:dict, reagent:models.Reagent) -> None:
|
||||
def store_reagent(ctx:dict, reagent:models.Reagent) -> None|dict:
|
||||
"""
|
||||
_summary_
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed down from gui
|
||||
reagent (models.Reagent): Reagent object to be added to db
|
||||
|
||||
Returns:
|
||||
None|dict: obejct indicating issue to be reported in the gui
|
||||
"""
|
||||
logger.debug(reagent.__dict__)
|
||||
ctx['database_session'].add(reagent)
|
||||
try:
|
||||
ctx['database_session'].commit()
|
||||
except OperationalError:
|
||||
except (sqlite3.OperationalError, sqlalchemy.exc.OperationalError):
|
||||
return {"message":"The database is locked for editing."}
|
||||
|
||||
|
||||
def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmission:
|
||||
"""
|
||||
Construct submission obejct from dictionary
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed down from gui
|
||||
info_dict (dict): dictionary to be transformed
|
||||
|
||||
Returns:
|
||||
models.BasicSubmission: Constructed submission object
|
||||
"""
|
||||
# convert submission type into model name
|
||||
query = info_dict['submission_type'].replace(" ", "")
|
||||
# check database for existing object
|
||||
instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first()
|
||||
msg = "This submission already exists.\nWould you like to overwrite?"
|
||||
# get model based on submission type converted above
|
||||
model = getattr(models, query)
|
||||
info_dict['submission_type'] = info_dict['submission_type'].replace(" ", "_").lower()
|
||||
# if query return nothing, ie doesn't already exist in db
|
||||
if instance == None:
|
||||
instance = model()
|
||||
msg = None
|
||||
for item in info_dict:
|
||||
logger.debug(f"Setting {item} to {info_dict[item]}")
|
||||
# set fields based on keys in dictionary
|
||||
match item:
|
||||
case "extraction_kit":
|
||||
q_str = info_dict[item]
|
||||
@@ -86,35 +124,34 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio
|
||||
logger.debug(f"Got None as a submitter plate number, inserting random string to preserve database unique constraint.")
|
||||
info_dict[item] = uuid.uuid4().hex.upper()
|
||||
field_value = info_dict[item]
|
||||
# case "samples":
|
||||
# for sample in info_dict[item]:
|
||||
# instance.samples.append(sample)
|
||||
# continue
|
||||
case _:
|
||||
field_value = info_dict[item]
|
||||
# insert into field
|
||||
try:
|
||||
setattr(instance, item, field_value)
|
||||
except AttributeError:
|
||||
logger.debug(f"Could not set attribute: {item} to {info_dict[item]}")
|
||||
continue
|
||||
# logger.debug(instance.__dict__)
|
||||
logger.debug(f"Constructed instance: {instance.to_string()}")
|
||||
logger.debug(msg)
|
||||
return instance, {'message':msg}
|
||||
# looked_up = []
|
||||
# for reagent in reagents:
|
||||
# my_reagent = lookup_reagent(reagent)
|
||||
# logger.debug(my_reagent)
|
||||
# looked_up.append(my_reagent)
|
||||
# logger.debug(looked_up)
|
||||
# instance.reagents = looked_up
|
||||
# ctx['database_session'].add(instance)
|
||||
# ctx['database_session'].commit()
|
||||
|
||||
|
||||
def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent:
|
||||
"""
|
||||
Construct reagent object from dictionary
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed down from gui
|
||||
info_dict (dict): dictionary to be converted
|
||||
|
||||
Returns:
|
||||
models.Reagent: Constructed reagent object
|
||||
"""
|
||||
reagent = models.Reagent()
|
||||
for item in info_dict:
|
||||
logger.debug(f"Reagent info item: {item}")
|
||||
# set fields based on keys in dictionary
|
||||
match item:
|
||||
case "lot":
|
||||
reagent.lot = info_dict[item].upper()
|
||||
@@ -122,25 +159,55 @@ def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent:
|
||||
reagent.expiry = info_dict[item]
|
||||
case "type":
|
||||
reagent.type = lookup_reagenttype_by_name(ctx=ctx, rt_name=info_dict[item].replace(" ", "_").lower())
|
||||
# add end-of-life extension from reagent type to expiry date
|
||||
try:
|
||||
reagent.expiry = reagent.expiry + reagent.type.eol_ext
|
||||
except TypeError as e:
|
||||
logger.debug(f"WE got a type error: {e}.")
|
||||
logger.debug(f"We got a type error: {e}.")
|
||||
except AttributeError:
|
||||
pass
|
||||
return reagent
|
||||
|
||||
|
||||
|
||||
def lookup_reagent(ctx:dict, reagent_lot:str):
|
||||
def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent:
|
||||
"""
|
||||
Query db for reagent based on lot number
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed down from gui
|
||||
reagent_lot (str): lot number to query
|
||||
|
||||
Returns:
|
||||
models.Reagent: looked up reagent
|
||||
"""
|
||||
lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
|
||||
return lookedup
|
||||
|
||||
def get_all_reagenttype_names(ctx:dict) -> list[str]:
|
||||
"""
|
||||
Lookup all reagent types and get names
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed from gui
|
||||
|
||||
Returns:
|
||||
list[str]: reagent type names
|
||||
"""
|
||||
lookedup = [item.__str__() for item in ctx['database_session'].query(models.ReagentType).all()]
|
||||
return lookedup
|
||||
|
||||
def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType:
|
||||
"""
|
||||
Lookup a single reagent type by name
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed from gui
|
||||
rt_name (str): reagent type name to look up
|
||||
|
||||
Returns:
|
||||
models.ReagentType: looked up reagent type
|
||||
"""
|
||||
logger.debug(f"Looking up ReagentType by name: {rt_name}")
|
||||
lookedup = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==rt_name).first()
|
||||
logger.debug(f"Found ReagentType: {lookedup}")
|
||||
@@ -148,27 +215,78 @@ def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType:
|
||||
|
||||
|
||||
def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]:
|
||||
# return [item for item in
|
||||
"""
|
||||
Lookup a kit by an sample type its used for
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed from gui
|
||||
used_by (str): sample type (should be string in D3 of excel sheet)
|
||||
|
||||
Returns:
|
||||
list[models.KitType]: list of kittypes that have that sample type in their uses
|
||||
"""
|
||||
return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by))
|
||||
|
||||
def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType:
|
||||
"""
|
||||
Lookup a kit type by name
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed from bui
|
||||
name (str): name of kit to query
|
||||
|
||||
Returns:
|
||||
models.KitType: retrieved kittype
|
||||
"""
|
||||
logger.debug(f"Querying kittype: {name}")
|
||||
return ctx['database_session'].query(models.KitType).filter(models.KitType.name==name).first()
|
||||
|
||||
|
||||
def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.ReagentType]:
|
||||
def lookup_regent_by_type_name(ctx:dict, type_name:str) -> list[models.Reagent]:
|
||||
"""
|
||||
Lookup reagents by their type name
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed from gui
|
||||
type_name (str): reagent type name
|
||||
|
||||
Returns:
|
||||
list[models.Reagent]: list of retrieved reagents
|
||||
"""
|
||||
# return [item for item in ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all()]
|
||||
return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).all()
|
||||
|
||||
|
||||
def lookup_regent_by_type_name_and_kit_name(ctx:dict, type_name:str, kit_name:str) -> list[models.Reagent]:
|
||||
"""
|
||||
Lookup reagents by their type name and kits they belong to
|
||||
|
||||
Args:
|
||||
ctx (dict): settings pass by gui
|
||||
type_name (str): reagent type name
|
||||
kit_name (str): kit name
|
||||
|
||||
Returns:
|
||||
list[models.Reagent]: list of retrieved reagents
|
||||
"""
|
||||
# Hang on, this is going to be a long one.
|
||||
by_type = ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name.endswith(type_name))
|
||||
# add filter for kit name
|
||||
add_in = by_type.join(models.ReagentType.kits).filter(models.KitType.name==kit_name)
|
||||
return add_in
|
||||
|
||||
|
||||
def lookup_all_submissions_by_type(ctx:dict, type:str|None=None):
|
||||
def lookup_all_submissions_by_type(ctx:dict, type:str|None=None) -> list[models.BasicSubmission]:
|
||||
"""
|
||||
Get all submissions, filtering by type if given
|
||||
|
||||
Args:
|
||||
ctx (dict): settings pass from gui
|
||||
type (str | None, optional): submission type (should be string in D3 of excel sheet). Defaults to None.
|
||||
|
||||
Returns:
|
||||
_type_: list of retrieved submissions
|
||||
"""
|
||||
if type == None:
|
||||
subs = ctx['database_session'].query(models.BasicSubmission).all()
|
||||
else:
|
||||
@@ -176,20 +294,60 @@ def lookup_all_submissions_by_type(ctx:dict, type:str|None=None):
|
||||
return subs
|
||||
|
||||
def lookup_all_orgs(ctx:dict) -> list[models.Organization]:
|
||||
"""
|
||||
Lookup all organizations (labs)
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed from gui
|
||||
|
||||
Returns:
|
||||
list[models.Organization]: list of retrieved organizations
|
||||
"""
|
||||
return ctx['database_session'].query(models.Organization).all()
|
||||
|
||||
def lookup_org_by_name(ctx:dict, name:str|None) -> models.Organization:
|
||||
"""
|
||||
Lookup organization (lab) by name.
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed from gui
|
||||
name (str | None): name of organization
|
||||
|
||||
Returns:
|
||||
models.Organization: retrieved organization
|
||||
"""
|
||||
logger.debug(f"Querying organization: {name}")
|
||||
return ctx['database_session'].query(models.Organization).filter(models.Organization.name==name).first()
|
||||
|
||||
def submissions_to_df(ctx:dict, type:str|None=None):
|
||||
def submissions_to_df(ctx:dict, type:str|None=None) -> pd.DataFrame:
|
||||
"""
|
||||
Convert submissions looked up by type to dataframe
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed by gui
|
||||
type (str | None, optional): submission type (should be string in D3 of excel sheet) Defaults to None.
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: dataframe constructed from retrieved submissions
|
||||
"""
|
||||
logger.debug(f"Type: {type}")
|
||||
# pass to lookup function
|
||||
subs = [item.to_dict() for item in lookup_all_submissions_by_type(ctx=ctx, type=type)]
|
||||
df = pd.DataFrame.from_records(subs)
|
||||
return df
|
||||
|
||||
|
||||
def lookup_submission_by_id(ctx:dict, id:int) -> models.BasicSubmission:
|
||||
"""
|
||||
Lookup submission by id number
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed from gui
|
||||
id (int): submission id number
|
||||
|
||||
Returns:
|
||||
models.BasicSubmission: retrieved submission
|
||||
"""
|
||||
return ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.id==id).first()
|
||||
|
||||
|
||||
@@ -198,6 +356,17 @@ def create_submission_details(ctx:dict, sub_id:int) -> dict:
|
||||
|
||||
|
||||
def lookup_submissions_by_date_range(ctx:dict, start_date:datetime.date, end_date:datetime.date) -> list[models.BasicSubmission]:
|
||||
"""
|
||||
Lookup submissions by range of submitted dates
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed from gui
|
||||
start_date (datetime.date): date to start looking
|
||||
end_date (datetime.date): date to end looking
|
||||
|
||||
Returns:
|
||||
list[models.BasicSubmission]: list of retrieved submissions
|
||||
"""
|
||||
return ctx['database_session'].query(models.BasicSubmission).filter(and_(models.BasicSubmission.submitted_date > start_date, models.BasicSubmission.submitted_date < end_date)).all()
|
||||
|
||||
|
||||
@@ -226,12 +395,13 @@ def create_kit_from_yaml(ctx:dict, exp:dict) -> None:
|
||||
exp (dict): Experiment dictionary created from yaml file
|
||||
"""
|
||||
try:
|
||||
exp['password'].decode()
|
||||
except (UnicodeDecodeError, AttributeError):
|
||||
exp['password'] = exp['password'].encode()
|
||||
if base64.b64encode(exp['password']) != b'cnNsX3N1Ym1pNTVpb25z':
|
||||
logger.debug(f"Not the correct password.")
|
||||
return
|
||||
super_users = ctx['super_users']
|
||||
except KeyError:
|
||||
logger.debug("This user does not have permission to add kits.")
|
||||
return {'code':1,'message':"This user does not have permission to add kits."}
|
||||
if getuser not in super_users:
|
||||
logger.debug("This user does not have permission to add kits.")
|
||||
return {'code':1, 'message':"This user does not have permission to add kits."}
|
||||
for type in exp:
|
||||
if type == "password":
|
||||
continue
|
||||
@@ -249,9 +419,19 @@ def create_kit_from_yaml(ctx:dict, exp:dict) -> None:
|
||||
logger.debug(kit.__dict__)
|
||||
ctx['database_session'].add(kit)
|
||||
ctx['database_session'].commit()
|
||||
return {'code':0, 'message':'Kit has been added'}
|
||||
|
||||
|
||||
def lookup_all_sample_types(ctx:dict) -> list[str]:
|
||||
"""
|
||||
Lookup all sample types and get names
|
||||
|
||||
Args:
|
||||
ctx (dict): settings pass from gui
|
||||
|
||||
Returns:
|
||||
list[str]: list of sample type names
|
||||
"""
|
||||
uses = [item.used_for for item in ctx['database_session'].query(models.KitType).all()]
|
||||
uses = list(set([item for sublist in uses for item in sublist]))
|
||||
return uses
|
||||
@@ -259,6 +439,15 @@ def lookup_all_sample_types(ctx:dict) -> list[str]:
|
||||
|
||||
|
||||
def get_all_available_modes(ctx:dict) -> list[str]:
|
||||
"""
|
||||
Get types of analysis for controls
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed from gui
|
||||
|
||||
Returns:
|
||||
list[str]: list of analysis types
|
||||
"""
|
||||
rel = ctx['database_session'].query(models.Control).first()
|
||||
try:
|
||||
cols = [item.name for item in list(rel.__table__.columns) if isinstance(item.type, JSON)]
|
||||
@@ -294,7 +483,18 @@ def get_all_controls_by_type(ctx:dict, con_type:str, start_date:date|None=None,
|
||||
return output
|
||||
|
||||
|
||||
def get_control_subtypes(ctx:dict, type:str, mode:str):
|
||||
def get_control_subtypes(ctx:dict, type:str, mode:str) -> list[str]:
|
||||
"""
|
||||
Get subtypes for a control analysis type
|
||||
|
||||
Args:
|
||||
ctx (dict): settings passed from gui
|
||||
type (str): control type name
|
||||
mode (str): analysis type name
|
||||
|
||||
Returns:
|
||||
list[str]: list of subtype names
|
||||
"""
|
||||
try:
|
||||
outs = get_all_controls_by_type(ctx=ctx, con_type=type)[0]
|
||||
except TypeError:
|
||||
|
||||
Reference in New Issue
Block a user