diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5209313..154782e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 202307.02
+
+- Better column counting for cost recovery purposes.
+- Improvements to pydantic validations.
+
## 202307.01
- Fixed bug where date increment of controls not working for multiple same dates.
diff --git a/TODO.md b/TODO.md
index 85324ba..215f79f 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,9 +1,8 @@
-- [ ] Think about trying to migrate required column in reagenttypes to reagenttypes_kittypes
- - In case reagent type is required for one kit, but not another. Possible?
+- [x] Code clean-up and refactor (2023-07).
+- [ ] Migrate context settings to pydantic-settings model.
- [x] Insert column into reagent type to indicate if reagent is required for kit.
- Needed to keep interchangeable bead plates from being forced into forms.
-- [ ] Migrate context settings to pydantic-settings model.
-- [ ] Migrate the parser.sub dictionary to pydantic models.
+- [x] Migrate the parser.sub dictionary to pydantic models.
- [x] Move type_decider to metadata based method rather than excel map.
- [x] Solve bug for plate mapping when two samples of same name are in different rows.
- Try importing "L:\Robotics Laboratory Support\Submissions\Wastewater\2023\2023-06-21\RSL-WW-20230621-1.xlsx" for example.
diff --git a/alembic/versions/4c6221f01324_added_last_used_to_reagenttype.py b/alembic/versions/4c6221f01324_added_last_used_to_reagenttype.py
new file mode 100644
index 0000000..2079905
--- /dev/null
+++ b/alembic/versions/4c6221f01324_added_last_used_to_reagenttype.py
@@ -0,0 +1,32 @@
+"""added last_used to reagenttype
+
+Revision ID: 4c6221f01324
+Revises: 7aadd731ff63
+Create Date: 2023-07-07 14:32:24.064042
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '4c6221f01324'
+down_revision = '7aadd731ff63'
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ # ### commands auto generated by Alembic - please adjust! ###
+ with op.batch_alter_table('_reagent_types', schema=None) as batch_op:
+ batch_op.add_column(sa.Column('last_used', sa.String(length=32), nullable=True))
+
+ # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+ # ### commands auto generated by Alembic - please adjust! ###
+ with op.batch_alter_table('_reagent_types', schema=None) as batch_op:
+ batch_op.drop_column('last_used')
+
+ # ### end Alembic commands ###
diff --git a/src/submissions/__init__.py b/src/submissions/__init__.py
index 800c631..2c63dfe 100644
--- a/src/submissions/__init__.py
+++ b/src/submissions/__init__.py
@@ -4,7 +4,7 @@ from pathlib import Path
# Version of the realpython-reader package
__project__ = "submissions"
-__version__ = "202307.1b"
+__version__ = "202307.2b"
__author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"}
__copyright__ = "2022-2023, Government of Canada"
diff --git a/src/submissions/backend/db/functions.py b/src/submissions/backend/db/functions.py
index 3236628..d3ee7ea 100644
--- a/src/submissions/backend/db/functions.py
+++ b/src/submissions/backend/db/functions.py
@@ -5,14 +5,12 @@ Convenience functions for interacting with the database.
from . import models
from .models.kits import reagenttypes_kittypes
from .models.submissions import reagents_submissions
-# from .models.samples import WWSample
import pandas as pd
import sqlalchemy.exc
import sqlite3
import logging
from datetime import date, datetime, timedelta
from sqlalchemy import and_
-import uuid
from sqlalchemy import JSON, event
from sqlalchemy.engine import Engine
import json
@@ -22,6 +20,7 @@ import yaml
from pathlib import Path
+
logger = logging.getLogger(f"submissions.{__name__}")
# The below _should_ allow automatic creation of foreign keys in the database
@@ -111,12 +110,12 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio
# convert submission type into model name
query = info_dict['submission_type'].replace(" ", "")
# Ensure an rsl plate number exists for the plate
- # if info_dict["rsl_plate_num"] == 'nan' or info_dict["rsl_plate_num"] == None or not check_not_nan(info_dict["rsl_plate_num"]):
if not check_regex_match("^RSL", info_dict["rsl_plate_num"]):
instance = None
msg = "A proper RSL plate number is required."
return instance, {'code': 2, 'message': "A proper RSL plate number is required."}
else:
+ # enforce conventions on the rsl plate number from the form
info_dict['rsl_plate_num'] = RSLNamer(info_dict["rsl_plate_num"]).parsed_name
# check database for existing object
instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first()
@@ -160,10 +159,11 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio
case "submitter_plate_num":
# Because of unique constraint, there will be problems with
# multiple submissions named 'None', so...
+ # Should be depreciated with use of pydantic validator
logger.debug(f"Submitter plate id: {info_dict[item]}")
- if info_dict[item] == None or info_dict[item] == "None" or info_dict[item] == "":
- logger.debug(f"Got None as a submitter plate number, inserting random string to preserve database unique constraint.")
- info_dict[item] = uuid.uuid4().hex.upper()
+ # if info_dict[item] == None or info_dict[item] == "None" or info_dict[item] == "":
+ # logger.debug(f"Got None as a submitter plate number, inserting random string to preserve database unique constraint.")
+ # info_dict[item] = uuid.uuid4().hex.upper()
field_value = info_dict[item]
case _:
field_value = info_dict[item]
@@ -233,20 +233,6 @@ def construct_reagent(ctx:dict, info_dict:dict) -> models.Reagent:
# pass
return reagent
-# def lookup_reagent(ctx:dict, reagent_lot:str) -> models.Reagent:
-# """
-# Query db for reagent based on lot number
-
-# Args:
-# ctx (dict): settings passed down from gui
-# reagent_lot (str): lot number to query
-
-# Returns:
-# models.Reagent: looked up reagent
-# """
-# lookedup = ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
-# return lookedup
-
def get_all_reagenttype_names(ctx:dict) -> list[str]:
"""
Lookup all reagent types and get names
@@ -276,7 +262,7 @@ def lookup_reagenttype_by_name(ctx:dict, rt_name:str) -> models.ReagentType:
logger.debug(f"Found ReagentType: {lookedup}")
return lookedup
-def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]:
+def lookup_kittype_by_use(ctx:dict, used_by:str|None=None) -> list[models.KitType]:
"""
Lookup kits by a sample type its used for
@@ -287,7 +273,10 @@ def lookup_kittype_by_use(ctx:dict, used_by:str) -> list[models.KitType]:
Returns:
list[models.KitType]: list of kittypes that have that sample type in their uses
"""
- return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all()
+ if used_by != None:
+ return ctx['database_session'].query(models.KitType).filter(models.KitType.used_for.contains(used_by)).all()
+ else:
+ return ctx['database_session'].query(models.KitType).all()
def lookup_kittype_by_name(ctx:dict, name:str) -> models.KitType:
"""
@@ -872,19 +861,34 @@ def platemap_plate(submission:models.BasicSubmission) -> list:
# image = make_plate_map(plate_dicto)
return plate_dicto
-
-def lookup_reagent(ctx:dict, reagent_lot:str|None=None, type_name:str|None=None) -> models.Reagent:
+def lookup_reagent(ctx:dict, reagent_lot:str, type_name:str|None=None) -> models.Reagent:
"""
- Query db for reagent based on lot number
+ Query db for reagent based on lot number, with optional reagent type to enforce
Args:
ctx (dict): settings passed down from gui
reagent_lot (str): lot number to query
+ type_name (str | None, optional): name of reagent type. Defaults to None.
Returns:
models.Reagent: looked up reagent
"""
if reagent_lot != None and type_name != None:
- return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).filter(models.Reagent.lot==reagent_lot).all()
+ return ctx['database_session'].query(models.Reagent).join(models.Reagent.type, aliased=True).filter(models.ReagentType.name==type_name).filter(models.Reagent.lot==reagent_lot).first()
elif type_name == None:
- return ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
\ No newline at end of file
+ return ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
+
+def lookup_last_used_reagenttype_lot(ctx:dict, type_name:str) -> models.Reagent:
+ """
+ Look up the last used reagent of the reagent type
+
+ Args:
+ ctx (dict): Settings passed down from gui
+ type_name (str): Name of reagent type
+
+ Returns:
+ models.Reagent: Reagent object with last used lot.
+ """
+ rt = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==type_name).first()
+ logger.debug(f"Reagent type looked up for {type_name}: {rt.__str__()}")
+ return lookup_reagent(ctx=ctx, reagent_lot=rt.last_used, type_name=type_name)
\ No newline at end of file
diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py
index caf0fc6..e26e240 100644
--- a/src/submissions/backend/db/models/kits.py
+++ b/src/submissions/backend/db/models/kits.py
@@ -55,11 +55,8 @@ class ReagentType(Base):
instances = relationship("Reagent", back_populates="type") #: concrete instances of this reagent type
eol_ext = Column(Interval()) #: extension of life interval
required = Column(INTEGER, server_default="1") #: sqlite boolean to determine if reagent type is essential for the kit
- # __table_args__ = (
- # CheckConstraint(required >= 0, name='check_required_positive'),
- # CheckConstraint(required < 2, name='check_required_less_2'),
- # {})
-
+ last_used = Column(String(32)) #: last used lot number of this type of reagent
+
@validates('required')
def validate_age(self, key, value):
if not 0 <= value < 2:
@@ -125,6 +122,13 @@ class Reagent(Base):
"expiry": place_holder.strftime("%Y-%m-%d")
}
+ def to_reagent_dict(self) -> dict:
+ return {
+ "type": self.type.name,
+ "lot": self.lot,
+ "expiry": self.expiry.strftime("%Y-%m-%d")
+ }
+
class Discount(Base):
"""
diff --git a/src/submissions/backend/db/models/samples.py b/src/submissions/backend/db/models/samples.py
index f2ab46d..5b8a2a0 100644
--- a/src/submissions/backend/db/models/samples.py
+++ b/src/submissions/backend/db/models/samples.py
@@ -6,6 +6,7 @@ from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, FLOAT, BO
from sqlalchemy.orm import relationship
import logging
+
logger = logging.getLogger(f"submissions.{__name__}")
@@ -22,7 +23,7 @@ class WWSample(Base):
rsl_plate = relationship("Wastewater", back_populates="samples") #: relationship to parent plate
rsl_plate_id = Column(INTEGER, ForeignKey("_submissions.id", ondelete="SET NULL", name="fk_WWS_submission_id"))
collection_date = Column(TIMESTAMP) #: Date submission received
- well_number = Column(String(8)) #: location on 24 well plate
+ well_number = Column(String(8)) #: location on 96 well plate
# The following are fields from the sample tracking excel sheet Ruth put together.
# I have no idea when they will be implemented or how.
testing_type = Column(String(64))
@@ -36,7 +37,7 @@ class WWSample(Base):
ww_seq_run_id = Column(String(64))
sample_type = Column(String(8))
pcr_results = Column(JSON)
- elution_well = Column(String(8)) #: location on 96 well plate
+ well_24 = Column(String(8)) #: location on 24 well plate
artic_rsl_plate = relationship("WastewaterArtic", back_populates="samples")
artic_well_number = Column(String(8))
@@ -57,10 +58,6 @@ class WWSample(Base):
Returns:
dict: well location and id NOTE: keys must sync with BCSample to_sub_dict below
"""
- # well_col = self.well_number[1:]
- # well_row = self.well_number[0]
- # if well_col > 4:
- # well
if self.ct_n1 != None and self.ct_n2 != None:
# logger.debug(f"Using well info in name.")
name = f"{self.ww_sample_full_id}\n\t- ct N1: {'{:.2f}'.format(self.ct_n1)} ({self.n1_status})\n\t- ct N2: {'{:.2f}'.format(self.ct_n2)} ({self.n2_status})"
@@ -87,8 +84,8 @@ class WWSample(Base):
except TypeError as e:
logger.error(f"Couldn't check positives for {self.rsl_number}. Looks like there isn't PCR data.")
return None
- well_row = row_dict[self.elution_well[0]]
- well_col = self.elution_well[1:]
+ well_row = row_dict[self.well_number[0]]
+ well_col = self.well_number[1:]
# if positive:
# try:
# # The first character of the elution well is the row
diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py
index dbe2a04..68aefff 100644
--- a/src/submissions/backend/db/models/submissions.py
+++ b/src/submissions/backend/db/models/submissions.py
@@ -5,7 +5,6 @@ import math
from . import Base
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, Table, JSON, FLOAT
from sqlalchemy.orm import relationship
-from datetime import datetime as dt
import logging
import json
from json.decoder import JSONDecodeError
@@ -164,7 +163,8 @@ class BasicSubmission(Base):
def calculate_base_cost(self):
try:
- cols_count_96 = ceil(int(self.sample_count) / 8)
+ # cols_count_96 = ceil(int(self.sample_count) / 8)
+ cols_count_96 = self.calculate_column_count()
except Exception as e:
logger.error(f"Column count error: {e}")
# cols_count_24 = ceil(int(self.sample_count) / 3)
@@ -173,6 +173,11 @@ class BasicSubmission(Base):
except Exception as e:
logger.error(f"Calculation error: {e}")
+ def calculate_column_count(self):
+ columns = [int(sample.well_number[-2:]) for sample in self.samples]
+ logger.debug(f"Here are the columns for {self.rsl_plate_num}: {columns}")
+ return max(columns)
+
# Below are the custom submission types
class BacterialCulture(BasicSubmission):
diff --git a/src/submissions/backend/excel/__init__.py b/src/submissions/backend/excel/__init__.py
index e69a1f4..6fa938e 100644
--- a/src/submissions/backend/excel/__init__.py
+++ b/src/submissions/backend/excel/__init__.py
@@ -4,47 +4,3 @@ Contains pandas convenience functions for interacting with excel workbooks
from .reports import *
from .parser import *
-
-# from pandas import DataFrame
-# import re
-
-
-# def get_unique_values_in_df_column(df: DataFrame, column_name: str) -> list:
-# """
-# get all unique values in a dataframe column by name
-
-# Args:
-# df (DataFrame): input dataframe
-# column_name (str): name of column of interest
-
-# Returns:
-# list: sorted list of unique values
-# """
-# return sorted(df[column_name].unique())
-
-
-# def drop_reruns_from_df(ctx:dict, df: DataFrame) -> DataFrame:
-# """
-# Removes semi-duplicates from dataframe after finding sequencing repeats.
-
-# Args:
-# settings (dict): settings passed from gui
-# df (DataFrame): initial dataframe
-
-# Returns:
-# DataFrame: dataframe with originals removed in favour of repeats.
-# """
-# sample_names = get_unique_values_in_df_column(df, column_name="name")
-# if 'rerun_regex' in ctx:
-# # logger.debug(f"Compiling regex from: {settings['rerun_regex']}")
-# rerun_regex = re.compile(fr"{ctx['rerun_regex']}")
-# for sample in sample_names:
-# # logger.debug(f'Running search on {sample}')
-# if rerun_regex.search(sample):
-# # logger.debug(f'Match on {sample}')
-# first_run = re.sub(rerun_regex, "", sample)
-# # logger.debug(f"First run: {first_run}")
-# df = df.drop(df[df.name == first_run].index)
-# return df
-# else:
-# return None
diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py
index a3ef871..a751154 100644
--- a/src/submissions/backend/excel/parser.py
+++ b/src/submissions/backend/excel/parser.py
@@ -8,14 +8,14 @@ import pandas as pd
from pathlib import Path
from backend.db.models import WWSample, BCSample
from backend.db import lookup_ww_sample_by_ww_sample_num
-from backend.pydant import PydSubmission
+from backend.pydant import PydSubmission, PydReagent
import logging
from collections import OrderedDict
import re
import numpy as np
from datetime import date, datetime
import uuid
-from tools import check_not_nan, RSLNamer, massage_common_reagents
+from tools import check_not_nan, RSLNamer, massage_common_reagents, convert_nans_to_nones
logger = logging.getLogger(f"submissions.{__name__}")
@@ -26,31 +26,29 @@ class SheetParser(object):
def __init__(self, ctx:dict, filepath:Path|None = None):
"""
Args:
+ ctx (dict): Settings passed down from gui
filepath (Path | None, optional): file path to excel sheet. Defaults to None.
- """
+ """
self.ctx = ctx
logger.debug(f"Parsing {filepath.__str__()}")
- # set attributes based on kwargs from gui ctx
- # for kwarg in kwargs:
- # setattr(self, f"_{kwarg}", kwargs[kwarg])
- # self.__dict__.update(kwargs)
if filepath == None:
logger.error(f"No filepath given.")
self.xl = None
else:
self.filepath = filepath
+ # Open excel file
try:
self.xl = pd.ExcelFile(filepath.__str__())
except ValueError as e:
logger.error(f"Incorrect value: {e}")
self.xl = None
- # TODO: replace OrderedDict with pydantic BaseModel
self.sub = OrderedDict()
# make decision about type of sample we have
self.sub['submission_type'] = self.type_decider()
# select proper parser based on sample type
parse_sub = getattr(self, f"parse_{self.sub['submission_type'].lower()}")
parse_sub()
+ # self.calculate_column_count()
def type_decider(self) -> str:
"""
@@ -65,7 +63,7 @@ class SheetParser(object):
return categories[0].replace(" ", "_")
else:
# This code is going to be depreciated once there is full adoption of the client sheets
- # with updated metadata
+ # with updated metadata... but how will it work for Artic?
try:
for type in self.ctx['submission_types']:
# This gets the *first* submission type that matches the sheet names in the workbook
@@ -76,7 +74,6 @@ class SheetParser(object):
logger.warning(f"We were unable to parse the submission type due to: {e}")
return "Unknown"
-
def parse_unknown(self) -> None:
"""
Dummy function to handle unknown excel structures
@@ -84,7 +81,6 @@ class SheetParser(object):
logger.error(f"Unknown excel workbook structure. Cannot parse.")
self.sub = None
-
def parse_generic(self, sheet_name:str) -> pd.DataFrame:
"""
Pulls information common to all wasterwater/bacterial culture types and passes on dataframe
@@ -98,14 +94,17 @@ class SheetParser(object):
# self.xl is a pd.ExcelFile so we need to parse it into a df
submission_info = self.xl.parse(sheet_name=sheet_name, dtype=object)
self.sub['submitter_plate_num'] = submission_info.iloc[0][1]
- self.sub['rsl_plate_num'] = RSLNamer(submission_info.iloc[10][1]).parsed_name
+ if check_not_nan(submission_info.iloc[10][1]):
+ self.sub['rsl_plate_num'] = RSLNamer(submission_info.iloc[10][1]).parsed_name
+ else:
+ # self.sub['rsl_plate_num'] = RSLNamer(self.filepath).parsed_name
+ self.sub['rsl_plate_num'] = None
self.sub['submitted_date'] = submission_info.iloc[1][1]
self.sub['submitting_lab'] = submission_info.iloc[0][3]
self.sub['sample_count'] = submission_info.iloc[2][3]
self.sub['extraction_kit'] = submission_info.iloc[3][3]
return submission_info
-
def parse_bacterial_culture(self) -> None:
"""
pulls info specific to bacterial culture sample type
@@ -121,22 +120,27 @@ class SheetParser(object):
for ii, row in df.iterrows():
# skip positive control
logger.debug(f"Running reagent parse for {row[1]} with type {type(row[1])} and value: {row[2]} with type {type(row[2])}")
- if not isinstance(row[2], float) and check_not_nan(row[1]):
+ # if the lot number isn't a float and the reagent type isn't blank
+ # if not isinstance(row[2], float) and check_not_nan(row[1]):
+ if check_not_nan(row[1]):
# must be prefixed with 'lot_' to be recognized by gui
+ # This is no longer true since reagents are loaded into their own key in dictionary
try:
reagent_type = row[1].replace(' ', '_').lower().strip()
except AttributeError:
pass
+ # If there is a double slash in the type field, such as ethanol/iso
+ # Use the cell to the left for reagent type.
if reagent_type == "//":
if check_not_nan(row[2]):
reagent_type = row[0].replace(' ', '_').lower().strip()
else:
continue
try:
- output_var = row[2].upper()
+ output_var = convert_nans_to_nones(str(row[2]).upper())
except AttributeError:
logger.debug(f"Couldn't upperize {row[2]}, must be a number")
- output_var = row[2]
+ output_var = convert_nans_to_nones(str(row[2]))
logger.debug(f"Output variable is {output_var}")
logger.debug(f"Expiry date for imported reagent: {row[3]}")
if check_not_nan(row[3]):
@@ -149,22 +153,17 @@ class SheetParser(object):
expiry = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + row[3] - 2)
else:
logger.debug(f"Date: {row[3]}")
- expiry = date.today()
+ # expiry = date.today()
+ expiry = date(year=1970, month=1, day=1)
# self.sub[f"lot_{reagent_type}"] = {'lot':output_var, 'exp':expiry}
- self.sub['reagents'].append(dict(type=reagent_type, lot=output_var, exp=expiry))
+ # self.sub['reagents'].append(dict(type=reagent_type, lot=output_var, exp=expiry))
+ self.sub['reagents'].append(PydReagent(type=reagent_type, lot=output_var, exp=expiry))
submission_info = self.parse_generic("Sample List")
# iloc is [row][column] and the first row is set as header row so -2
- tech = str(submission_info.iloc[11][1])
- # moved to pydantic model
- # if tech == "nan":
- # tech = "Unknown"
- # elif len(tech.split(",")) > 1:
- # tech_reg = re.compile(r"[A-Z]{2}")
- # tech = ", ".join(tech_reg.findall(tech))
- self.sub['technician'] = tech
+ self.sub['technician'] = str(submission_info.iloc[11][1])
# reagents
# must be prefixed with 'lot_' to be recognized by gui
- # TODO: find a more adaptable way to read reagents.
+ # This is no longer true wince the creation of self.sub['reagents']
self.sub['reagents'] = []
reagent_range = submission_info.iloc[1:14, 4:8]
logger.debug(reagent_range)
@@ -175,7 +174,6 @@ class SheetParser(object):
logger.debug(f"Parser result: {self.sub}")
self.sample_result, self.sub['samples'] = sample_parse()
-
def parse_wastewater(self) -> None:
"""
pulls info specific to wastewater sample type
@@ -196,17 +194,18 @@ class SheetParser(object):
"""
# iterate through sub-df rows
for ii, row in df.iterrows():
- if not isinstance(row[5], float) and check_not_nan(row[5]):
+ logger.debug(f"Parsing this row for reagents: {row}")
+ if check_not_nan(row[5]):
# must be prefixed with 'lot_' to be recognized by gui
# regex below will remove 80% from 80% ethanol in the Wastewater kit.
output_key = re.sub(r"^\d{1,3}%\s?", "", row[0].lower().strip().replace(' ', '_'))
output_key = output_key.strip("_")
# output_var is the lot number
try:
- output_var = row[5].upper()
+ output_var = convert_nans_to_nones(str(row[5].upper()))
except AttributeError:
logger.debug(f"Couldn't upperize {row[5]}, must be a number")
- output_var = row[5]
+ output_var = convert_nans_to_nones(str(row[5]))
if check_not_nan(row[7]):
try:
expiry = row[7].date()
@@ -214,8 +213,12 @@ class SheetParser(object):
expiry = date.today()
else:
expiry = date.today()
+ logger.debug(f"Expiry date for {output_key}: {expiry} of type {type(expiry)}")
# self.sub[f"lot_{output_key}"] = {'lot':output_var, 'exp':expiry}
- self.sub['reagents'].append(dict(type=output_key, lot=output_var, exp=expiry))
+ # self.sub['reagents'].append(dict(type=output_key, lot=output_var, exp=expiry))
+ reagent = PydReagent(type=output_key, lot=output_var, exp=expiry)
+ logger.debug(f"Here is the created reagent: {reagent}")
+ self.sub['reagents'].append(reagent)
# parse submission sheet
submission_info = self.parse_generic("WW Submissions (ENTER HERE)")
# parse enrichment sheet
@@ -230,7 +233,7 @@ class SheetParser(object):
qprc_info = self.xl.parse("qPCR Worksheet", dtype=object)
# set qpcr reagent range
pcr_reagent_range = qprc_info.iloc[0:5, 9:20]
- # compile technician info
+ # compile technician info from all sheets
self.sub['technician'] = f"Enr: {enrichment_info.columns[2]}, Ext: {extraction_info.columns[2]}, PCR: {qprc_info.columns[2]}"
self.sub['reagents'] = []
parse_reagents(enr_reagent_range)
@@ -242,7 +245,6 @@ class SheetParser(object):
self.sample_result, self.sub['samples'] = sample_parse()
self.sub['csv'] = self.xl.parse("Copy to import file", dtype=object)
-
def parse_wastewater_artic(self) -> None:
"""
pulls info specific to wastewater_arctic submission type
@@ -258,10 +260,10 @@ class SheetParser(object):
output_key = output_key.strip("_")
output_key = massage_common_reagents(output_key)
try:
- output_var = row[1].upper()
+ output_var = convert_nans_to_nones(str(row[1].upper()))
except AttributeError:
logger.debug(f"Couldn't upperize {row[1]}, must be a number")
- output_var = row[1]
+ output_var = convert_nans_to_nones(str(row[1]))
logger.debug(f"Output variable is {output_var}")
logger.debug(f"Expiry date for imported reagent: {row[2]}")
if check_not_nan(row[2]):
@@ -277,7 +279,8 @@ class SheetParser(object):
else:
logger.debug(f"Date: {row[2]}")
expiry = date.today()
- self.sub['reagents'].append(dict(type=output_key, lot=output_var, exp=expiry))
+ # self.sub['reagents'].append(dict(type=output_key, lot=output_var, exp=expiry))
+ self.sub['reagents'].append(PydReagent(type=output_key, lot=output_var, exp=expiry))
else:
continue
def massage_samples(df:pd.DataFrame) -> pd.DataFrame:
@@ -317,20 +320,19 @@ class SheetParser(object):
sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type'].lower()}_samples")
self.sample_result, self.sub['samples'] = sample_parse()
-
def to_pydantic(self) -> PydSubmission:
"""
Generates a pydantic model of scraped data for validation
Returns:
PydSubmission: output pydantic model
- """
- psm = PydSubmission(filepath=self.filepath, **self.sub)
+ """
+ logger.debug(f"Submission dictionary coming into 'to_pydantic':\n{pprint.pformat(self.sub)}")
+ psm = PydSubmission(ctx=self.ctx, filepath=self.filepath, **self.sub)
delattr(psm, "filepath")
return psm
-
-
+
class SampleParser(object):
"""
object to pull data for samples in excel sheet and construct individual sample objects
@@ -385,7 +387,7 @@ class SampleParser(object):
list[WWSample]: list of sample objects
"""
def search_df_for_sample(sample_rsl:str):
- # logger.debug(f"Attempting to find sample {sample_rsl} in \n {self.elution_map}")
+ logger.debug(f"Attempting to find sample {sample_rsl} in \n {self.elution_map}")
well = self.elution_map.where(self.elution_map==sample_rsl)
# logger.debug(f"Well: {well}")
well = well.dropna(how='all').dropna(axis=1, how="all")
@@ -394,9 +396,9 @@ class SampleParser(object):
logger.debug(f"well {sample_rsl} post processing: {well.size}: {type(well)}, {well.index[0]}, {well.columns[0]}")
self.elution_map.at[well.index[0], well.columns[0]] = np.nan
try:
- col = str(int(well.columns[0]))
+ col = str(int(well.columns[0])).zfill(2)
except ValueError:
- col = str(well.columns[0])
+ col = str(well.columns[0]).zfill(2)
except TypeError as e:
logger.error(f"Problem parsing out column number for {well}:\n {e}")
return f"{well.index[0]}{col}"
@@ -424,10 +426,12 @@ class SampleParser(object):
# new.testing_type = sample['Unnamed: 6']
# new.site_status = sample['Unnamed: 7']
new.notes = str(sample['Unnamed: 6']) # previously Unnamed: 8
- new.well_number = sample['Unnamed: 1']
+ new.well_24 = sample['Unnamed: 1']
elu_well = search_df_for_sample(new.rsl_number)
if elu_well != None:
- new.elution_well = elu_well
+ row = elu_well[0]
+ col = elu_well[1:].zfill(2)
+ new.well_number = f"{row}{col}"
else:
# try:
return_val += f"{new.rsl_number}\n"
@@ -455,12 +459,14 @@ class SampleParser(object):
missed_samples.append(sample['sample_name'])
continue
logger.debug(f"Got instance: {instance.ww_sample_full_id}")
+ if sample['well'] != None:
+ row = sample['well'][0]
+ col = sample['well'][1:].zfill(2)
+ sample['well'] = f"{row}{col}"
instance.artic_well_number = sample['well']
new_list.append(instance)
missed_str = "\n\t".join(missed_samples)
return f"Could not find matches for the following samples:\n\t {missed_str}", new_list
-
-
class PCRParser(object):
@@ -590,5 +596,5 @@ class PCRParser(object):
self.samples.append(sample_obj)
-
+
diff --git a/src/submissions/backend/excel/reports.py b/src/submissions/backend/excel/reports.py
index efa1c0d..af2ca4b 100644
--- a/src/submissions/backend/excel/reports.py
+++ b/src/submissions/backend/excel/reports.py
@@ -3,24 +3,14 @@ Contains functions for generating summary reports
'''
from pandas import DataFrame
import logging
-from jinja2 import Environment, FileSystemLoader
from datetime import date, timedelta
-import sys
-from pathlib import Path
import re
-from tools import check_if_app
from typing import Tuple
+from configure import jinja_template_loading
logger = logging.getLogger(f"submissions.{__name__}")
-# set path of templates depending on pyinstaller/raw python
-# if getattr(sys, 'frozen', False):
-if check_if_app():
- loader_path = Path(sys._MEIPASS).joinpath("files", "templates")
-else:
- loader_path = Path(__file__).parents[2].joinpath('templates').absolute().__str__()
-loader = FileSystemLoader(loader_path)
-env = Environment(loader=loader)
+env = jinja_template_loading()
logger = logging.getLogger(f"submissions.{__name__}")
@@ -115,7 +105,6 @@ def convert_data_list_to_df(ctx:dict, input:list[dict], subtype:str|None=None) -
# logger.debug(df)
# move date of sample submitted on same date as previous ahead one.
df = displace_date(df)
- # df.sort_values('submitted_date').to_excel("controls.xlsx", engine="openpyxl")
# ad hoc method to make data labels more accurate.
df = df_column_renamer(df=df)
return df
@@ -156,46 +145,33 @@ def displace_date(df:DataFrame) -> DataFrame:
dict_list = [dict(name=item, date=df[df.name == item].iloc[0]['submitted_date']) for item in sorted(df['name'].unique())]
previous_dates = []
for _, item in enumerate(dict_list):
- # try:
- # # check = item['date'] == dict_list[ii-1]['date']
- # check = item['date'] in previous_dates
- # except IndexError:
- # check = False
- # if check:
- # # occurences = previous_dates.count(item['date'])
- # logger.debug(f"We found one! Increment date!\n\t{item['date']} to {item['date'] + timedelta(days=1)}")
- # # get df locations where name == item name
- # mask = df['name'] == item['name']
- # # increment date in dataframe
- # df.loc[mask, 'submitted_date'] = df.loc[mask, 'submitted_date'].apply(lambda x: x + timedelta(days=1))
- # outdate = item['date'] + timedelta(days=1)
- # # previous_dates.append(item['date'] + timedelta(days=1))
- # else:
- # outdate = item['date']
- # previous_dates.append(outdate)
- # logger.debug(f"\n\tCurrent date: {outdate}\n\tPrevious dates:{previous_dates}")
- # logger.debug(type(item))
df, previous_dates = check_date(df=df, item=item, previous_dates=previous_dates)
return df
def check_date(df:DataFrame, item:dict, previous_dates:list) -> Tuple[DataFrame, list]:
-
+ """
+ Checks if an items date is already present in df and adjusts df accordingly
+
+ Args:
+ df (DataFrame): input dataframe
+ item (dict): control for checking
+ previous_dates (list): list of dates found in previous controls
+
+ Returns:
+ Tuple[DataFrame, list]: Output dataframe and appended list of previous dates
+ """
try:
- # check = item['date'] == dict_list[ii-1]['date']
check = item['date'] in previous_dates
except IndexError:
check = False
previous_dates.append(item['date'])
if check:
- # occurences = previous_dates.count(item['date'])
logger.debug(f"We found one! Increment date!\n\t{item['date']} to {item['date'] + timedelta(days=1)}")
# get df locations where name == item name
mask = df['name'] == item['name']
# increment date in dataframe
df.loc[mask, 'submitted_date'] = df.loc[mask, 'submitted_date'].apply(lambda x: x + timedelta(days=1))
-
item['date'] += timedelta(days=1)
- # previous_dates.append(item['date'] + timedelta(days=1))
passed = False
else:
passed = True
@@ -249,8 +225,7 @@ def drop_reruns_from_df(ctx:dict, df: DataFrame) -> DataFrame:
# logger.debug(f"First run: {first_run}")
df = df.drop(df[df.name == first_run].index)
return df
- # else:
- # return df
+
def make_hitpicks(input:list) -> DataFrame:
diff --git a/src/submissions/backend/pydant/__init__.py b/src/submissions/backend/pydant/__init__.py
index 62d6270..4adfa57 100644
--- a/src/submissions/backend/pydant/__init__.py
+++ b/src/submissions/backend/pydant/__init__.py
@@ -1,65 +1,153 @@
import uuid
-from pydantic import BaseModel, validator
-from datetime import date
+from pydantic import BaseModel, field_validator, model_validator, Extra
+from datetime import date, datetime
from typing import List, Any
from tools import RSLNamer
from pathlib import Path
import re
import logging
+from tools import check_not_nan, convert_nans_to_nones
+import numpy as np
+
+
logger = logging.getLogger(f"submissions.{__name__}")
-class PydSubmission(BaseModel):
+class PydReagent(BaseModel):
+ type: str|None
+ lot: str|None
+ exp: date|None
+
+ @field_validator("type", mode='before')
+ @classmethod
+ def remove_undesired_types(cls, value):
+ match value:
+ case "atcc":
+ return None
+ case _:
+ return value
+
+ @field_validator("lot", mode='before')
+ @classmethod
+ def enforce_lot_string(cls, value):
+ if value != None:
+ return convert_nans_to_nones(str(value))
+ return value
+
+ @field_validator("exp", mode="before")
+ @classmethod
+ def enforce_date(cls, value):
+ if isinstance(value, float) or value == np.nan:
+ raise ValueError(f"Date cannot be a float: {value}")
+ else:
+ return value
+
+
+
+class PydSubmission(BaseModel, extra=Extra.allow):
+ ctx: dict
filepath: Path
submission_type: str
submitter_plate_num: str|None
- rsl_plate_num: str
+ rsl_plate_num: str|dict|None
submitted_date: date
- submitting_lab: str
+ submitting_lab: str|None
sample_count: int
- extraction_kit: str
- technician: str
- reagents: List[dict]
+ extraction_kit: str|dict|None
+ technician: str|None
+ reagents: List[PydReagent] = []
samples: List[Any]
-
- @validator("submitted_date", pre=True)
+ # missing_fields: List[str] = []
+
+ @field_validator("submitted_date", mode="before")
@classmethod
def strip_datetime_string(cls, value):
+ if isinstance(value, datetime):
+ return value
+ if isinstance(value, date):
+ return value
return re.sub(r"_\d$", "", value)
- @validator("submitter_plate_num")
+ @field_validator("submitter_plate_num")
@classmethod
def enforce_with_uuid(cls, value):
if value == None or value == "" or value == "None":
return uuid.uuid4().hex.upper()
-
- @validator("rsl_plate_num", pre=True)
- @classmethod
- def rsl_from_file(cls, value, values):
- if value == None:
- logger.debug(f"Pydant values:\n{values}")
- return RSLNamer(values['filepath'].__str__()).parsed_name
else:
return value
- @validator("technician")
+ @field_validator("submitting_lab", mode="before")
+ @classmethod
+ def transform_nan(cls, value):
+ return convert_nans_to_nones(value)
+
+ @field_validator("rsl_plate_num", mode='before')
+ @classmethod
+ def rsl_from_file(cls, value, values):
+ logger.debug(f"RSL-plate initial value: {value}")
+ if check_not_nan(value):
+ if isinstance(value, str):
+ return dict(value=value, parsed=True)
+ else:
+ return value
+ else:
+ logger.debug(f"Pydant values:{type(values)}\n{values}")
+ return dict(value=RSLNamer(values.data['filepath'].__str__()).parsed_name, parsed=False)
+
+ @field_validator("technician")
@classmethod
def enforce_tech(cls, value):
if value == "nan" or value == "None":
value = "Unknown"
- # elif len(value.split(",")) > 1:
- # tech_reg = re.compile(r"\b[A-Z]{2}\b")
- # value = ", ".join(tech_reg.findall(value))
return value
- @validator("reagents")
+ @field_validator("reagents")
@classmethod
def remove_atcc(cls, value):
return_val = []
for reagent in value:
- match reagent['type']:
- case 'atcc':
- continue
- case _:
- return_val.append(reagent)
+ logger.debug(f"Pydantic reagent: {reagent}")
+ # match reagent.type.lower():
+ # case 'atcc':
+ # continue
+ # case _:
+ # return_val.append(reagent)
+ if reagent.type == None:
+ continue
+ else:
+ return_val.append(reagent)
return return_val
+
+ @field_validator("sample_count", mode='before')
+ @classmethod
+ def enforce_sample_count(cls, value):
+ if check_not_nan(value):
+ return int(value)
+ else:
+ # raise ValueError(f"{value} could not be used to create an integer.")
+ return convert_nans_to_nones(value)
+
+ @field_validator("extraction_kit", mode='before')
+ @classmethod
+ def get_kit_if_none(cls, value, values):
+ from frontend.custom_widgets.pop_ups import KitSelector
+ if check_not_nan(value):
+ return dict(value=value, parsed=True)
+ else:
+ # logger.debug(values.data)
+ dlg = KitSelector(ctx=values.data['ctx'], title="Kit Needed", message="At minimum a kit is needed. Please select one.")
+ if dlg.exec():
+ return dict(value=dlg.getValues(), parsed=False)
+ else:
+ raise ValueError("Extraction kit needed.")
+
+ # @model_validator(mode="after")
+ # def ensure_kit(cls, values):
+ # logger.debug(f"Model values: {values}")
+ # missing_fields = [k for k,v in values if v == None]
+ # if len(missing_fields) > 0:
+ # logger.debug(f"Missing fields: {missing_fields}")
+ # values['missing_fields'] = missing_fields
+ # return values
+
+
diff --git a/src/submissions/configure/__init__.py b/src/submissions/configure/__init__.py
index 366a9eb..6ca5051 100644
--- a/src/submissions/configure/__init__.py
+++ b/src/submissions/configure/__init__.py
@@ -1,6 +1,7 @@
'''
Contains functions for setting up program from config.yml and database.
'''
+from jinja2 import Environment, FileSystemLoader
import yaml
import sys, os, stat, platform, getpass
import logging
@@ -238,3 +239,22 @@ def copy_settings(settings_path:Path, settings:dict) -> dict:
with open(settings_path, 'w') as f:
yaml.dump(settings, f)
return settings
+
+
+def jinja_template_loading():
+ """
+ Returns jinja2 template environment.
+
+ Returns:
+ _type_: _description_
+ """
+ # determine if pyinstaller launcher is being used
+ if getattr(sys, 'frozen', False):
+ loader_path = Path(sys._MEIPASS).joinpath("files", "templates")
+ else:
+ loader_path = Path(__file__).parents[1].joinpath('templates').absolute().__str__()
+
+ # jinja template loading
+ loader = FileSystemLoader(loader_path)
+ env = Environment(loader=loader)
+ return env
\ No newline at end of file
diff --git a/src/submissions/frontend/__init__.py b/src/submissions/frontend/__init__.py
index dcfbe45..d7ba45f 100644
--- a/src/submissions/frontend/__init__.py
+++ b/src/submissions/frontend/__init__.py
@@ -10,14 +10,12 @@ from PyQt6.QtWidgets import (
)
from PyQt6.QtGui import QAction
from PyQt6.QtWebEngineWidgets import QWebEngineView
-# import pandas as pd
from pathlib import Path
from backend.db import (
construct_reagent, get_all_Control_Types_names, get_all_available_modes, store_reagent
)
from .main_window_functions import *
from tools import check_if_app
-# from backend.excel.reports import
from frontend.custom_widgets import SubmissionsSheet, AlertPop, AddReagentForm, KitAdder, ControlsDatePicker
import logging
from datetime import date
@@ -31,7 +29,7 @@ class App(QMainWindow):
def __init__(self, ctx: dict = {}):
super().__init__()
self.ctx = ctx
- # indicate version and database connected in title bar
+ # indicate version and connected database in title bar
try:
self.title = f"Submissions App (v{ctx['package'].__version__}) - {ctx['database']}"
except (AttributeError, KeyError):
@@ -123,11 +121,17 @@ class App(QMainWindow):
self.docsAction.triggered.connect(self.openDocs)
def showAbout(self):
+ """
+ Show the 'about' message
+ """
output = f"Version: {self.ctx['package'].__version__}\n\nAuthor: {self.ctx['package'].__author__['name']} - {self.ctx['package'].__author__['email']}\n\nCopyright: {self.ctx['package'].__copyright__}"
about = AlertPop(message=output, status="information")
about.exec()
def openDocs(self):
+ """
+ Open the documentation html pages
+ """
if check_if_app():
url = Path(sys._MEIPASS).joinpath("files", "docs", "index.html")
else:
@@ -135,8 +139,13 @@ class App(QMainWindow):
logger.debug(f"Attempting to open {url}")
webbrowser.get('windows-default').open(f"file://{url.__str__()}")
- # All main window functions return a result which is reported here, unless it is None
def result_reporter(self, result:dict|None=None):
+ """
+ Report any anomolous results - if any - to the user
+
+ Args:
+ result (dict | None, optional): The result from a function. Defaults to None.
+ """
if result != None:
msg = AlertPop(message=result['message'], status=result['status'])
msg.exec()
@@ -149,14 +158,12 @@ class App(QMainWindow):
logger.debug(f"Import result: {result}")
self.result_reporter(result)
-
def kit_reload(self):
"""
Removes all reagents from form before running kit integrity completion.
"""
self, result = kit_reload_function(self)
self.result_reporter(result)
-
def kit_integrity_completion(self):
"""
@@ -166,7 +173,6 @@ class App(QMainWindow):
"""
self, result = kit_integrity_completion_function(self)
self.result_reporter(result)
-
def submit_new_sample(self):
"""
@@ -174,82 +180,6 @@ class App(QMainWindow):
"""
self, result = submit_new_sample_function(self)
self.result_reporter(result)
- # get info from form
- # info = extract_form_info(self.table_widget.tab1)
- # reagents = {k:v for k,v in info.items() if k.startswith("lot_")}
- # info = {k:v for k,v in info.items() if not k.startswith("lot_")}
- # logger.debug(f"Info: {info}")
- # logger.debug(f"Reagents: {reagents}")
- # parsed_reagents = []
- # # compare reagents in form to reagent database
- # for reagent in reagents:
- # wanted_reagent = lookup_reagent(ctx=self.ctx, reagent_lot=reagents[reagent])
- # logger.debug(f"Looked up reagent: {wanted_reagent}")
- # # if reagent not found offer to add to database
- # if wanted_reagent == None:
- # r_lot = reagents[reagent]
- # dlg = QuestionAsker(title=f"Add {r_lot}?", message=f"Couldn't find reagent type {reagent.replace('_', ' ').title().strip('Lot')}: {r_lot} in the database.\n\nWould you like to add it?")
- # if dlg.exec():
- # logger.debug(f"checking reagent: {reagent} in self.reagents. Result: {self.reagents[reagent]}")
- # expiry_date = self.reagents[reagent]['exp']
- # wanted_reagent = self.add_reagent(reagent_lot=r_lot, reagent_type=reagent.replace("lot_", ""), expiry=expiry_date)
- # else:
- # # In this case we will have an empty reagent and the submission will fail kit integrity check
- # logger.debug("Will not add reagent.")
- # if wanted_reagent != None:
- # parsed_reagents.append(wanted_reagent)
- # # move samples into preliminary submission dict
- # info['samples'] = self.samples
- # info['uploaded_by'] = getuser()
- # # construct submission object
- # logger.debug(f"Here is the info_dict: {pprint.pformat(info)}")
- # base_submission, result = construct_submission_info(ctx=self.ctx, info_dict=info)
- # # check output message for issues
- # match result['code']:
- # # code 1: ask for overwrite
- # case 1:
- # dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_num}?", message=result['message'])
- # if dlg.exec():
- # base_submission.reagents = []
- # else:
- # return
- # # code 2: No RSL plate number given
- # case 2:
- # dlg = AlertPop(message=result['message'], status='critical')
- # dlg.exec()
- # return
- # case _:
- # pass
- # # add reagents to submission object
- # for reagent in parsed_reagents:
- # base_submission.reagents.append(reagent)
- # logger.debug("Checking kit integrity...")
- # kit_integrity = check_kit_integrity(base_submission)
- # if kit_integrity != None:
- # msg = AlertPop(message=kit_integrity['message'], status="critical")
- # msg.exec()
- # return
- # logger.debug(f"Sending submission: {base_submission.rsl_plate_num} to database.")
- # result = store_submission(ctx=self.ctx, base_submission=base_submission)
- # # check result of storing for issues
- # if result != None:
- # msg = AlertPop(result['message'])
- # msg.exec()
- # # update summary sheet
- # self.table_widget.sub_wid.setData()
- # # reset form
- # for item in self.table_widget.formlayout.parentWidget().findChildren(QWidget):
- # item.setParent(None)
- # if hasattr(self, 'csv'):
- # dlg = QuestionAsker("Export CSV?", "Would you like to export the csv file?")
- # if dlg.exec():
- # home_dir = Path(self.ctx["directory_path"]).joinpath(f"{base_submission.rsl_plate_num}.csv").resolve().__str__()
- # fname = Path(QFileDialog.getSaveFileName(self, "Save File", home_dir, filter=".csv")[0])
- # try:
- # self.csv.to_csv(fname.__str__(), index=False)
- # except PermissionError:
- # logger.debug(f"Could not get permissions to {fname}. Possibly the request was cancelled.")
-
def add_reagent(self, reagent_lot:str|None=None, reagent_type:str|None=None, expiry:date|None=None):
"""
@@ -277,7 +207,6 @@ class App(QMainWindow):
# send reagent to db
store_reagent(ctx=self.ctx, reagent=reagent)
return reagent
-
def generateReport(self):
"""
@@ -285,42 +214,6 @@ class App(QMainWindow):
"""
self, result = generate_report_function(self)
self.result_reporter(result)
- # Custom two date picker for start & end dates
- # dlg = ReportDatePicker()
- # if dlg.exec():
- # info = extract_form_info(dlg)
- # logger.debug(f"Report info: {info}")
- # # find submissions based on date range
- # subs = lookup_submissions_by_date_range(ctx=self.ctx, start_date=info['start_date'], end_date=info['end_date'])
- # # convert each object to dict
- # records = [item.report_dict() for item in subs]
- # # make dataframe from record dictionaries
- # df = make_report_xlsx(records=records)
- # html = make_report_html(df=df, start_date=info['start_date'], end_date=info['end_date'])
- # # setup filedialog to handle save location of report
- # home_dir = Path(self.ctx["directory_path"]).joinpath(f"Submissions_Report_{info['start_date']}-{info['end_date']}.pdf").resolve().__str__()
- # fname = Path(QFileDialog.getSaveFileName(self, "Save File", home_dir, filter=".pdf")[0])
- # # logger.debug(f"report output name: {fname}")
- # with open(fname, "w+b") as f:
- # pisa.CreatePDF(html, dest=f)
- # writer = pd.ExcelWriter(fname.with_suffix(".xlsx"), engine='openpyxl')
- # df.to_excel(writer, sheet_name="Report")
- # worksheet = writer.sheets['Report']
- # for idx, col in enumerate(df): # loop through all columns
- # series = df[col]
- # max_len = max((
- # series.astype(str).map(len).max(), # len of largest item
- # len(str(series.name)) # len of column name/header
- # )) + 20 # adding a little extra space
- # try:
- # worksheet.column_dimensions[get_column_letter(idx)].width = max_len
- # except ValueError:
- # pass
- # for cell in worksheet['D']:
- # if cell.row > 1:
- # cell.style = 'Currency'
- # writer.close()
-
def add_kit(self):
"""
@@ -328,29 +221,6 @@ class App(QMainWindow):
"""
self, result = add_kit_function(self)
self.result_reporter(result)
- # setup file dialog to find yaml flie
- # home_dir = str(Path(self.ctx["directory_path"]))
- # fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "yml(*.yml)")[0])
- # assert fname.exists()
- # # read yaml file
- # try:
- # with open(fname.__str__(), "r") as stream:
- # try:
- # exp = yaml.load(stream, Loader=yaml.Loader)
- # except yaml.YAMLError as exc:
- # logger.error(f'Error reading yaml file {fname}: {exc}')
- # return {}
- # except PermissionError:
- # return
- # # send to kit creator function
- # result = create_kit_from_yaml(ctx=self.ctx, exp=exp)
- # match result['code']:
- # case 0:
- # msg = AlertPop(message=result['message'], status='info')
- # case 1:
- # msg = AlertPop(message=result['message'], status='critical')
- # msg.exec()
-
def add_org(self):
"""
@@ -358,29 +228,6 @@ class App(QMainWindow):
"""
self, result = add_org_function(self)
self.result_reporter(result)
- # # setup file dialog to find yaml flie
- # home_dir = str(Path(self.ctx["directory_path"]))
- # fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "yml(*.yml)")[0])
- # assert fname.exists()
- # # read yaml file
- # try:
- # with open(fname.__str__(), "r") as stream:
- # try:
- # org = yaml.load(stream, Loader=yaml.Loader)
- # except yaml.YAMLError as exc:
- # logger.error(f'Error reading yaml file {fname}: {exc}')
- # return {}
- # except PermissionError:
- # return
- # # send to kit creator function
- # result = create_org_from_yaml(ctx=self.ctx, org=org)
- # match result['code']:
- # case 0:
- # msg = AlertPop(message=result['message'], status='information')
- # case 1:
- # msg = AlertPop(message=result['message'], status='critical')
- # msg.exec()
-
def _controls_getter(self):
"""
@@ -388,39 +235,6 @@ class App(QMainWindow):
"""
self, result = controls_getter_function(self)
self.result_reporter(result)
- # # subtype defaults to disabled
- # try:
- # self.table_widget.sub_typer.disconnect()
- # except TypeError:
- # pass
- # # correct start date being more recent than end date and rerun
- # if self.table_widget.datepicker.start_date.date() > self.table_widget.datepicker.end_date.date():
- # logger.warning("Start date after end date is not allowed!")
- # threemonthsago = self.table_widget.datepicker.end_date.date().addDays(-60)
- # # block signal that will rerun controls getter and set start date
- # with QSignalBlocker(self.table_widget.datepicker.start_date) as blocker:
- # self.table_widget.datepicker.start_date.setDate(threemonthsago)
- # self._controls_getter()
- # return
- # # convert to python useable date object
- # self.start_date = self.table_widget.datepicker.start_date.date().toPyDate()
- # self.end_date = self.table_widget.datepicker.end_date.date().toPyDate()
- # self.con_type = self.table_widget.control_typer.currentText()
- # self.mode = self.table_widget.mode_typer.currentText()
- # self.table_widget.sub_typer.clear()
- # # lookup subtypes
- # sub_types = get_control_subtypes(ctx=self.ctx, type=self.con_type, mode=self.mode)
- # if sub_types != []:
- # # block signal that will rerun controls getter and update sub_typer
- # with QSignalBlocker(self.table_widget.sub_typer) as blocker:
- # self.table_widget.sub_typer.addItems(sub_types)
- # self.table_widget.sub_typer.setEnabled(True)
- # self.table_widget.sub_typer.currentTextChanged.connect(self._chart_maker)
- # else:
- # self.table_widget.sub_typer.clear()
- # self.table_widget.sub_typer.setEnabled(False)
- # self._chart_maker()
-
def _chart_maker(self):
"""
@@ -428,42 +242,6 @@ class App(QMainWindow):
"""
self, result = chart_maker_function(self)
self.result_reporter(result)
- # logger.debug(f"Control getter context: \n\tControl type: {self.con_type}\n\tMode: {self.mode}\n\tStart Date: {self.start_date}\n\tEnd Date: {self.end_date}")
- # if self.table_widget.sub_typer.currentText() == "":
- # self.subtype = None
- # else:
- # self.subtype = self.table_widget.sub_typer.currentText()
- # logger.debug(f"Subtype: {self.subtype}")
- # # query all controls using the type/start and end dates from the gui
- # controls = get_all_controls_by_type(ctx=self.ctx, con_type=self.con_type, start_date=self.start_date, end_date=self.end_date)
- # # if no data found from query set fig to none for reporting in webview
- # if controls == None:
- # fig = None
- # else:
- # # change each control to list of dicts
- # data = [control.convert_by_mode(mode=self.mode) for control in controls]
- # # flatten data to one dimensional list
- # data = [item for sublist in data for item in sublist]
- # # send to dataframe creator
- # df = convert_data_list_to_df(ctx=self.ctx, input=data, subtype=self.subtype)
- # if self.subtype == None:
- # title = self.mode
- # else:
- # title = f"{self.mode} - {self.subtype}"
- # # send dataframe to chart maker
- # fig = create_charts(ctx=self.ctx, df=df, ytitle=title)
- # logger.debug(f"Updating figure...")
- # # construct html for webview
- # html = '
'
- # if fig != None:
- # html += plotly.offline.plot(fig, output_type='div', include_plotlyjs='cdn')#, image = 'png', auto_open=True, image_filename='plot_image')
- # else:
- # html += "No data was retrieved for the given parameters.
"
- # html += ''
- # self.table_widget.webengineview.setHtml(html)
- # self.table_widget.webengineview.update()
- # logger.debug("Figure updated... I hope.")
-
def linkControls(self):
"""
@@ -471,52 +249,6 @@ class App(QMainWindow):
"""
self, result = link_controls_function(self)
self.result_reporter(result)
- # all_bcs = lookup_all_submissions_by_type(self.ctx, "Bacterial Culture")
- # logger.debug(all_bcs)
- # all_controls = get_all_controls(self.ctx)
- # ac_list = [control.name for control in all_controls]
- # count = 0
- # for bcs in all_bcs:
- # logger.debug(f"Running for {bcs.rsl_plate_num}")
- # logger.debug(f"Here is the current control: {[control.name for control in bcs.controls]}")
- # samples = [sample.sample_id for sample in bcs.samples]
- # logger.debug(bcs.controls)
- # for sample in samples:
- # # replace below is a stopgap method because some dingus decided to add spaces in some of the ATCC49... so it looks like "ATCC 49"...
- # if " " in sample:
- # logger.warning(f"There is not supposed to be a space in the sample name!!!")
- # sample = sample.replace(" ", "")
- # # if sample not in ac_list:
- # if not any([ac.startswith(sample) for ac in ac_list]):
- # continue
- # else:
- # for control in all_controls:
- # diff = difflib.SequenceMatcher(a=sample, b=control.name).ratio()
- # if control.name.startswith(sample):
- # logger.debug(f"Checking {sample} against {control.name}... {diff}")
- # logger.debug(f"Found match:\n\tSample: {sample}\n\tControl: {control.name}\n\tDifference: {diff}")
- # if control in bcs.controls:
- # logger.debug(f"{control.name} already in {bcs.rsl_plate_num}, skipping")
- # continue
- # else:
- # logger.debug(f"Adding {control.name} to {bcs.rsl_plate_num} as control")
- # bcs.controls.append(control)
- # # bcs.control_id.append(control.id)
- # control.submission = bcs
- # control.submission_id = bcs.id
- # self.ctx["database_session"].add(control)
- # count += 1
- # self.ctx["database_session"].add(bcs)
- # logger.debug(f"Here is the new control: {[control.name for control in bcs.controls]}")
- # result = f"We added {count} controls to bacterial cultures."
- # logger.debug(result)
- # self.ctx['database_session'].commit()
- # msg = QMessageBox()
- # msg.setText("Controls added")
- # msg.setInformativeText(result)
- # msg.setWindowTitle("Controls added")
- # msg.exec()
-
def linkExtractions(self):
"""
@@ -524,55 +256,6 @@ class App(QMainWindow):
"""
self, result = link_extractions_function(self)
self.result_reporter(result)
- # home_dir = str(Path(self.ctx["directory_path"]))
- # fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "csv(*.csv)")[0])
- # with open(fname.__str__(), 'r') as f:
- # runs = [col.strip().split(",") for col in f.readlines()]
- # count = 0
- # for run in runs:
- # obj = dict(
- # start_time=run[0].strip(),
- # rsl_plate_num=run[1].strip(),
- # sample_count=run[2].strip(),
- # status=run[3].strip(),
- # experiment_name=run[4].strip(),
- # end_time=run[5].strip()
- # )
- # for ii in range(6, len(run)):
- # obj[f"column{str(ii-5)}_vol"] = run[ii]
- # sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=obj['rsl_plate_num'])
- # try:
- # logger.debug(f"Found submission: {sub.rsl_plate_num}")
- # count += 1
- # except AttributeError:
- # continue
- # if sub.extraction_info != None:
- # existing = json.loads(sub.extraction_info)
- # else:
- # existing = None
- # try:
- # if json.dumps(obj) in sub.extraction_info:
- # logger.debug(f"Looks like we already have that info.")
- # continue
- # except TypeError:
- # pass
- # if existing != None:
- # try:
- # logger.debug(f"Updating {type(existing)}: {existing} with {type(obj)}: {obj}")
- # existing.append(obj)
- # logger.debug(f"Setting: {existing}")
- # sub.extraction_info = json.dumps(existing)
- # except TypeError:
- # logger.error(f"Error updating!")
- # sub.extraction_info = json.dumps([obj])
- # logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.extraction_info}")
- # else:
- # sub.extraction_info = json.dumps([obj])
- # self.ctx['database_session'].add(sub)
- # self.ctx["database_session"].commit()
- # dlg = AlertPop(message=f"We added {count} logs to the database.", status='information')
- # dlg.exec()
-
def linkPCR(self):
"""
@@ -580,56 +263,6 @@ class App(QMainWindow):
"""
self, result = link_pcr_function(self)
self.result_reporter(result)
- # home_dir = str(Path(self.ctx["directory_path"]))
- # fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "csv(*.csv)")[0])
- # with open(fname.__str__(), 'r') as f:
- # runs = [col.strip().split(",") for col in f.readlines()]
- # count = 0
- # for run in runs:
- # obj = dict(
- # start_time=run[0].strip(),
- # rsl_plate_num=run[1].strip(),
- # biomek_status=run[2].strip(),
- # quant_status=run[3].strip(),
- # experiment_name=run[4].strip(),
- # end_time=run[5].strip()
- # )
- # # for ii in range(6, len(run)):
- # # obj[f"column{str(ii-5)}_vol"] = run[ii]
- # sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=obj['rsl_plate_num'])
- # try:
- # logger.debug(f"Found submission: {sub.rsl_plate_num}")
- # except AttributeError:
- # continue
- # if hasattr(sub, 'pcr_info') and sub.pcr_info != None:
- # existing = json.loads(sub.pcr_info)
- # else:
- # existing = None
- # try:
- # if json.dumps(obj) in sub.pcr_info:
- # logger.debug(f"Looks like we already have that info.")
- # continue
- # else:
- # count += 1
- # except TypeError:
- # logger.error(f"No json to dump")
- # if existing != None:
- # try:
- # logger.debug(f"Updating {type(existing)}: {existing} with {type(obj)}: {obj}")
- # existing.append(obj)
- # logger.debug(f"Setting: {existing}")
- # sub.pcr_info = json.dumps(existing)
- # except TypeError:
- # logger.error(f"Error updating!")
- # sub.pcr_info = json.dumps([obj])
- # logger.debug(f"Final ext info for {sub.rsl_plate_num}: {sub.pcr_info}")
- # else:
- # sub.pcr_info = json.dumps([obj])
- # self.ctx['database_session'].add(sub)
- # self.ctx["database_session"].commit()
- # dlg = AlertPop(message=f"We added {count} logs to the database.", status='information')
- # dlg.exec()
-
def importPCRResults(self):
"""
@@ -637,54 +270,6 @@ class App(QMainWindow):
"""
self, result = import_pcr_results_function(self)
self.result_reporter(result)
- # home_dir = str(Path(self.ctx["directory_path"]))
- # fname = Path(QFileDialog.getOpenFileName(self, 'Open file', home_dir, filter = "xlsx(*.xlsx)")[0])
- # parser = PCRParser(ctx=self.ctx, filepath=fname)
- # logger.debug(f"Attempting lookup for {parser.plate_num}")
- # sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=parser.plate_num)
- # try:
- # logger.debug(f"Found submission: {sub.rsl_plate_num}")
- # except AttributeError:
- # logger.error(f"Submission of number {parser.plate_num} not found. Attempting rescue of plate repeat.")
- # parser.plate_num = "-".join(parser.plate_num.split("-")[:-1])
- # sub = lookup_submission_by_rsl_num(ctx=self.ctx, rsl_num=parser.plate_num)
- # try:
- # logger.debug(f"Found submission: {sub.rsl_plate_num}")
- # except AttributeError:
- # logger.error(f"Rescue of {parser.plate_num} failed.")
- # return
- # # jout = json.dumps(parser.pcr)
- # count = 0
- # if hasattr(sub, 'pcr_info') and sub.pcr_info != None:
- # existing = json.loads(sub.pcr_info)
- # else:
- # # jout = None
- # existing = None
- # if existing != None:
- # try:
- # logger.debug(f"Updating {type(existing)}: {existing} with {type(parser.pcr)}: {parser.pcr}")
- # if json.dumps(parser.pcr) not in sub.pcr_info:
- # existing.append(parser.pcr)
- # logger.debug(f"Setting: {existing}")
- # sub.pcr_info = json.dumps(existing)
- # except TypeError:
- # logger.error(f"Error updating!")
- # sub.pcr_info = json.dumps([parser.pcr])
- # logger.debug(f"Final pcr info for {sub.rsl_plate_num}: {sub.pcr_info}")
- # else:
- # sub.pcr_info = json.dumps([parser.pcr])
- # self.ctx['database_session'].add(sub)
- # logger.debug(f"Existing {type(sub.pcr_info)}: {sub.pcr_info}")
- # logger.debug(f"Inserting {type(json.dumps(parser.pcr))}: {json.dumps(parser.pcr)}")
- # self.ctx["database_session"].commit()
- # logger.debug(f"Got {len(parser.samples)} to update!")
- # for sample in parser.samples:
- # logger.debug(f"Running update on: {sample['sample']}")
- # sample['plate_rsl'] = sub.rsl_plate_num
- # update_ww_sample(ctx=self.ctx, sample_obj=sample)
- # dlg = AlertPop(message=f"We added PCR info to {sub.rsl_plate_num}.", status='information')
- # dlg.exec()
-
class AddSubForm(QWidget):
@@ -756,4 +341,3 @@ class AddSubForm(QWidget):
self.layout.addWidget(self.tabs)
self.setLayout(self.layout)
-
diff --git a/src/submissions/frontend/all_window_functions.py b/src/submissions/frontend/all_window_functions.py
index e73cd59..0408902 100644
--- a/src/submissions/frontend/all_window_functions.py
+++ b/src/submissions/frontend/all_window_functions.py
@@ -1,21 +1,40 @@
from pathlib import Path
import logging
from PyQt6.QtWidgets import (
- QMainWindow, QLabel, QToolBar,
- QTabWidget, QWidget, QVBoxLayout,
- QPushButton, QFileDialog,
- QLineEdit, QMessageBox, QComboBox, QDateEdit, QHBoxLayout,
- QSpinBox, QDoubleSpinBox, QScrollArea
+ QMainWindow, QWidget, QFileDialog,
+ QLineEdit, QComboBox, QDateEdit, QSpinBox,
+ QDoubleSpinBox
)
logger = logging.getLogger(f"submissions.{__name__}")
-def select_open_file(obj:QMainWindow, extension:str) -> Path:
+def select_open_file(obj:QMainWindow, file_extension:str) -> Path:
+ """
+ File dialog to select a file to read from
+
+ Args:
+ obj (QMainWindow): Original main app window to be parent
+ file_extension (str): file extension
+
+ Returns:
+ Path: Path of file to be opened
+ """
home_dir = str(Path(obj.ctx["directory_path"]))
- fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = f"{extension}(*.{extension})")[0])
+ fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = f"{file_extension}(*.{file_extension})")[0])
return fname
def select_save_file(obj:QMainWindow, default_name:str, extension:str) -> Path:
+ """
+ File dialog to select a file to write to
+
+ Args:
+ obj (QMainWindow): Original main app window to be parent
+ default_name (str): default base file name
+ extension (str): file extension
+
+ Returns:
+ Path: Path of file to be opened
+ """
home_dir = Path(obj.ctx["directory_path"]).joinpath(default_name).resolve().__str__()
fname = Path(QFileDialog.getSaveFileName(obj, "Save File", home_dir, filter = f"{extension}(*.{extension})")[0])
return fname
@@ -41,7 +60,7 @@ def extract_form_info(object) -> dict:
except AttributeError:
all_children = object.layout().parentWidget().findChildren(QWidget)
for item in all_children:
- logger.debug(f"Looking at: {item.objectName()}")
+ logger.debug(f"Looking at: {item.objectName()}: {type(item)}")
match item:
case QLineEdit():
dicto[item.objectName()] = item.text()
diff --git a/src/submissions/frontend/custom_widgets/misc.py b/src/submissions/frontend/custom_widgets/misc.py
index 0898d51..cea6275 100644
--- a/src/submissions/frontend/custom_widgets/misc.py
+++ b/src/submissions/frontend/custom_widgets/misc.py
@@ -2,40 +2,27 @@
Contains miscellaneous widgets for frontend functions
'''
from datetime import date
-import difflib
-from typing import Tuple
from PyQt6.QtWidgets import (
QLabel, QVBoxLayout,
QLineEdit, QComboBox, QDialog,
QDialogButtonBox, QDateEdit, QSizePolicy, QWidget,
QGridLayout, QPushButton, QSpinBox, QDoubleSpinBox,
- QHBoxLayout, QMainWindow
+ QHBoxLayout
)
from PyQt6.QtCore import Qt, QDate, QSize
-# from submissions.backend.db.functions import lookup_kittype_by_use
-# from submissions.backend.db import lookup_regent_by_type_name_and_kit_name
from tools import check_not_nan
from ..all_window_functions import extract_form_info
from backend.db import get_all_reagenttype_names, lookup_all_sample_types, create_kit_from_yaml, \
- lookup_regent_by_type_name, lookup_kittype_by_use, lookup_all_orgs
- #, lookup_regent_by_type_name_and_kit_name
-from backend.excel.parser import SheetParser
-from jinja2 import Environment, FileSystemLoader
-import sys
-from pathlib import Path
+ lookup_regent_by_type_name, lookup_last_used_reagenttype_lot
+from configure import jinja_template_loading
import logging
import numpy as np
from .pop_ups import AlertPop
+from backend.pydant import PydReagent
logger = logging.getLogger(f"submissions.{__name__}")
-if getattr(sys, 'frozen', False):
- loader_path = Path(sys._MEIPASS).joinpath("files", "templates")
-else:
- loader_path = Path(__file__).parents[2].joinpath('templates').absolute().__str__()
-loader = FileSystemLoader(loader_path)
-env = Environment(loader=loader)
-
+env = jinja_template_loading()
class AddReagentForm(QDialog):
"""
@@ -210,45 +197,6 @@ class KitAdder(QWidget):
msg = AlertPop(message=result['message'], status=result['status'])
msg.exec()
- # def extract_form_info(self, object):
- # """
- # retrieves arbitrary number of labels, values from form
-
- # Args:
- # object (_type_): the object to extract info from
-
- # Returns:
- # _type_: _description_
- # """
- # labels = []
- # values = []
- # reagents = {}
- # for item in object.findChildren(QWidget):
- # logger.debug(item.parentWidget())
- # # if not isinstance(item.parentWidget(), ReagentTypeForm):
- # match item:
- # case QLabel():
- # labels.append(item.text().replace(" ", "_").strip(":").lower())
- # case QLineEdit():
- # # ad hoc check to prevent double reporting of qdatedit under lineedit for some reason
- # if not isinstance(prev_item, QDateEdit) and not isinstance(prev_item, QComboBox) and not isinstance(prev_item, QSpinBox) and not isinstance(prev_item, QScrollBar):
- # logger.debug(f"Previous: {prev_item}")
- # logger.debug(f"Item: {item}, {item.text()}")
- # values.append(item.text().strip())
- # case QComboBox():
- # values.append(item.currentText().strip())
- # case QDateEdit():
- # values.append(item.date().toPyDate())
- # case QSpinBox():
- # values.append(item.value())
- # case ReagentTypeForm():
- # re_labels, re_values, _ = self.extract_form_info(item)
- # reagent = {item[0]:item[1] for item in zip(re_labels, re_values)}
- # logger.debug(reagent)
- # # reagent = {reagent['name:']:{'eol':reagent['extension_of_life_(months):']}}
- # reagents[reagent["name_(*exactly*_as_it_appears_in_the_excel_submission_form)"].strip()] = {'eol_ext':int(reagent['extension_of_life_(months)'])}
- # prev_item = item
- # return labels, values, reagents
class ReagentTypeForm(QWidget):
"""
@@ -300,79 +248,24 @@ class ControlsDatePicker(QWidget):
return QSize(80,20)
-# class ImportReagent(QComboBox):
-
-# def __init__(self, ctx:dict, item:str, prsr:SheetParser|None=None):
-# super().__init__()
-# self.setEditable(True)
-# # Ensure that all reagenttypes have a name that matches the items in the excel parser
-# query_var = item.replace("lot_", "")
-# if prsr != None:
-# logger.debug(f"Import Reagent is looking at: {prsr.sub[item]} for {item}")
-# else:
-# logger.debug(f"Import Reagent is going to retrieve all reagents for {item}")
-# logger.debug(f"Query for: {query_var}")
-# if prsr != None:
-# if isinstance(prsr.sub[item], np.float64):
-# logger.debug(f"{prsr.sub[item]['lot']} is a numpy float!")
-# try:
-# prsr.sub[item] = int(prsr.sub[item]['lot'])
-# except ValueError:
-# pass
-# # query for reagents using type name from sheet and kit from sheet
-# logger.debug(f"Attempting lookup of reagents by type: {query_var}")
-# # below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work.
-# relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)]#, kit_name=prsr.sub['extraction_kit'])]
-# # relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name_and_kit_name(ctx=ctx, type_name=query_var, kit_name=prsr.sub['extraction_kit'])]
-# output_reg = []
-# for reagent in relevant_reagents:
-# # extract strings from any sets.
-# if isinstance(reagent, set):
-# for thing in reagent:
-# output_reg.append(thing)
-# elif isinstance(reagent, str):
-# output_reg.append(reagent)
-# relevant_reagents = output_reg
-# # if reagent in sheet is not found insert it into the front of relevant reagents so it shows
-# if prsr != None:
-# logger.debug(f"Relevant reagents for {prsr.sub[item]}: {relevant_reagents}")
-# if str(prsr.sub[item]['lot']) not in relevant_reagents:
-# if check_not_nan(prsr.sub[item]['lot']):
-# relevant_reagents.insert(0, str(prsr.sub[item]['lot']))
-# else:
-# if len(relevant_reagents) > 1:
-# logger.debug(f"Found {prsr.sub[item]['lot']} in relevant reagents: {relevant_reagents}. Moving to front of list.")
-# idx = relevant_reagents.index(str(prsr.sub[item]['lot']))
-# logger.debug(f"The index we got for {prsr.sub[item]['lot']} in {relevant_reagents} was {idx}")
-# moved_reag = relevant_reagents.pop(idx)
-# relevant_reagents.insert(0, moved_reag)
-# else:
-# logger.debug(f"Found {prsr.sub[item]['lot']} in relevant reagents: {relevant_reagents}. But no need to move due to short list.")
-# logger.debug(f"New relevant reagents: {relevant_reagents}")
-# self.setObjectName(f"lot_{item}")
-# self.addItems(relevant_reagents)
-
-
class ImportReagent(QComboBox):
- def __init__(self, ctx:dict, reagent:dict):
+ def __init__(self, ctx:dict, reagent:PydReagent):
super().__init__()
self.setEditable(True)
# Ensure that all reagenttypes have a name that matches the items in the excel parser
- query_var = reagent['type']
- logger.debug(f"Import Reagent is looking at: {reagent['lot']} for {reagent['type']}")
-
- if isinstance(reagent['lot'], np.float64):
- logger.debug(f"{reagent['lot']} is a numpy float!")
+ query_var = reagent.type
+ logger.debug(f"Import Reagent is looking at: {reagent.lot} for {reagent.type}")
+ if isinstance(reagent.lot, np.float64):
+ logger.debug(f"{reagent.lot} is a numpy float!")
try:
- reagent['lot'] = int(reagent['lot'])
+ reagent.lot = int(reagent.lot)
except ValueError:
pass
# query for reagents using type name from sheet and kit from sheet
logger.debug(f"Attempting lookup of reagents by type: {query_var}")
# below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work.
- relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)]#, kit_name=prsr.sub['extraction_kit'])]
- # relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name_and_kit_name(ctx=ctx, type_name=query_var, kit_name=prsr.sub['extraction_kit'])]
+ relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)]
output_reg = []
for rel_reagent in relevant_reagents:
# extract strings from any sets.
@@ -383,20 +276,79 @@ class ImportReagent(QComboBox):
output_reg.append(rel_reagent)
relevant_reagents = output_reg
# if reagent in sheet is not found insert it into the front of relevant reagents so it shows
- # if prsr != None:
- logger.debug(f"Relevant reagents for {reagent['lot']}: {relevant_reagents}")
- if str(reagent['lot']) not in relevant_reagents:
- if check_not_nan(reagent['lot']):
- relevant_reagents.insert(0, str(reagent['lot']))
+ logger.debug(f"Relevant reagents for {reagent.lot}: {relevant_reagents}")
+ if str(reagent.lot) not in relevant_reagents:
+ if check_not_nan(reagent.lot):
+ relevant_reagents.insert(0, str(reagent.lot))
+ else:
+ # TODO: look up the last used reagent of this type in the database
+ looked_up_reg = lookup_last_used_reagenttype_lot(ctx=ctx, type_name=reagent.type)
+ logger.debug(f"Because there was no reagent listed for {reagent}, we will insert the last lot used: {looked_up_reg}")
+ if looked_up_reg != None:
+ relevant_reagents.remove(str(looked_up_reg.lot))
+ relevant_reagents.insert(0, str(looked_up_reg.lot))
else:
if len(relevant_reagents) > 1:
- logger.debug(f"Found {reagent['lot']} in relevant reagents: {relevant_reagents}. Moving to front of list.")
- idx = relevant_reagents.index(str(reagent['lot']))
- logger.debug(f"The index we got for {reagent['lot']} in {relevant_reagents} was {idx}")
+ logger.debug(f"Found {reagent.lot} in relevant reagents: {relevant_reagents}. Moving to front of list.")
+ idx = relevant_reagents.index(str(reagent.lot))
+ logger.debug(f"The index we got for {reagent.lot} in {relevant_reagents} was {idx}")
moved_reag = relevant_reagents.pop(idx)
relevant_reagents.insert(0, moved_reag)
else:
- logger.debug(f"Found {reagent['lot']} in relevant reagents: {relevant_reagents}. But no need to move due to short list.")
+ logger.debug(f"Found {reagent.lot} in relevant reagents: {relevant_reagents}. But no need to move due to short list.")
logger.debug(f"New relevant reagents: {relevant_reagents}")
- self.setObjectName(f"lot_{reagent['type']}")
- self.addItems(relevant_reagents)
\ No newline at end of file
+ self.setObjectName(f"lot_{reagent.type}")
+ self.addItems(relevant_reagents)
+
+
+# class ImportReagent(QComboBox):
+
+# def __init__(self, ctx:dict, reagent:dict):
+# super().__init__()
+# self.setEditable(True)
+# # Ensure that all reagenttypes have a name that matches the items in the excel parser
+# query_var = reagent['type']
+# logger.debug(f"Import Reagent is looking at: {reagent['lot']} for {reagent['type']}")
+# if isinstance(reagent['lot'], np.float64):
+# logger.debug(f"{reagent['lot']} is a numpy float!")
+# try:
+# reagent['lot'] = int(reagent['lot'])
+# except ValueError:
+# pass
+# # query for reagents using type name from sheet and kit from sheet
+# logger.debug(f"Attempting lookup of reagents by type: {query_var}")
+# # below was lookup_reagent_by_type_name_and_kit_name, but I couldn't get it to work.
+# relevant_reagents = [item.__str__() for item in lookup_regent_by_type_name(ctx=ctx, type_name=query_var)]
+# output_reg = []
+# for rel_reagent in relevant_reagents:
+# # extract strings from any sets.
+# if isinstance(rel_reagent, set):
+# for thing in rel_reagent:
+# output_reg.append(thing)
+# elif isinstance(rel_reagent, str):
+# output_reg.append(rel_reagent)
+# relevant_reagents = output_reg
+# # if reagent in sheet is not found insert it into the front of relevant reagents so it shows
+# logger.debug(f"Relevant reagents for {reagent['lot']}: {relevant_reagents}")
+# if str(reagent['lot']) not in relevant_reagents:
+# if check_not_nan(reagent['lot']):
+# relevant_reagents.insert(0, str(reagent['lot']))
+# else:
+# # TODO: look up the last used reagent of this type in the database
+# looked_up_reg = lookup_last_used_reagenttype_lot(ctx=ctx, type_name=reagent['type'])
+# logger.debug(f"Because there was no reagent listed for {reagent}, we will insert the last lot used: {looked_up_reg}")
+# if looked_up_reg != None:
+# relevant_reagents.remove(str(looked_up_reg.lot))
+# relevant_reagents.insert(0, str(looked_up_reg.lot))
+# else:
+# if len(relevant_reagents) > 1:
+# logger.debug(f"Found {reagent['lot']} in relevant reagents: {relevant_reagents}. Moving to front of list.")
+# idx = relevant_reagents.index(str(reagent['lot']))
+# logger.debug(f"The index we got for {reagent['lot']} in {relevant_reagents} was {idx}")
+# moved_reag = relevant_reagents.pop(idx)
+# relevant_reagents.insert(0, moved_reag)
+# else:
+# logger.debug(f"Found {reagent['lot']} in relevant reagents: {relevant_reagents}. But no need to move due to short list.")
+# logger.debug(f"New relevant reagents: {relevant_reagents}")
+# self.setObjectName(f"lot_{reagent['type']}")
+# self.addItems(relevant_reagents)
\ No newline at end of file
diff --git a/src/submissions/frontend/custom_widgets/pop_ups.py b/src/submissions/frontend/custom_widgets/pop_ups.py
index 7672115..74d6a1f 100644
--- a/src/submissions/frontend/custom_widgets/pop_ups.py
+++ b/src/submissions/frontend/custom_widgets/pop_ups.py
@@ -3,22 +3,15 @@ Contains dialogs for notification and prompting.
'''
from PyQt6.QtWidgets import (
QLabel, QVBoxLayout, QDialog,
- QDialogButtonBox, QMessageBox
+ QDialogButtonBox, QMessageBox, QComboBox
)
-from jinja2 import Environment, FileSystemLoader
-import sys
-from pathlib import Path
+from configure import jinja_template_loading
import logging
+from backend.db.functions import lookup_kittype_by_use
logger = logging.getLogger(f"submissions.{__name__}")
-# determine if pyinstaller launcher is being used
-if getattr(sys, 'frozen', False):
- loader_path = Path(sys._MEIPASS).joinpath("files", "templates")
-else:
- loader_path = Path(__file__).parents[2].joinpath('templates').absolute().__str__()
-loader = FileSystemLoader(loader_path)
-env = Environment(loader=loader)
+env = jinja_template_loading()
class QuestionAsker(QDialog):
@@ -52,3 +45,37 @@ class AlertPop(QMessageBox):
self.setInformativeText(message)
self.setWindowTitle(status.title())
+class KitSelector(QDialog):
+ """
+ dialog to ask yes/no questions
+ """
+ def __init__(self, ctx:dict, title:str, message:str) -> QDialog:
+ super().__init__()
+ self.setWindowTitle(title)
+ self.widget = QComboBox()
+ kits = [item.__str__() for item in lookup_kittype_by_use(ctx=ctx)]
+ self.widget.addItems(kits)
+ self.widget.setEditable(False)
+ # set yes/no buttons
+ QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
+ self.buttonBox = QDialogButtonBox(QBtn)
+ self.buttonBox.accepted.connect(self.accept)
+ self.buttonBox.rejected.connect(self.reject)
+ self.layout = QVBoxLayout()
+ # Text for the yes/no question
+ message = QLabel(message)
+ self.layout.addWidget(message)
+ self.layout.addWidget(self.widget)
+ self.layout.addWidget(self.buttonBox)
+ self.setLayout(self.layout)
+
+ def getValues(self):
+ return self.widget.currentText()
+
+ # @staticmethod
+ # def launch(parent):
+ # dlg = KitSelector(parent)
+ # r = dlg.exec_()
+ # if r:
+ # return dlg.getValues()
+ # return None
\ No newline at end of file
diff --git a/src/submissions/frontend/custom_widgets/sub_details.py b/src/submissions/frontend/custom_widgets/sub_details.py
index 04a36af..9173753 100644
--- a/src/submissions/frontend/custom_widgets/sub_details.py
+++ b/src/submissions/frontend/custom_widgets/sub_details.py
@@ -4,22 +4,19 @@ Contains widgets specific to the submission summary and submission details.
import base64
from datetime import datetime
from io import BytesIO
-import math
from PyQt6 import QtPrintSupport
from PyQt6.QtWidgets import (
QVBoxLayout, QDialog, QTableView,
QTextEdit, QPushButton, QScrollArea,
QMessageBox, QFileDialog, QMenu, QLabel,
- QDialogButtonBox, QToolBar, QMainWindow
+ QDialogButtonBox, QToolBar
)
-from PyQt6.QtCore import Qt, QAbstractTableModel, QSortFilterProxyModel, QItemSelectionModel
+from PyQt6.QtCore import Qt, QAbstractTableModel, QSortFilterProxyModel
from PyQt6.QtGui import QFontMetrics, QAction, QCursor, QPixmap, QPainter
from backend.db import submissions_to_df, lookup_submission_by_id, delete_submission_by_id, lookup_submission_by_rsl_num, hitpick_plate
-# from backend.misc import hitpick_plate
from backend.excel import make_hitpicks
-from jinja2 import Environment, FileSystemLoader
+from configure import jinja_template_loading
from xhtml2pdf import pisa
-import sys
from pathlib import Path
import logging
from .pop_ups import QuestionAsker, AlertPop
@@ -28,12 +25,7 @@ from getpass import getuser
logger = logging.getLogger(f"submissions.{__name__}")
-if getattr(sys, 'frozen', False):
- loader_path = Path(sys._MEIPASS).joinpath("files", "templates")
-else:
- loader_path = Path(__file__).parents[2].joinpath('templates').absolute().__str__()
-loader = FileSystemLoader(loader_path)
-env = Environment(loader=loader)
+env = jinja_template_loading()
class pandasModel(QAbstractTableModel):
"""
@@ -407,7 +399,7 @@ class BarcodeWindow(QDialog):
class SubmissionComment(QDialog):
"""
- a window showing text details of submission
+ a window for adding comment text to a submission
"""
def __init__(self, ctx:dict, rsl:str) -> None:
diff --git a/src/submissions/frontend/main_window_functions.py b/src/submissions/frontend/main_window_functions.py
index 28d65b6..fc3c7ee 100644
--- a/src/submissions/frontend/main_window_functions.py
+++ b/src/submissions/frontend/main_window_functions.py
@@ -29,24 +29,27 @@ from backend.db.functions import (
)
from backend.excel.parser import SheetParser, PCRParser
from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df
+from backend.pydant import PydReagent
from tools import check_not_nan, check_kit_integrity
from .custom_widgets.pop_ups import AlertPop, QuestionAsker
from .custom_widgets import ReportDatePicker
from .custom_widgets.misc import ImportReagent
from .visualizations.control_charts import create_charts, construct_html
+from typing import List
+from openpyxl import load_workbook
logger = logging.getLogger(f"submissions.{__name__}")
def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]:
"""
- _summary_
+ Import a new submission to the app window
Args:
- obj (QMainWindow): _description_
+ obj (QMainWindow): original app window
Returns:
- Tuple[QMainWindow, dict|None]: _description_
+ Tuple[QMainWindow, dict|None]: Collection of new main app window and result dict
"""
logger.debug(f"\n\nStarting Import...\n\n")
result = None
@@ -54,8 +57,10 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
# initialize samples
obj.samples = []
obj.reagents = {}
+ obj.missing_reagents = []
+
# set file dialog
- fname = select_open_file(obj, extension="xlsx")
+ fname = select_open_file(obj, file_extension="xlsx")
logger.debug(f"Attempting to parse file: {fname}")
if not fname.exists():
result = dict(message=f"File {fname.__str__()} not found.", status="critical")
@@ -65,116 +70,37 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
prsr = SheetParser(ctx=obj.ctx, filepath=fname)
except PermissionError:
logger.error(f"Couldn't get permission to access file: {fname}")
- return
+ return obj, result
+ # obj.column_count = prsr.column_count
try:
+ logger.debug(f"Submission dictionary: {prsr.sub}")
pyd = prsr.to_pydantic()
- logger.debug(f"Pydantic result: \n\n{pyd}\n\n")
- # with open("pickled.pkl", "wb") as f:
- # pickle.dump(pyd, f)
+ logger.debug(f"Pydantic result: \n\n{pprint.pformat(pyd)}\n\n")
except Exception as e:
return obj, dict(message= f"Problem creating pydantic model:\n\n{e}", status="critical")
- # moved to pydantic model
- # if prsr.sub['rsl_plate_num'] == None:
- # prsr.sub['rsl_plate_num'] = RSLNamer(fname.__str__()).parsed_name
- # logger.debug(f"prsr.sub = {prsr.sub}")
- for sample in pyd.samples:
- if hasattr(sample, "elution_well"):
- logger.debug(f"Sample from import: {sample.elution_well}")
- # obj.current_submission_type = prsr.sub['submission_type']
+ try:
+ obj.xl = prsr.filepath
+ except Exception as e:
+ logger.error(f"Unable to make obj xl.")
+ # for sample in pyd.samples:
+ # if hasattr(sample, "elution_well"):
+ # logger.debug(f"Sample from import: {sample.elution_well}")
+ # I don't remember why this is here.
+ missing_info = [k for k,v in pyd if v == None]
obj.current_submission_type = pyd.submission_type
# destroy any widgets from previous imports
for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget):
item.setParent(None)
- # # regex to parser out different variable types for decision making
- # variable_parser = re.compile(r"""
- # (?P^extraction_kit$) |
- # (?P^submitted_date$) |
- # (?P)^submitting_lab$ |
- # (?P)^samples$ |
- # (?P^lot_.*$) |
- # (?P^csv$)
- # """, re.VERBOSE)
- # for item in prsr.sub:
- # logger.debug(f"Item: {item}")
- # # attempt to match variable name to regex group
- # try:
- # mo = variable_parser.fullmatch(item).lastgroup
- # except AttributeError:
- # mo = "other"
- # logger.debug(f"Mo: {mo}")
- # match mo:
- # case 'submitting_lab':
- # # create label
- # obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
- # logger.debug(f"{item}: {prsr.sub[item]}")
- # # create combobox to hold looked up submitting labs
- # add_widget = QComboBox()
- # labs = [item.__str__() for item in lookup_all_orgs(ctx=obj.ctx)]
- # # try to set closest match to top of list
- # try:
- # labs = difflib.get_close_matches(prsr.sub[item], labs, len(labs), 0)
- # except (TypeError, ValueError):
- # pass
- # # set combobox values to lookedup values
- # add_widget.addItems(labs)
- # case 'extraction_kit':
- # # create label
- # obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
- # # if extraction kit not available, all other values fail
- # if not check_not_nan(prsr.sub[item]):
- # msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning")
- # msg.exec()
- # # create combobox to hold looked up kits
- # add_widget = QComboBox()
- # # lookup existing kits by 'submission_type' decided on by sheetparser
- # uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=prsr.sub['submission_type'])]
- # if check_not_nan(prsr.sub[item]):
- # logger.debug(f"The extraction kit in parser was: {prsr.sub[item]}")
- # uses.insert(0, uses.pop(uses.index(prsr.sub[item])))
- # obj.ext_kit = prsr.sub[item]
- # else:
- # logger.error(f"Couldn't find {prsr.sub['extraction_kit']}")
- # obj.ext_kit = uses[0]
- # add_widget.addItems(uses)
- # case 'submitted_date':
- # # create label
- # obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
- # # uses base calendar
- # add_widget = QDateEdit(calendarPopup=True)
- # # sets submitted date based on date found in excel sheet
- # try:
- # add_widget.setDate(prsr.sub[item])
- # # if not found, use today
- # except:
- # add_widget.setDate(date.today())
- # case 'reagent':
- # # create label
- # reg_label = QLabel(item.replace("_", " ").title())
- # reg_label.setObjectName(f"lot_{item}_label")
- # obj.table_widget.formlayout.addWidget(reg_label)
- # # create reagent choice widget
- # add_widget = ImportReagent(ctx=obj.ctx, item=item, prsr=prsr)
- # obj.reagents[item] = prsr.sub[item]
- # case 'samples':
- # # hold samples in 'obj' until form submitted
- # logger.debug(f"{item}: {prsr.sub[item]}")
- # obj.samples = prsr.sub[item]
- # add_widget = None
- # case 'csv':
- # obj.csv = prsr.sub[item]
- # case _:
- # # anything else gets added in as a line edit
- # obj.table_widget.formlayout.addWidget(QLabel(item.replace("_", " ").title()))
- # add_widget = QLineEdit()
- # logger.debug(f"Setting widget text to {str(prsr.sub[item]).replace('_', ' ')}")
- # add_widget.setText(str(prsr.sub[item]).replace("_", " "))
+ # Get list of fields from pydantic model.
fields = list(pyd.model_fields.keys())
fields.remove('filepath')
logger.debug(f"pydantic fields: {fields}")
for field in fields:
value = getattr(pyd, field)
- if not check_not_nan(value):
- continue
+ logger.debug(f"Checking: {field}: {value}")
+ # No longer necessary with addition of pydantic validations
+ # if not check_not_nan(value):
+ # continue
match field:
case 'submitting_lab':
# create label
@@ -194,17 +120,19 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
# create label
label = QLabel(field.replace("_", " ").title())
# if extraction kit not available, all other values fail
- if not check_not_nan(value):
+ if not check_not_nan(value['value']):
msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning")
msg.exec()
+ if not value['parsed']:
+ missing_info.append(field)
# create combobox to hold looked up kits
add_widget = QComboBox()
# lookup existing kits by 'submission_type' decided on by sheetparser
uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=pyd.submission_type)]
if check_not_nan(value):
- logger.debug(f"The extraction kit in parser was: {value}")
- uses.insert(0, uses.pop(uses.index(value)))
- obj.ext_kit = value
+ logger.debug(f"The extraction kit in parser was: {value['value']}")
+ uses.insert(0, uses.pop(uses.index(value['value'])))
+ obj.ext_kit = value['value']
else:
logger.error(f"Couldn't find {prsr.sub['extraction_kit']}")
obj.ext_kit = uses[0]
@@ -225,23 +153,33 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
logger.debug(f"{field}:\n\t{value}")
obj.samples = value
continue
- case 'csv':
- obj.csv = value
+ case "ctx":
continue
case 'reagents':
for reagent in value:
# create label
- reg_label = QLabel(reagent['type'].replace("_", " ").title())
- reg_label.setObjectName(f"lot_{reagent['type']}_label")
- # obj.table_widget.formlayout.addWidget(reg_label)
+ # reg_label = QLabel(reagent['type'].replace("_", " ").title())
+ try:
+ reg_label = QLabel(reagent.type.replace("_", " ").title())
+ except AttributeError:
+ continue
+ # reg_label.setObjectName(f"lot_{reagent['type']}_label")
+ reg_label.setObjectName(f"lot_{reagent.type}_label")
# create reagent choice widget
add_widget = ImportReagent(ctx=obj.ctx, reagent=reagent)
- add_widget.setObjectName(f"lot_{reagent['type']}")
+ add_widget.setObjectName(f"lot_{reagent.type}")
logger.debug(f"Widget name set to: {add_widget.objectName()}")
obj.table_widget.formlayout.addWidget(reg_label)
obj.table_widget.formlayout.addWidget(add_widget)
- obj.reagents[reagent['type']] = reagent['lot']
+ obj.reagents[reagent.type] = reagent
continue
+ case "rsl_plate_num":
+ if not value['parsed']:
+ missing_info.append(field)
+ label = QLabel(field.replace("_", " ").title())
+ add_widget = QLineEdit()
+ logger.debug(f"Setting widget text to {str(value['value']).replace('_', ' ')}")
+ add_widget.setText(str(value['value']).replace("_", " "))
case _:
# anything else gets added in as a line edit
label = QLabel(field.replace("_", " ").title())
@@ -258,13 +196,27 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
# compare obj.reagents with expected reagents in kit
if hasattr(obj, 'ext_kit'):
obj.kit_integrity_completion()
+ obj.missing_reagents = obj.missing_reagents + missing_info
logger.debug(f"Imported reagents: {obj.reagents}")
if prsr.sample_result != None:
msg = AlertPop(message=prsr.sample_result, status="WARNING")
msg.exec()
+ logger.debug(f"Pydantic extra fields: {pyd.model_extra}")
+ if "csv" in pyd.model_extra:
+ obj.csv = pyd.model_extra['csv']
+ logger.debug(f"All attributes of obj:\n{pprint.pformat(obj.__dict__)}")
return obj, result
-def kit_reload_function(obj:QMainWindow) -> QMainWindow:
+def kit_reload_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
+ """
+ Reload the fields in the form
+
+ Args:
+ obj (QMainWindow): original app window
+
+ Returns:
+ Tuple[QMainWindow, dict]: Collection of new main app window and result dict
+ """
result = None
for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget):
if isinstance(item, QLabel):
@@ -277,7 +229,16 @@ def kit_reload_function(obj:QMainWindow) -> QMainWindow:
obj.kit_integrity_completion_function()
return obj, result
-def kit_integrity_completion_function(obj:QMainWindow) -> QMainWindow:
+def kit_integrity_completion_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
+ """
+ Compare kit contents to parsed contents
+
+ Args:
+ obj (QMainWindow): The original app window
+
+ Returns:
+ Tuple[QMainWindow, dict]: Collection of new main app window and result dict
+ """
result = None
logger.debug(inspect.currentframe().f_back.f_code.co_name)
# find the widget that contains lit info
@@ -296,9 +257,11 @@ def kit_integrity_completion_function(obj:QMainWindow) -> QMainWindow:
# if kit integrity comes back with an error, make widgets with missing reagents using default info
if kit_integrity != None:
result = dict(message=kit_integrity['message'], status="Warning")
+ obj.missing_reagents = kit_integrity['missing']
for item in kit_integrity['missing']:
obj.table_widget.formlayout.addWidget(QLabel(f"Lot {item.replace('_', ' ').title()}"))
- add_widget = ImportReagent(ctx=obj.ctx, reagent=dict(type=item, lot=None, exp=None))#item=item)
+ reagent = dict(type=item, lot=None, exp=None)
+ add_widget = ImportReagent(ctx=obj.ctx, reagent=PydReagent(**reagent))#item=item)
obj.table_widget.formlayout.addWidget(add_widget)
submit_btn = QPushButton("Submit")
submit_btn.setObjectName("lot_submit_btn")
@@ -306,38 +269,50 @@ def kit_integrity_completion_function(obj:QMainWindow) -> QMainWindow:
submit_btn.clicked.connect(obj.submit_new_sample)
return obj, result
-def submit_new_sample_function(obj:QMainWindow) -> QMainWindow:
+def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
+ """
+ Parse forms and add sample to the database.
+
+ Args:
+ obj (QMainWindow): original app window
+
+ Returns:
+ Tuple[QMainWindow, dict]: Collection of new main app window and result dict
+ """
logger.debug(f"\n\nBeginning Submission\n\n")
result = None
# extract info from the form widgets
info = extract_form_info(obj.table_widget.tab1)
# seperate out reagents
- reagents = {k:v for k,v in info.items() if k.startswith("lot_")}
+ reagents = {k.replace("lot_", ""):v for k,v in info.items() if k.startswith("lot_")}
info = {k:v for k,v in info.items() if not k.startswith("lot_")}
logger.debug(f"Info: {info}")
logger.debug(f"Reagents: {reagents}")
parsed_reagents = []
# compare reagents in form to reagent database
for reagent in reagents:
- # TODO: have this lookup by type and lot
+ # Lookup any existing reagent of this type with this lot number
wanted_reagent = lookup_reagent(ctx=obj.ctx, reagent_lot=reagents[reagent], type_name=reagent)
logger.debug(f"Looked up reagent: {wanted_reagent}")
+ # logger.debug(f"\n\nLooking for {reagent} in {obj.reagents}\n\n")
# if reagent not found offer to add to database
if wanted_reagent == None:
r_lot = reagents[reagent]
dlg = QuestionAsker(title=f"Add {r_lot}?", message=f"Couldn't find reagent type {reagent.replace('_', ' ').title().strip('Lot')}: {r_lot} in the database.\n\nWould you like to add it?")
if dlg.exec():
logger.debug(f"checking reagent: {reagent} in obj.reagents. Result: {obj.reagents[reagent]}")
- expiry_date = obj.reagents[reagent]['exp']
+ expiry_date = obj.reagents[reagent].exp
wanted_reagent = obj.add_reagent(reagent_lot=r_lot, reagent_type=reagent.replace("lot_", ""), expiry=expiry_date)
else:
# In this case we will have an empty reagent and the submission will fail kit integrity check
logger.debug("Will not add reagent.")
if wanted_reagent != None:
parsed_reagents.append(wanted_reagent)
+ wanted_reagent.type.last_used = reagents[reagent]
# move samples into preliminary submission dict
info['samples'] = obj.samples
info['uploaded_by'] = getuser()
+ # info['columns'] = obj.column_count
# construct submission object
logger.debug(f"Here is the info_dict: {pprint.pformat(info)}")
base_submission, result = construct_submission_info(ctx=obj.ctx, info_dict=info)
@@ -359,6 +334,7 @@ def submit_new_sample_function(obj:QMainWindow) -> QMainWindow:
# add reagents to submission object
for reagent in parsed_reagents:
base_submission.reagents.append(reagent)
+ logger.debug(f"Parsed reagents: {pprint.pformat(parsed_reagents)}")
logger.debug("Checking kit integrity...")
kit_integrity = check_kit_integrity(base_submission)
if kit_integrity != None:
@@ -371,7 +347,15 @@ def submit_new_sample_function(obj:QMainWindow) -> QMainWindow:
# reset form
for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget):
item.setParent(None)
- logger.debug(f"All attributes of obj: {pprint.pprint(obj.__dict__)}")
+ logger.debug(f"All attributes of obj: {pprint.pformat(obj.__dict__)}")
+ if len(obj.missing_reagents) > 0:
+ logger.debug(f"We have blank reagents in the excel sheet.\n\tLet's try to fill them in.")
+ extraction_kit = lookup_kittype_by_name(obj.ctx, name=obj.ext_kit)
+ logger.debug(f"We have the extraction kit: {extraction_kit.name}")
+ logger.debug(f"Extraction kit map:\n\n{extraction_kit.used_for[obj.current_submission_type]}")
+ excel_map = extraction_kit.used_for[obj.current_submission_type]
+ input_reagents = [item.to_reagent_dict() for item in parsed_reagents]
+ autofill_excel(obj=obj, xl_map=excel_map, reagents=input_reagents, missing_reagents=obj.missing_reagents, info=info)
if hasattr(obj, 'csv'):
dlg = QuestionAsker("Export CSV?", "Would you like to export the csv file?")
if dlg.exec():
@@ -384,13 +368,18 @@ def submit_new_sample_function(obj:QMainWindow) -> QMainWindow:
delattr(obj, "csv")
except AttributeError:
pass
- # if obj.current_submission_type == "Bacterial_Culture":
- # hitpick = hitpick_plate(base_submission)
- # image = make_plate_map(hitpick)
- # image.show()
return obj, result
-def generate_report_function(obj:QMainWindow) -> QMainWindow:
+def generate_report_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
+ """
+ Generate a summary of activities for a time period
+
+ Args:
+ obj (QMainWindow): original app window
+
+ Returns:
+ Tuple[QMainWindow, dict]: Collection of new main app window and result dict
+ """
result = None
# ask for date ranges
dlg = ReportDatePicker()
@@ -430,7 +419,16 @@ def generate_report_function(obj:QMainWindow) -> QMainWindow:
writer.close()
return obj, result
-def add_kit_function(obj:QMainWindow) -> QMainWindow:
+def add_kit_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
+ """
+ Add a new kit to the database.
+
+ Args:
+ obj (QMainWindow): original app window
+
+ Returns:
+ Tuple[QMainWindow, dict]: Collection of new main app window and result dict
+ """
result = None
# setup file dialog to find yaml flie
fname = select_open_file(obj, extension="yml")
@@ -449,7 +447,16 @@ def add_kit_function(obj:QMainWindow) -> QMainWindow:
result = create_kit_from_yaml(ctx=obj.ctx, exp=exp)
return obj, result
-def add_org_function(obj:QMainWindow) -> QMainWindow:
+def add_org_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
+ """
+ Add a new organization to the database.
+
+ Args:
+ obj (QMainWindow): original app window
+
+ Returns:
+ Tuple[QMainWindow, dict]: Collection of new main app window and result dict
+ """
result = None
# setup file dialog to find yaml flie
fname = select_open_file(obj, extension="yml")
@@ -468,7 +475,16 @@ def add_org_function(obj:QMainWindow) -> QMainWindow:
result = create_org_from_yaml(ctx=obj.ctx, org=org)
return obj, result
-def controls_getter_function(obj:QMainWindow) -> QMainWindow:
+def controls_getter_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
+ """
+ Get controls based on start/end dates
+
+ Args:
+ obj (QMainWindow): original app window
+
+ Returns:
+ Tuple[QMainWindow, dict]: Collection of new main app window and result dict
+ """
result = None
# subtype defaults to disabled
try:
@@ -480,11 +496,12 @@ def controls_getter_function(obj:QMainWindow) -> QMainWindow:
logger.warning("Start date after end date is not allowed!")
threemonthsago = obj.table_widget.datepicker.end_date.date().addDays(-60)
# block signal that will rerun controls getter and set start date
+ # Without triggering this function again
with QSignalBlocker(obj.table_widget.datepicker.start_date) as blocker:
obj.table_widget.datepicker.start_date.setDate(threemonthsago)
obj._controls_getter()
return obj, result
- # convert to python useable date object
+ # convert to python useable date objects
obj.start_date = obj.table_widget.datepicker.start_date.date().toPyDate()
obj.end_date = obj.table_widget.datepicker.end_date.date().toPyDate()
obj.con_type = obj.table_widget.control_typer.currentText()
@@ -504,15 +521,15 @@ def controls_getter_function(obj:QMainWindow) -> QMainWindow:
obj._chart_maker()
return obj, result
-def chart_maker_function(obj:QMainWindow) -> QMainWindow:
+def chart_maker_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
"""
- create html chart for controls reporting
+ Create html chart for controls reporting
Args:
- obj (QMainWindow): original MainWindow
+ obj (QMainWindow): original app window
Returns:
- QMainWindow: MainWindow with control display updates
+ Tuple[QMainWindow, dict]: Collection of new main app window and result dict
"""
result = None
logger.debug(f"Control getter context: \n\tControl type: {obj.con_type}\n\tMode: {obj.mode}\n\tStart Date: {obj.start_date}\n\tEnd Date: {obj.end_date}")
@@ -552,7 +569,16 @@ def chart_maker_function(obj:QMainWindow) -> QMainWindow:
logger.debug("Figure updated... I hope.")
return obj, result
-def link_controls_function(obj:QMainWindow) -> QMainWindow:
+def link_controls_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
+ """
+ Link scraped controls to imported submissions.
+
+ Args:
+ obj (QMainWindow): original app window
+
+ Returns:
+ Tuple[QMainWindow, dict]: Collection of new main app window and result dict
+ """
result = None
all_bcs = lookup_all_submissions_by_type(obj.ctx, "Bacterial Culture")
logger.debug(all_bcs)
@@ -601,12 +627,20 @@ def link_controls_function(obj:QMainWindow) -> QMainWindow:
# msg.exec()
return obj, result
-def link_extractions_function(obj:QMainWindow) -> QMainWindow:
+def link_extractions_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
+ """
+ Link extractions from runlogs to imported submissions
+
+ Args:
+ obj (QMainWindow): original app window
+
+ Returns:
+ Tuple[QMainWindow, dict]: Collection of new main app window and result dict
+ """
result = None
- # home_dir = str(Path(obj.ctx["directory_path"]))
- # fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "csv(*.csv)")[0])
- fname = select_open_file(obj, extension="csv")
+ fname = select_open_file(obj, file_extension="csv")
with open(fname.__str__(), 'r') as f:
+ # split csv on commas
runs = [col.strip().split(",") for col in f.readlines()]
count = 0
for run in runs:
@@ -618,9 +652,12 @@ def link_extractions_function(obj:QMainWindow) -> QMainWindow:
experiment_name=run[4].strip(),
end_time=run[5].strip()
)
+ # elution columns are item 6 in the comma split list to the end
for ii in range(6, len(run)):
new_run[f"column{str(ii-5)}_vol"] = run[ii]
+ # Lookup imported submissions
sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num'])
+ # If no such submission exists, move onto the next run
try:
logger.debug(f"Found submission: {sub.rsl_plate_num}")
count += 1
@@ -630,12 +667,14 @@ def link_extractions_function(obj:QMainWindow) -> QMainWindow:
existing = json.loads(sub.extraction_info)
else:
existing = None
+ # Check if the new info already exists in the imported submission
try:
if json.dumps(new_run) in sub.extraction_info:
logger.debug(f"Looks like we already have that info.")
continue
except TypeError:
pass
+ # Update or create the extraction info
if existing != None:
try:
logger.debug(f"Updating {type(existing)}: {existing} with {type(new_run)}: {new_run}")
@@ -653,12 +692,20 @@ def link_extractions_function(obj:QMainWindow) -> QMainWindow:
result = dict(message=f"We added {count} logs to the database.", status='information')
return obj, result
-def link_pcr_function(obj:QMainWindow) -> QMainWindow:
+def link_pcr_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
+ """
+ Link PCR data from run logs to an imported submission
+
+ Args:
+ obj (QMainWindow): original app window
+
+ Returns:
+ Tuple[QMainWindow, dict]: Collection of new main app window and result dict
+ """
result = None
- # home_dir = str(Path(obj.ctx["directory_path"]))
- # fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "csv(*.csv)")[0])
- fname = select_open_file(obj, extension="csv")
+ fname = select_open_file(obj, file_extension="csv")
with open(fname.__str__(), 'r') as f:
+ # split csv rows on comma
runs = [col.strip().split(",") for col in f.readlines()]
count = 0
for run in runs:
@@ -670,17 +717,19 @@ def link_pcr_function(obj:QMainWindow) -> QMainWindow:
experiment_name=run[4].strip(),
end_time=run[5].strip()
)
- # for ii in range(6, len(run)):
- # obj[f"column{str(ii-5)}_vol"] = run[ii]
+ # lookup imported submission
sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=new_run['rsl_plate_num'])
+ # if imported submission doesn't exist move on to next run
try:
logger.debug(f"Found submission: {sub.rsl_plate_num}")
except AttributeError:
continue
+ # check if pcr_info already exists
if hasattr(sub, 'pcr_info') and sub.pcr_info != None:
existing = json.loads(sub.pcr_info)
else:
existing = None
+ # check if this entry already exists in imported submission
try:
if json.dumps(new_run) in sub.pcr_info:
logger.debug(f"Looks like we already have that info.")
@@ -706,17 +755,25 @@ def link_pcr_function(obj:QMainWindow) -> QMainWindow:
result = dict(message=f"We added {count} logs to the database.", status='information')
return obj, result
-def import_pcr_results_function(obj:QMainWindow) -> QMainWindow:
+def import_pcr_results_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
+ """
+ Import Quant-studio PCR data to an imported submission
+
+ Args:
+ obj (QMainWindow): original app window
+
+ Returns:
+ Tuple[QMainWindow, dict]: Collection of new main app window and result dict
+ """
result = None
- # home_dir = str(Path(obj.ctx["directory_path"]))
- # fname = Path(QFileDialog.getOpenFileName(obj, 'Open file', home_dir, filter = "xlsx(*.xlsx)")[0])
- fname = select_open_file(obj, extension="xlsx")
+ fname = select_open_file(obj, file_extension="xlsx")
parser = PCRParser(ctx=obj.ctx, filepath=fname)
logger.debug(f"Attempting lookup for {parser.plate_num}")
sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num)
try:
logger.debug(f"Found submission: {sub.rsl_plate_num}")
except AttributeError:
+ # If no plate is found, may be because this is a repeat. Lop off the '-1' or '-2' and repeat
logger.error(f"Submission of number {parser.plate_num} not found. Attempting rescue of plate repeat.")
parser.plate_num = "-".join(parser.plate_num.split("-")[:-1])
sub = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=parser.plate_num)
@@ -725,14 +782,13 @@ def import_pcr_results_function(obj:QMainWindow) -> QMainWindow:
except AttributeError:
logger.error(f"Rescue of {parser.plate_num} failed.")
return obj, dict(message="Couldn't find a submission with that RSL number.", status="warning")
- # jout = json.dumps(parser.pcr)
- count = 0
+ # Check if PCR info already exists
if hasattr(sub, 'pcr_info') and sub.pcr_info != None:
existing = json.loads(sub.pcr_info)
else:
- # jout = None
existing = None
if existing != None:
+ # update pcr_info
try:
logger.debug(f"Updating {type(existing)}: {existing} with {type(parser.pcr)}: {parser.pcr}")
if json.dumps(parser.pcr) not in sub.pcr_info:
@@ -757,5 +813,59 @@ def import_pcr_results_function(obj:QMainWindow) -> QMainWindow:
update_ww_sample(ctx=obj.ctx, sample_obj=sample)
result = dict(message=f"We added PCR info to {sub.rsl_plate_num}.", status='information')
return obj, result
- # dlg.exec()
+def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_reagents:List[str], info:dict):
+ """
+ Automatically fills in excel cells with reagent info.
+
+ Args:
+ obj (QMainWindow): Original main app window
+ xl_map (dict): Map of where each reagent goes in the excel workbook.
+ reagents (List[dict]): All reagents used in the kit.
+ missing_reagents (List[str]): Reagents that are required for the kit that were not present.
+ """
+ # logger.debug(reagents)
+
+ logger.debug(f"Here is the info dict coming in:\n{pprint.pformat(info)}")
+ logger.debug(f"Here are the missing reagents:\n{missing_reagents}")
+ relevant_map = {k:v for k,v in xl_map.items() if k in missing_reagents}
+ # logger.debug(relevant_map)
+ relevant_reagents = [item for item in reagents if item['type'] in missing_reagents]
+ relevant_info = {k:v for k,v in info.items() if k in missing_reagents}
+ logger.debug(f"Here is the relevant info: {pprint.pformat(relevant_info)}")
+ # logger.debug(f"Relevant reagents:\n{relevant_reagents}")
+ # construct new objects to put into excel sheets:
+ new_reagents = []
+ for reagent in relevant_reagents:
+ new_reagent = {}
+ new_reagent['type'] = reagent['type']
+ new_reagent['lot'] = relevant_map[new_reagent['type']]['lot']
+ new_reagent['lot']['value'] = reagent['lot']
+ new_reagent['expiry'] = relevant_map[new_reagent['type']]['expiry']
+ new_reagent['expiry']['value'] = reagent['expiry']
+ new_reagent['sheet'] = relevant_map[new_reagent['type']]['sheet']
+ new_reagents.append(new_reagent)
+ new_info = []
+ for item in relevant_info:
+ new_item = {}
+ new_item['type'] = item
+ new_item['location'] = relevant_map[item]
+ new_item['value'] = relevant_info[item]
+ new_info.append(new_item)
+ logger.debug(f"New reagents: {new_reagents}")
+ workbook = load_workbook(obj.xl)
+ sheets = workbook.sheetnames
+ logger.debug(workbook.sheetnames)
+ for sheet in sheets:
+ worksheet=workbook[sheet]
+ sheet_reagents = [item for item in new_reagents if item['sheet'] == sheet]
+ for reagent in sheet_reagents:
+ logger.debug(f"Attempting: {reagent['type']}:")
+ worksheet.cell(row=reagent['lot']['row'], column=reagent['lot']['column'], value=reagent['lot']['value'])
+ worksheet.cell(row=reagent['expiry']['row'], column=reagent['expiry']['column'], value=reagent['expiry']['value'])
+ sheet_info = [item for item in new_info if item['location']['sheet'] == sheet]
+ for item in sheet_info:
+ logger.debug(f"Attempting: {item['type']}")
+ worksheet.cell(row=item['location']['row'], column=item['location']['column'], value=item['value'])
+ fname = select_save_file(obj=obj, default_name=Path(obj.xl).stem, extension="xlsx")
+ workbook.save(filename=fname.__str__())
diff --git a/src/submissions/frontend/visualizations/control_charts.py b/src/submissions/frontend/visualizations/control_charts.py
index a1db0ee..364cebc 100644
--- a/src/submissions/frontend/visualizations/control_charts.py
+++ b/src/submissions/frontend/visualizations/control_charts.py
@@ -283,6 +283,15 @@ def divide_chunks(input_list:list, chunk_count:int):
def construct_html(figure:Figure) -> str:
+ """
+ Creates final html code from plotly
+
+ Args:
+ figure (Figure): input figure
+
+ Returns:
+ str: html string
+ """
html = ''
if figure != None:
html += plotly.offline.plot(figure, output_type='div', include_plotlyjs='cdn')#, image = 'png', auto_open=True, image_filename='plot_image')
diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py
index 6623115..27cedd8 100644
--- a/src/submissions/tools/__init__.py
+++ b/src/submissions/tools/__init__.py
@@ -1,12 +1,14 @@
'''
Contains miscellaenous functions used by both frontend and backend.
'''
+from pathlib import Path
import re
import sys
import numpy as np
import logging
import getpass
from backend.db.models import BasicSubmission, KitType
+import pandas as pd
from typing import Tuple
logger = logging.getLogger(f"submissions.{__name__}")
@@ -22,19 +24,43 @@ def check_not_nan(cell_contents) -> bool:
bool: True if cell has value, else, false.
"""
# check for nan as a string first
+ try:
+ cell_contents = cell_contents.lower()
+ except AttributeError:
+ pass
if cell_contents == 'nan':
cell_contents = np.nan
if cell_contents == None:
cell_contents = np.nan
+ try:
+ if pd.isnull(cell_contents):
+ cell_contents = np.nan
+ except ValueError:
+ pass
try:
return not np.isnan(cell_contents)
except TypeError:
return True
except Exception as e:
- logger.debug(f"Check encounteded unknown error: {type(e).__name__} - {e}")
+ logger.debug(f"Check encountered unknown error: {type(e).__name__} - {e}")
return False
+def convert_nans_to_nones(input_str) -> str|None:
+ """
+ Get rid of various "nan", "NAN", "NaN", etc/
+
+ Args:
+ input_str (str): input string
+
+ Returns:
+ str: _description_
+ """
+ if not check_not_nan(input_str):
+ return None
+ return input_str
+
+
def check_is_power_user(ctx:dict) -> bool:
"""
Check to ensure current user is in power users list.
@@ -83,7 +109,7 @@ def check_kit_integrity(sub:BasicSubmission|KitType, reagenttypes:list|None=None
# What type is sub?
match sub:
case BasicSubmission():
- # very hacky method to ensure interchangeable plates are not
+ # Get all required reagent types for this kit.
ext_kit_rtypes = [reagenttype.name for reagenttype in sub.extraction_kit.reagent_types if reagenttype.required == 1]
# Overwrite function parameter reagenttypes
try:
@@ -179,7 +205,7 @@ class RSLNamer(object):
self.parsed_name = self.parsed_name.replace("_", "-")
- def retrieve_rsl_number(self, in_str:str) -> Tuple[str, str]:
+ def retrieve_rsl_number(self, in_str:str|Path) -> Tuple[str, str]:
"""
Uses regex to retrieve the plate number and submission type from an input string
@@ -189,6 +215,8 @@ class RSLNamer(object):
Returns:
Tuple[str, str]: tuple of (output rsl number, submission_type)
"""
+ if isinstance(in_str, Path):
+ in_str = in_str.stem
logger.debug(f"Attempting split of {in_str}")
try:
in_str = in_str.split("\\")[-1]
@@ -197,7 +225,7 @@ class RSLNamer(object):
self.submission_type = None
return
logger.debug(f"Attempting match of {in_str}")
- print(f"The initial plate name is: {in_str}")
+ logger.debug(f"The initial plate name is: {in_str}")
regex = re.compile(r"""
# (?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d?((?!\d)|R)?\d(?!\d))?)|
(?PRSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)|
@@ -206,7 +234,7 @@ class RSLNamer(object):
""", flags = re.IGNORECASE | re.VERBOSE)
m = regex.search(in_str)
try:
- self.parsed_name = m.group().upper()
+ self.parsed_name = m.group().upper().strip(".")
logger.debug(f"Got parsed submission name: {self.parsed_name}")
self.submission_type = m.lastgroup
except AttributeError as e:
@@ -221,15 +249,15 @@ class RSLNamer(object):
self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW")
self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE)
self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name)
- print(f"Coming out of the preliminary parsing, the plate name is {self.parsed_name}")
+ logger.debug(f"Coming out of the preliminary parsing, the plate name is {self.parsed_name}")
try:
plate_number = re.search(r"(?:(-|_)\d)(?!\d)", self.parsed_name).group().strip("_").strip("-")
- print(f"Plate number is: {plate_number}")
+ logger.debug(f"Plate number is: {plate_number}")
except AttributeError as e:
plate_number = "1"
# self.parsed_name = re.sub(r"(\d{8})(-|_\d)?(R\d)?", fr"\1-{plate_number}\3", self.parsed_name)
self.parsed_name = re.sub(r"(\d{8})(-|_)?\d?(R\d?)?", rf"\1-{plate_number}\3", self.parsed_name)
- print(f"After addition of plate number the plate name is: {self.parsed_name}")
+ logger.debug(f"After addition of plate number the plate name is: {self.parsed_name}")
try:
repeat = re.search(r"-\dR(?P\d)?", self.parsed_name).groupdict()['repeat']
if repeat == None:
@@ -239,8 +267,6 @@ class RSLNamer(object):
self.parsed_name = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", self.parsed_name).replace(" ", "")
-
-
def enforce_bacterial_culture(self):
"""
Uses regex to enforce proper formatting of bacterial culture samples
@@ -249,6 +275,9 @@ class RSLNamer(object):
self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE)
def enforce_wastewater_artic(self):
+ """
+ Uses regex to enforce proper formatting of wastewater samples
+ """
self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"RSL-AR-\1\2\3", self.parsed_name, flags=re.IGNORECASE)
try:
plate_number = int(re.search(r"_\d?_", self.parsed_name).group().strip("_"))