Update to autofill.

This commit is contained in:
Landon Wark
2023-09-20 09:19:59 -05:00
parent 0c843d1561
commit 82ab06efad
13 changed files with 378 additions and 149 deletions

View File

@@ -1,3 +1,7 @@
## 202309.03
- Autofill now adds name of reagent instead of type.
## 202309.02 ## 202309.02
- Massive restructure of app and database to allow better relationships between kits/reagenttypes & submissions/samples. - Massive restructure of app and database to allow better relationships between kits/reagenttypes & submissions/samples.

View File

@@ -1,5 +1,7 @@
- [ ] Make kits easier to add.
- [ ] Clean up & document code... again. - [ ] Clean up & document code... again.
- Including paring down the logging.debugs - Including paring down the logging.debugs
- Also including reducing number of functions in db.functions
- [ ] Fix Tests... again. - [ ] Fix Tests... again.
- [x] Rebuild database - [x] Rebuild database
- [x] Provide more generic names for reagenttypes in kits and move specific names to reagents. - [x] Provide more generic names for reagenttypes in kits and move specific names to reagents.

View File

@@ -4,7 +4,7 @@ from pathlib import Path
# Version of the realpython-reader package # Version of the realpython-reader package
__project__ = "submissions" __project__ = "submissions"
__version__ = "202309.2b" __version__ = "202309.3b"
__author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"} __author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"}
__copyright__ = "2022-2023, Government of Canada" __copyright__ = "2022-2023, Government of Canada"

View File

@@ -18,7 +18,7 @@ import numpy as np
import yaml import yaml
from pathlib import Path from pathlib import Path
from tools import Settings, check_regex_match, RSLNamer from tools import Settings, check_regex_match, RSLNamer
from typing import List from typing import List, Tuple
@@ -27,6 +27,14 @@ logger = logging.getLogger(f"submissions.{__name__}")
# The below _should_ allow automatic creation of foreign keys in the database # The below _should_ allow automatic creation of foreign keys in the database
@event.listens_for(Engine, "connect") @event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record): def set_sqlite_pragma(dbapi_connection, connection_record):
"""
*should* allow automatic creation of foreign keys in the database
I have no idea how it actually works.
Args:
dbapi_connection (_type_): _description_
connection_record (_type_): _description_
"""
cursor = dbapi_connection.cursor() cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON") cursor.execute("PRAGMA foreign_keys=ON")
cursor.close() cursor.close()
@@ -79,7 +87,7 @@ def store_reagent(ctx:Settings, reagent:models.Reagent) -> None|dict:
return {"message":"The database is locked for editing."} return {"message":"The database is locked for editing."}
return None return None
def construct_submission_info(ctx:Settings, info_dict:dict) -> models.BasicSubmission: def construct_submission_info(ctx:Settings, info_dict:dict) -> Tuple[models.BasicSubmission, dict]:
""" """
Construct submission object from dictionary Construct submission object from dictionary
@@ -273,7 +281,7 @@ def lookup_reagenttype_by_name(ctx:Settings, rt_name:str) -> models.ReagentType:
Returns: Returns:
models.ReagentType: looked up reagent type models.ReagentType: looked up reagent type
""" """
logger.debug(f"Looking up ReagentType by name: {rt_name.title()}") logger.debug(f"Looking up ReagentType by name: {rt_name}")
lookedup = ctx.database_session.query(models.ReagentType).filter(models.ReagentType.name==rt_name).first() lookedup = ctx.database_session.query(models.ReagentType).filter(models.ReagentType.name==rt_name).first()
logger.debug(f"Found ReagentType: {lookedup}") logger.debug(f"Found ReagentType: {lookedup}")
return lookedup return lookedup
@@ -302,7 +310,7 @@ def lookup_kittype_by_name(ctx:Settings, name:str|dict) -> models.KitType:
Args: Args:
ctx (Settings): settings object passed from bui ctx (Settings): settings object passed from bui
name (str): name of kit to query name (str|dict): name of kit to query, or parsed object containing value=name
Returns: Returns:
models.KitType: retrieved kittype models.KitType: retrieved kittype
@@ -989,25 +997,6 @@ def lookup_reagent(ctx:Settings, reagent_lot:str, type_name:str|None=None) -> mo
# return ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() # return ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
return ctx.database_session.query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() return ctx.database_session.query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first()
def lookup_last_used_reagenttype_lot(ctx:Settings, type_name:str) -> models.Reagent:
"""
Look up the last used reagent of the reagent type
Args:
ctx (Settings): Settings object passed down from gui
type_name (str): Name of reagent type
Returns:
models.Reagent: Reagent object with last used lot.
"""
# rt = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==type_name).first()
rt = ctx.database_session.query(models.ReagentType).filter(models.ReagentType.name==type_name).first()
logger.debug(f"Reagent type looked up for {type_name}: {rt.__str__()}")
try:
return lookup_reagent(ctx=ctx, reagent_lot=rt.last_used, type_name=type_name)
except AttributeError:
return None
def check_kit_integrity(sub:models.BasicSubmission|models.KitType, reagenttypes:list|None=None) -> dict|None: def check_kit_integrity(sub:models.BasicSubmission|models.KitType, reagenttypes:list|None=None) -> dict|None:
""" """
Ensures all reagents expected in kit are listed in Submission Ensures all reagents expected in kit are listed in Submission
@@ -1120,7 +1109,7 @@ def lookup_subsamp_association_by_plate_sample(ctx:Settings, rsl_plate_num:str,
.filter(models.BasicSample.submitter_id==rsl_sample_num)\ .filter(models.BasicSample.submitter_id==rsl_sample_num)\
.first() .first()
def lookup_sub_wwsamp_association_by_plate_sample(ctx:Settings, rsl_plate_num:str, rsl_sample_num:str) -> models.WastewaterAssociation: def lookup_sub_samp_association_by_plate_sample(ctx:Settings, rsl_plate_num:str|models.BasicSample, rsl_sample_num:str|models.BasicSubmission) -> models.WastewaterAssociation:
""" """
_summary_ _summary_
@@ -1132,12 +1121,36 @@ def lookup_sub_wwsamp_association_by_plate_sample(ctx:Settings, rsl_plate_num:st
Returns: Returns:
models.SubmissionSampleAssociation: _description_ models.SubmissionSampleAssociation: _description_
""" """
return ctx.database_session.query(models.WastewaterAssociation)\ # logger.debug(f"{type(rsl_plate_num)}, {type(rsl_sample_num)}")
.join(models.Wastewater)\ match rsl_plate_num:
.join(models.WastewaterSample)\ case models.BasicSubmission()|models.Wastewater():
.filter(models.BasicSubmission.rsl_plate_num==rsl_plate_num)\ # logger.debug(f"Model for rsl_plate_num: {rsl_plate_num}")
.filter(models.BasicSample.submitter_id==rsl_sample_num)\ first_query = ctx.database_session.query(models.SubmissionSampleAssociation)\
.first() .filter(models.SubmissionSampleAssociation.submission==rsl_plate_num)
case str():
# logger.debug(f"String for rsl_plate_num: {rsl_plate_num}")
first_query = ctx.database_session.query(models.SubmissionSampleAssociation)\
.join(models.BasicSubmission)\
.filter(models.BasicSubmission.rsl_plate_num==rsl_plate_num)
case _:
logger.error(f"Unknown case for rsl_plate_num {rsl_plate_num}")
match rsl_sample_num:
case models.BasicSample()|models.WastewaterSample():
# logger.debug(f"Model for rsl_sample_num: {rsl_sample_num}")
second_query = first_query.filter(models.SubmissionSampleAssociation.sample==rsl_sample_num)
# case models.WastewaterSample:
# second_query = first_query.filter(models.SubmissionSampleAssociation.sample==rsl_sample_num)
case str():
# logger.debug(f"String for rsl_sample_num: {rsl_sample_num}")
second_query = first_query.join(models.BasicSample)\
.filter(models.BasicSample.submitter_id==rsl_sample_num)
case _:
logger.error(f"Unknown case for rsl_sample_num {rsl_sample_num}")
try:
return second_query.first()
except UnboundLocalError:
logger.error(f"Couldn't construct second query")
return None
def lookup_all_reagent_names_by_role(ctx:Settings, role_name:str) -> List[str]: def lookup_all_reagent_names_by_role(ctx:Settings, role_name:str) -> List[str]:
""" """
@@ -1183,7 +1196,7 @@ def add_reagenttype_to_kit(ctx:Settings, rt_name:str, kit_name:str, eol:int=0):
kit = lookup_kittype_by_name(ctx=ctx, name=kit_name) kit = lookup_kittype_by_name(ctx=ctx, name=kit_name)
rt = lookup_reagenttype_by_name(ctx=ctx, rt_name=rt_name) rt = lookup_reagenttype_by_name(ctx=ctx, rt_name=rt_name)
if rt == None: if rt == None:
rt = models.ReagentType(name=rt_name.strip(), eol_ext=timedelta(30*eol), last_used="") rt = models.ReagentType(name=rt_name.strip(), eol_ext=timedelta(30*eol))
ctx.database_session.add(rt) ctx.database_session.add(rt)
assoc = models.KitTypeReagentTypeAssociation(kit_type=kit, reagent_type=rt, uses={}) assoc = models.KitTypeReagentTypeAssociation(kit_type=kit, reagent_type=rt, uses={})
kit.kit_reagenttype_associations.append(assoc) kit.kit_reagenttype_associations.append(assoc)
@@ -1204,3 +1217,68 @@ def update_subsampassoc_with_pcr(ctx:Settings, submission:models.BasicSubmission
logger.error(f"Can't set {k} to {v}") logger.error(f"Can't set {k} to {v}")
ctx.database_session.add(assoc) ctx.database_session.add(assoc)
ctx.database_session.commit() ctx.database_session.commit()
def lookup_ww_sample_by_processing_number(ctx:Settings, processing_number:str):
return ctx.database_session.query(models.WastewaterSample).filter(models.WastewaterSample.ww_processing_num==processing_number).first()
def lookup_kitreagentassoc_by_kit_and_reagent(ctx:Settings, kit:models.KitType|str, reagent_type:models.ReagentType|str) -> models.KitTypeReagentTypeAssociation:
"""
_summary_
Args:
ctx (Settings): _description_
kit (models.KitType | str): _description_
reagent_type (models.ReagentType | str): _description_
Returns:
models.KitTypeReagentTypeAssociation: _description_
"""
base_query = ctx.database_session.query(models.KitTypeReagentTypeAssociation)
match kit:
case models.KitType():
query1 = base_query.filter(models.KitTypeReagentTypeAssociation.kit_type==kit)
case str():
query1 = base_query.join(models.KitType).filter(models.KitType.name==kit)
case _:
query1 = base_query
match reagent_type:
case models.ReagentType():
query2 = query1.filter(models.KitTypeReagentTypeAssociation.reagent_type==reagent_type)
case str():
query2 = query1.join(models.ReagentType).filter(models.ReagentType.name==reagent_type)
case _:
query2 = query1
return query2.first()
def lookup_last_used_reagenttype_lot(ctx:Settings, type_name:str, extraction_kit:str|None=None) -> models.Reagent:
"""
Look up the last used reagent of the reagent type
Args:
ctx (Settings): Settings object passed down from gui
type_name (str): Name of reagent type
Returns:
models.Reagent: Reagent object with last used lot.
"""
assoc = lookup_kitreagentassoc_by_kit_and_reagent(ctx=ctx, kit=extraction_kit, reagent_type=type_name)
return lookup_reagent(ctx=ctx, reagent_lot=assoc.last_used)
def update_last_used(ctx:Settings, reagent:models.Reagent, kit:models.KitType):
"""
_summary_
Args:
ctx (Settings): _description_
reagent (models.ReagentType): _description_
reagent_lot (str): _description_
"""
rt = list(set(reagent.type).intersection(kit.reagent_types))[0]
if rt != None:
assoc = lookup_kitreagentassoc_by_kit_and_reagent(ctx=ctx, kit=kit, reagent_type=rt)
if assoc != None:
if assoc.last_used != reagent.lot:
logger.debug(f"Updating {assoc} last used to {reagent.lot}")
assoc.last_used = reagent.lot
ctx.database_session.merge(assoc)
ctx.database_session.commit()

View File

@@ -97,39 +97,7 @@ class KitType(Base):
map['info'] = {} map['info'] = {}
return map return map
class KitTypeReagentTypeAssociation(Base):
"""
table containing reagenttype/kittype associations
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
"""
__tablename__ = "_reagenttypes_kittypes"
reagent_types_id = Column(INTEGER, ForeignKey("_reagent_types.id"), primary_key=True)
kits_id = Column(INTEGER, ForeignKey("_kits.id"), primary_key=True)
uses = Column(JSON)
required = Column(INTEGER)
kit_type = relationship(KitType, back_populates="kit_reagenttype_associations")
# reference to the "ReagentType" object
reagent_type = relationship("ReagentType")
def __init__(self, kit_type=None, reagent_type=None, uses=None, required=1):
self.kit_type = kit_type
self.reagent_type = reagent_type
self.uses = uses
self.required = required
@validates('required')
def validate_age(self, key, value):
if not 0 <= value < 2:
raise ValueError(f'Invalid required value {value}. Must be 0 or 1.')
return value
@validates('reagenttype')
def validate_reagenttype(self, key, value):
if not isinstance(value, ReagentType):
raise ValueError(f'{value} is not a reagenttype')
return value
class ReagentType(Base): class ReagentType(Base):
""" """
@@ -141,7 +109,16 @@ class ReagentType(Base):
name = Column(String(64)) #: name of reagent type name = Column(String(64)) #: name of reagent type
instances = relationship("Reagent", back_populates="type", secondary=reagenttypes_reagents) #: concrete instances of this reagent type instances = relationship("Reagent", back_populates="type", secondary=reagenttypes_reagents) #: concrete instances of this reagent type
eol_ext = Column(Interval()) #: extension of life interval eol_ext = Column(Interval()) #: extension of life interval
last_used = Column(String(32)) #: last used lot number of this type of reagent
reagenttype_kit_associations = relationship(
"KitTypeReagentTypeAssociation",
back_populates="reagent_type",
cascade="all, delete-orphan",
)
# association proxy of "user_keyword_associations" collection
# to "keyword" attribute
kit_types = association_proxy("kit_reagenttype_associations", "kit_type")
@validates('required') @validates('required')
def validate_age(self, key, value): def validate_age(self, key, value):
@@ -161,6 +138,44 @@ class ReagentType(Base):
def __repr__(self): def __repr__(self):
return f"ReagentType({self.name})" return f"ReagentType({self.name})"
class KitTypeReagentTypeAssociation(Base):
"""
table containing reagenttype/kittype associations
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
"""
__tablename__ = "_reagenttypes_kittypes"
reagent_types_id = Column(INTEGER, ForeignKey("_reagent_types.id"), primary_key=True)
kits_id = Column(INTEGER, ForeignKey("_kits.id"), primary_key=True)
uses = Column(JSON)
required = Column(INTEGER)
last_used = Column(String(32)) #: last used lot number of this type of reagent
kit_type = relationship(KitType, back_populates="kit_reagenttype_associations")
# reference to the "ReagentType" object
reagent_type = relationship(ReagentType, back_populates="reagenttype_kit_associations")
def __init__(self, kit_type=None, reagent_type=None, uses=None, required=1):
self.kit_type = kit_type
self.reagent_type = reagent_type
self.uses = uses
self.required = required
def __repr__(self) -> str:
return f"<KitTypeReagentTypeAssociation({self.kit_type} & {self.reagent_type})>"
@validates('required')
def validate_age(self, key, value):
if not 0 <= value < 2:
raise ValueError(f'Invalid required value {value}. Must be 0 or 1.')
return value
@validates('reagenttype')
def validate_reagenttype(self, key, value):
if not isinstance(value, ReagentType):
raise ValueError(f'{value} is not a reagenttype')
return value
class Reagent(Base): class Reagent(Base):
""" """
Concrete reagent instance Concrete reagent instance
@@ -247,11 +262,13 @@ class Reagent(Base):
except AttributeError: except AttributeError:
rtype = "Unknown" rtype = "Unknown"
return { return {
"name":self.name,
"type": rtype, "type": rtype,
"lot": self.lot, "lot": self.lot,
"expiry": self.expiry.strftime("%Y-%m-%d") "expiry": self.expiry.strftime("%Y-%m-%d")
} }
class Discount(Base): class Discount(Base):
""" """
Relationship table for client labs for certain kits. Relationship table for client labs for certain kits.
@@ -266,6 +283,9 @@ class Discount(Base):
name = Column(String(128)) name = Column(String(128))
amount = Column(FLOAT(2)) amount = Column(FLOAT(2))
def __repr__(self) -> str:
return f"<Discount({self.name})>"
class SubmissionType(Base): class SubmissionType(Base):
""" """
Abstract of types of submissions. Abstract of types of submissions.

View File

@@ -47,3 +47,6 @@ class Contact(Base):
phone = Column(String(32)) #: contact phone number phone = Column(String(32)) #: contact phone number
organization = relationship("Organization", back_populates="contacts", uselist=True, secondary=orgs_contacts) #: relationship to joined organization organization = relationship("Organization", back_populates="contacts", uselist=True, secondary=orgs_contacts) #: relationship to joined organization
def __repr__(self) -> str:
return f"<Contact({self.name})>"

View File

@@ -13,8 +13,6 @@ from sqlalchemy.ext.associationproxy import association_proxy
import uuid import uuid
from pandas import Timestamp from pandas import Timestamp
from dateutil.parser import parse from dateutil.parser import parse
import pprint
from tools import check_not_nan
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
@@ -348,7 +346,7 @@ class BasicSample(Base):
return value return value
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<{self.sample_type.replace('_', ' ').title(). replace(' ', '')}({self.submitter_id})>" return f"<{self.sample_type.replace('_', ' ').title().replace(' ', '')}({self.submitter_id})>"
def set_attribute(self, name, value): def set_attribute(self, name, value):
# logger.debug(f"Setting {name} to {value}") # logger.debug(f"Setting {name} to {value}")
@@ -417,56 +415,36 @@ class WastewaterSample(BasicSample):
logger.debug(f"Validating {key}: {value}") logger.debug(f"Validating {key}: {value}")
return value or self.submitter_id return value or self.submitter_id
# def __init__(self, **kwargs):
# # Had a problem getting collection date from excel as text only.
# if 'collection_date' in kwargs.keys():
# logger.debug(f"Got collection_date: {kwargs['collection_date']}. Attempting parse.")
# if isinstance(kwargs['collection_date'], str):
# logger.debug(f"collection_date is a string...")
# kwargs['collection_date'] = parse(kwargs['collection_date'])
# logger.debug(f"output is {kwargs['collection_date']}")
# # Due to the plate map being populated with RSL numbers, we have to do some shuffling.
# try:
# kwargs['rsl_number'] = kwargs['submitter_id']
# except KeyError as e:
# logger.error(f"Error using {kwargs} for submitter_id")
# try:
# check = check_not_nan(kwargs['ww_full_sample_id'])
# except KeyError:
# logger.error(f"Error using {kwargs} for ww_full_sample_id")
# check = False
# if check:
# kwargs['submitter_id'] = kwargs["ww_full_sample_id"]
# super().__init__(**kwargs)
def set_attribute(self, name:str, value): def set_attribute(self, name:str, value):
""" """
Set an attribute of this object. Extends parent. Set an attribute of this object. Extends parent.
Args: Args:
name (str): _description_ name (str): name of the attribute
value (_type_): _description_ value (_type_): value to be set
""" """
# Due to the plate map being populated with RSL numbers, we have to do some shuffling. # Due to the plate map being populated with RSL numbers, we have to do some shuffling.
# logger.debug(f"Input - {name}:{value}")
match name: match name:
case "submitter_id": case "submitter_id":
# If submitter_id already has a value, stop
if self.submitter_id != None: if self.submitter_id != None:
return return
# otherwise also set rsl_number to the same value
else: else:
super().set_attribute("rsl_number", value) super().set_attribute("rsl_number", value)
case "ww_full_sample_id": case "ww_full_sample_id":
# If value present, set ww_full_sample_id and make this the submitter_id
if value != None: if value != None:
super().set_attribute(name, value) super().set_attribute(name, value)
name = "submitter_id" name = "submitter_id"
case 'collection_date': case 'collection_date':
# If this is a string use dateutils to parse into date()
if isinstance(value, str): if isinstance(value, str):
logger.debug(f"collection_date {value} is a string. Attempting parse...") logger.debug(f"collection_date {value} is a string. Attempting parse...")
value = parse(value) value = parse(value)
case "rsl_number": case "rsl_number":
if value == None: if value == None:
value = self.submitter_id value = self.submitter_id
# logger.debug(f"Output - {name}:{value}")
super().set_attribute(name, value) super().set_attribute(name, value)

View File

@@ -14,7 +14,7 @@ import re
import numpy as np import numpy as np
from datetime import date from datetime import date
from dateutil.parser import parse, ParserError from dateutil.parser import parse, ParserError
from tools import check_not_nan, RSLNamer, convert_nans_to_nones, Settings from tools import check_not_nan, RSLNamer, convert_nans_to_nones, Settings, convert_well_to_row_column
from frontend.custom_widgets.pop_ups import SubmissionTypeSelector, KitSelector from frontend.custom_widgets.pop_ups import SubmissionTypeSelector, KitSelector
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
@@ -48,13 +48,11 @@ class SheetParser(object):
# make decision about type of sample we have # make decision about type of sample we have
self.sub['submission_type'] = self.type_decider() self.sub['submission_type'] = self.type_decider()
# # grab the info map from the submission type in database # # grab the info map from the submission type in database
# self.info_map = self.fetch_kit_info_map()
self.parse_info() self.parse_info()
self.import_kit_validation_check() self.import_kit_validation_check()
self.parse_reagents() self.parse_reagents()
self.import_reagent_validation_check() self.import_reagent_validation_check()
self.parse_samples() self.parse_samples()
# self.sub['sample_count'] = len(self.sub['samples'])
def type_decider(self) -> str: def type_decider(self) -> str:
@@ -91,7 +89,7 @@ class SheetParser(object):
def parse_info(self): def parse_info(self):
""" """
_summary_ Pulls basic information from the excel sheet
""" """
info = InfoParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type']['value']).parse_info() info = InfoParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type']['value']).parse_info()
parser_query = f"parse_{self.sub['submission_type']['value'].replace(' ', '_').lower()}" parser_query = f"parse_{self.sub['submission_type']['value'].replace(' ', '_').lower()}"
@@ -101,14 +99,23 @@ class SheetParser(object):
except AttributeError: except AttributeError:
logger.error(f"Couldn't find submission parser: {parser_query}") logger.error(f"Couldn't find submission parser: {parser_query}")
for k,v in info.items(): for k,v in info.items():
if k != "sample": match k:
case "sample":
pass
case _:
self.sub[k] = v self.sub[k] = v
logger.debug(f"Parser.sub after info scrape: {pprint.pformat(self.sub)}") logger.debug(f"Parser.sub after info scrape: {pprint.pformat(self.sub)}")
def parse_reagents(self): def parse_reagents(self):
"""
Pulls reagent info from the excel sheet
"""
self.sub['reagents'] = ReagentParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type'], extraction_kit=self.sub['extraction_kit']).parse_reagents() self.sub['reagents'] = ReagentParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type'], extraction_kit=self.sub['extraction_kit']).parse_reagents()
def parse_samples(self): def parse_samples(self):
"""
Pulls sample info from the excel sheet
"""
self.sample_result, self.sub['samples'] = SampleParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type']['value']).parse_samples() self.sample_result, self.sub['samples'] = SampleParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type']['value']).parse_samples()
def parse_bacterial_culture(self, input_dict) -> dict: def parse_bacterial_culture(self, input_dict) -> dict:
@@ -159,7 +166,7 @@ class SheetParser(object):
Returns: Returns:
List[PydReagent]: List of reagents List[PydReagent]: List of reagents
""" """
if not check_not_nan(self.sub['extraction_kit']): if not check_not_nan(self.sub['extraction_kit']['value']):
dlg = KitSelector(ctx=self.ctx, title="Kit Needed", message="At minimum a kit is needed. Please select one.") dlg = KitSelector(ctx=self.ctx, title="Kit Needed", message="At minimum a kit is needed. Please select one.")
if dlg.exec(): if dlg.exec():
self.sub['extraction_kit'] = dict(value=dlg.getValues(), parsed=False) self.sub['extraction_kit'] = dict(value=dlg.getValues(), parsed=False)
@@ -197,7 +204,16 @@ class InfoParser(object):
self.xl = xl self.xl = xl
logger.debug(f"Info map for InfoParser: {pprint.pformat(self.map)}") logger.debug(f"Info map for InfoParser: {pprint.pformat(self.map)}")
def fetch_submission_info_map(self, submission_type:dict) -> dict: def fetch_submission_info_map(self, submission_type:str|dict) -> dict:
"""
Gets location of basic info from the submission_type object in the database.
Args:
submission_type (str|dict): name of the submission type or parsed object with value=submission_type
Returns:
dict: Location map of all info for this submission type
"""
if isinstance(submission_type, str): if isinstance(submission_type, str):
submission_type = dict(value=submission_type, parsed=False) submission_type = dict(value=submission_type, parsed=False)
logger.debug(f"Looking up submission type: {submission_type['value']}") logger.debug(f"Looking up submission type: {submission_type['value']}")
@@ -206,6 +222,12 @@ class InfoParser(object):
return info_map return info_map
def parse_info(self) -> dict: def parse_info(self) -> dict:
"""
Pulls basic info from the excel sheet.
Returns:
dict: key:value of basic info
"""
dicto = {} dicto = {}
for sheet in self.xl.sheet_names: for sheet in self.xl.sheet_names:
df = self.xl.parse(sheet, header=None) df = self.xl.parse(sheet, header=None)
@@ -302,6 +324,8 @@ class SampleParser(object):
sample_info_map = self.fetch_sample_info_map(submission_type=submission_type) sample_info_map = self.fetch_sample_info_map(submission_type=submission_type)
self.plate_map = self.construct_plate_map(plate_map_location=sample_info_map['plate_map']) self.plate_map = self.construct_plate_map(plate_map_location=sample_info_map['plate_map'])
self.lookup_table = self.construct_lookup_table(lookup_table_location=sample_info_map['lookup_table']) self.lookup_table = self.construct_lookup_table(lookup_table_location=sample_info_map['lookup_table'])
if "plates" in sample_info_map:
self.plates = sample_info_map['plates']
self.excel_to_db_map = sample_info_map['xl_db_translation'] self.excel_to_db_map = sample_info_map['xl_db_translation']
self.create_basic_dictionaries_from_plate_map() self.create_basic_dictionaries_from_plate_map()
if isinstance(self.lookup_table, pd.DataFrame): if isinstance(self.lookup_table, pd.DataFrame):
@@ -383,7 +407,7 @@ class SampleParser(object):
sample[k] = v sample[k] = v
logger.debug(f"Output sample dict: {sample}") logger.debug(f"Output sample dict: {sample}")
def parse_samples(self) -> List[dict]: def parse_samples(self, generate:bool=True) -> List[dict]:
result = None result = None
new_samples = [] new_samples = []
for ii, sample in enumerate(self.samples): for ii, sample in enumerate(self.samples):
@@ -414,7 +438,10 @@ class SampleParser(object):
translated_dict = custom_parser(translated_dict) translated_dict = custom_parser(translated_dict)
except AttributeError: except AttributeError:
logger.error(f"Couldn't get custom parser: {parser_query}") logger.error(f"Couldn't get custom parser: {parser_query}")
if generate:
new_samples.append(self.generate_sample_object(translated_dict)) new_samples.append(self.generate_sample_object(translated_dict))
else:
new_samples.append(translated_dict)
return result, new_samples return result, new_samples
def generate_sample_object(self, input_dict) -> models.BasicSample: def generate_sample_object(self, input_dict) -> models.BasicSample:
@@ -464,6 +491,7 @@ class SampleParser(object):
dict: Updated sample dictionary dict: Updated sample dictionary
""" """
logger.debug(f"Called wastewater sample parser") logger.debug(f"Called wastewater sample parser")
return input_dict
def parse_wastewater_artic_sample(self, input_dict:dict) -> dict: def parse_wastewater_artic_sample(self, input_dict:dict) -> dict:
""" """
@@ -482,6 +510,24 @@ class SampleParser(object):
input_dict['submitter_id'] = re.sub(r"\s\(.+\)$", "", str(input_dict['submitter_id'])).strip() input_dict['submitter_id'] = re.sub(r"\s\(.+\)$", "", str(input_dict['submitter_id'])).strip()
return input_dict return input_dict
def parse_first_strand_sample(self, input_dict:dict) -> dict:
logger.debug("Called first strand sample parser")
input_dict['well'] = re.search(r"\s\((.*)\)$", input_dict['submitter_id']).groups()[0]
input_dict['submitter_id'] = re.sub(r"\s\(.*\)$", "", str(input_dict['submitter_id'])).strip()
return input_dict
def grab_plates(self):
plates = []
for plate in self.plates:
df = self.xl.parse(plate['sheet'], header=None)
if isinstance(df.iat[plate['row']-1, plate['column']-1], str):
output = RSLNamer(ctx=self.ctx, instr=df.iat[plate['row']-1, plate['column']-1]).parsed_name
else:
continue
plates.append(output)
return plates
class PCRParser(object): class PCRParser(object):
""" """
Object to pull data from Design and Analysis PCR export file. Object to pull data from Design and Analysis PCR export file.

View File

@@ -200,18 +200,26 @@ class PydSubmission(BaseModel, extra=Extra.allow):
@field_validator("extraction_kit", mode='before') @field_validator("extraction_kit", mode='before')
@classmethod @classmethod
def rescue_kit(cls, value): def rescue_kit(cls, value):
# from frontend.custom_widgets.pop_ups import KitSelector
# if check_not_nan(value): if check_not_nan(value):
# if isinstance(value, str): if isinstance(value, str):
# return dict(value=value, parsed=True) return dict(value=value, parsed=True)
# elif isinstance(value, dict): elif isinstance(value, dict):
# return value return value
# else: else:
# raise ValueError(f"No extraction kit found.") raise ValueError(f"No extraction kit found.")
if value == None: if value == None:
return dict(value=None, parsed=False) return dict(value=None, parsed=False)
return value return value
# @field_validator("extraction_kit")
# @classmethod
# def enforce_kit(cls, value, values):
# from frontend.custom_widgets.pop_ups import KitSelector
# if value['value'] == None:
# return dict(value=KitSelector(values.data['ctx'], title="Select Extraction Kit", message="No extraction kit was found, please select from below."))
# return value
@field_validator("submission_type", mode='before') @field_validator("submission_type", mode='before')
@classmethod @classmethod
def make_submission_type(cls, value, values): def make_submission_type(cls, value, values):

View File

@@ -64,6 +64,7 @@ class App(QMainWindow):
fileMenu = menuBar.addMenu("&File") fileMenu = menuBar.addMenu("&File")
# Creating menus using a title # Creating menus using a title
editMenu = menuBar.addMenu("&Edit") editMenu = menuBar.addMenu("&Edit")
methodsMenu = menuBar.addMenu("&Methods")
reportMenu = menuBar.addMenu("&Reports") reportMenu = menuBar.addMenu("&Reports")
maintenanceMenu = menuBar.addMenu("&Monthly") maintenanceMenu = menuBar.addMenu("&Monthly")
helpMenu = menuBar.addMenu("&Help") helpMenu = menuBar.addMenu("&Help")
@@ -71,6 +72,7 @@ class App(QMainWindow):
helpMenu.addAction(self.docsAction) helpMenu.addAction(self.docsAction)
fileMenu.addAction(self.importAction) fileMenu.addAction(self.importAction)
fileMenu.addAction(self.importPCRAction) fileMenu.addAction(self.importPCRAction)
methodsMenu.addAction(self.constructFS)
reportMenu.addAction(self.generateReportAction) reportMenu.addAction(self.generateReportAction)
maintenanceMenu.addAction(self.joinControlsAction) maintenanceMenu.addAction(self.joinControlsAction)
maintenanceMenu.addAction(self.joinExtractionAction) maintenanceMenu.addAction(self.joinExtractionAction)
@@ -103,6 +105,7 @@ class App(QMainWindow):
self.joinPCRAction = QAction("Link PCR Logs") self.joinPCRAction = QAction("Link PCR Logs")
self.helpAction = QAction("&About", self) self.helpAction = QAction("&About", self)
self.docsAction = QAction("&Docs", self) self.docsAction = QAction("&Docs", self)
self.constructFS = QAction("Make First Strand", self)
def _connectActions(self): def _connectActions(self):
@@ -125,6 +128,7 @@ class App(QMainWindow):
self.joinPCRAction.triggered.connect(self.linkPCR) self.joinPCRAction.triggered.connect(self.linkPCR)
self.helpAction.triggered.connect(self.showAbout) self.helpAction.triggered.connect(self.showAbout)
self.docsAction.triggered.connect(self.openDocs) self.docsAction.triggered.connect(self.openDocs)
self.constructFS.triggered.connect(self.construct_first_strand)
def showAbout(self): def showAbout(self):
""" """
@@ -290,6 +294,14 @@ class App(QMainWindow):
self, result = import_pcr_results_function(self) self, result = import_pcr_results_function(self)
self.result_reporter(result) self.result_reporter(result)
def construct_first_strand(self):
"""
Converts first strand excel sheet to Biomek CSV
"""
from .main_window_functions import construct_first_strand_function
self, result = construct_first_strand_function(self)
self.result_reporter(result)
class AddSubForm(QWidget): class AddSubForm(QWidget):
def __init__(self, parent): def __init__(self, parent):

View File

@@ -31,7 +31,7 @@ class AddReagentForm(QDialog):
super().__init__() super().__init__()
self.ctx = ctx self.ctx = ctx
if reagent_lot == None: if reagent_lot == None:
reagent_lot = "" reagent_lot = reagent_type
self.setWindowTitle("Add Reagent") self.setWindowTitle("Add Reagent")
@@ -257,7 +257,7 @@ class ControlsDatePicker(QWidget):
class ImportReagent(QComboBox): class ImportReagent(QComboBox):
def __init__(self, ctx:dict, reagent:PydReagent): def __init__(self, ctx:dict, reagent:PydReagent, extraction_kit:str):
super().__init__() super().__init__()
self.setEditable(True) self.setEditable(True)
# Ensure that all reagenttypes have a name that matches the items in the excel parser # Ensure that all reagenttypes have a name that matches the items in the excel parser
@@ -289,7 +289,7 @@ class ImportReagent(QComboBox):
relevant_reagents.insert(0, str(reagent.lot)) relevant_reagents.insert(0, str(reagent.lot))
else: else:
# TODO: look up the last used reagent of this type in the database # TODO: look up the last used reagent of this type in the database
looked_up_reg = lookup_last_used_reagenttype_lot(ctx=ctx, type_name=reagent.type) looked_up_reg = lookup_last_used_reagenttype_lot(ctx=ctx, type_name=reagent.type, extraction_kit=extraction_kit)
logger.debug(f"Because there was no reagent listed for {reagent}, we will insert the last lot used: {looked_up_reg}") logger.debug(f"Because there was no reagent listed for {reagent}, we will insert the last lot used: {looked_up_reg}")
if looked_up_reg != None: if looked_up_reg != None:
relevant_reagents.remove(str(looked_up_reg.lot)) relevant_reagents.remove(str(looked_up_reg.lot))

View File

@@ -26,12 +26,13 @@ from backend.db.functions import (
construct_submission_info, lookup_reagent, store_submission, lookup_submissions_by_date_range, construct_submission_info, lookup_reagent, store_submission, lookup_submissions_by_date_range,
create_kit_from_yaml, create_org_from_yaml, get_control_subtypes, get_all_controls_by_type, create_kit_from_yaml, create_org_from_yaml, get_control_subtypes, get_all_controls_by_type,
lookup_all_submissions_by_type, get_all_controls, lookup_submission_by_rsl_num, update_subsampassoc_with_pcr, lookup_all_submissions_by_type, get_all_controls, lookup_submission_by_rsl_num, update_subsampassoc_with_pcr,
check_kit_integrity check_kit_integrity, lookup_sub_samp_association_by_plate_sample, lookup_ww_sample_by_processing_number,
lookup_sample_by_submitter_id, update_last_used
) )
from backend.excel.parser import SheetParser, PCRParser from backend.excel.parser import SheetParser, PCRParser, SampleParser
from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df
from backend.pydant import PydReagent from backend.pydant import PydReagent
from tools import check_not_nan from tools import check_not_nan, convert_well_to_row_column
from .custom_widgets.pop_ups import AlertPop, QuestionAsker from .custom_widgets.pop_ups import AlertPop, QuestionAsker
from .custom_widgets import ReportDatePicker from .custom_widgets import ReportDatePicker
from .custom_widgets.misc import ImportReagent, ParsedQLabel from .custom_widgets.misc import ImportReagent, ParsedQLabel
@@ -182,7 +183,7 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
# reg_label.setObjectName(f"lot_{reagent['type']}_label") # reg_label.setObjectName(f"lot_{reagent['type']}_label")
reg_label.setObjectName(f"lot_{reagent['value'].type}_label") reg_label.setObjectName(f"lot_{reagent['value'].type}_label")
# create reagent choice widget # create reagent choice widget
add_widget = ImportReagent(ctx=obj.ctx, reagent=reagent['value']) add_widget = ImportReagent(ctx=obj.ctx, reagent=reagent['value'], extraction_kit=pyd.extraction_kit['value'])
add_widget.setObjectName(f"lot_{reagent['value'].type}") add_widget.setObjectName(f"lot_{reagent['value'].type}")
logger.debug(f"Widget name set to: {add_widget.objectName()}") logger.debug(f"Widget name set to: {add_widget.objectName()}")
obj.table_widget.formlayout.addWidget(reg_label) obj.table_widget.formlayout.addWidget(reg_label)
@@ -277,7 +278,7 @@ def kit_integrity_completion_function(obj:QMainWindow) -> Tuple[QMainWindow, dic
for item in obj.missing_reagents: for item in obj.missing_reagents:
obj.table_widget.formlayout.addWidget(ParsedQLabel({'parsed':False}, item.type, title=False)) obj.table_widget.formlayout.addWidget(ParsedQLabel({'parsed':False}, item.type, title=False))
reagent = dict(type=item.type, lot=None, exp=date.today(), name=None) reagent = dict(type=item.type, lot=None, exp=date.today(), name=None)
add_widget = ImportReagent(ctx=obj.ctx, reagent=PydReagent(**reagent))#item=item) add_widget = ImportReagent(ctx=obj.ctx, reagent=PydReagent(**reagent), extraction_kit=obj.ext_kit)#item=item)
obj.table_widget.formlayout.addWidget(add_widget) obj.table_widget.formlayout.addWidget(add_widget)
submit_btn = QPushButton("Submit") submit_btn = QPushButton("Submit")
submit_btn.setObjectName("lot_submit_btn") submit_btn.setObjectName("lot_submit_btn")
@@ -310,7 +311,6 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
# Lookup any existing reagent of this type with this lot number # Lookup any existing reagent of this type with this lot number
wanted_reagent = lookup_reagent(ctx=obj.ctx, reagent_lot=reagents[reagent], type_name=reagent) wanted_reagent = lookup_reagent(ctx=obj.ctx, reagent_lot=reagents[reagent], type_name=reagent)
logger.debug(f"Looked up reagent: {wanted_reagent}") logger.debug(f"Looked up reagent: {wanted_reagent}")
# logger.debug(f"\n\nLooking for {reagent} in {obj.reagents}\n\n")
# if reagent not found offer to add to database # if reagent not found offer to add to database
if wanted_reagent == None: if wanted_reagent == None:
r_lot = reagents[reagent] r_lot = reagents[reagent]
@@ -328,15 +328,11 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
else: else:
# In this case we will have an empty reagent and the submission will fail kit integrity check # In this case we will have an empty reagent and the submission will fail kit integrity check
logger.debug("Will not add reagent.") logger.debug("Will not add reagent.")
# obj.ctx.database_session.rollback()
return obj, dict(message="Failed integrity check", status="critical") return obj, dict(message="Failed integrity check", status="critical")
# if wanted_reagent != None:
parsed_reagents.append(wanted_reagent) parsed_reagents.append(wanted_reagent)
wanted_reagent.type.last_used = reagents[reagent]
# move samples into preliminary submission dict # move samples into preliminary submission dict
info['samples'] = obj.samples info['samples'] = obj.samples
info['uploaded_by'] = getuser() info['uploaded_by'] = getuser()
# info['columns'] = obj.column_count
# construct submission object # construct submission object
logger.debug(f"Here is the info_dict: {pprint.pformat(info)}") logger.debug(f"Here is the info_dict: {pprint.pformat(info)}")
base_submission, result = construct_submission_info(ctx=obj.ctx, info_dict=info) base_submission, result = construct_submission_info(ctx=obj.ctx, info_dict=info)
@@ -359,6 +355,7 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
# add reagents to submission object # add reagents to submission object
for reagent in parsed_reagents: for reagent in parsed_reagents:
base_submission.reagents.append(reagent) base_submission.reagents.append(reagent)
update_last_used(ctx=obj.ctx, reagent=reagent, kit=base_submission.extraction_kit)
logger.debug(f"Parsed reagents: {pprint.pformat(parsed_reagents)}") logger.debug(f"Parsed reagents: {pprint.pformat(parsed_reagents)}")
logger.debug("Checking kit integrity...") logger.debug("Checking kit integrity...")
kit_integrity = check_kit_integrity(base_submission) kit_integrity = check_kit_integrity(base_submission)
@@ -377,12 +374,8 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
logger.debug(f"We have blank reagents in the excel sheet.\n\tLet's try to fill them in.") logger.debug(f"We have blank reagents in the excel sheet.\n\tLet's try to fill them in.")
extraction_kit = lookup_kittype_by_name(obj.ctx, name=obj.ext_kit) extraction_kit = lookup_kittype_by_name(obj.ctx, name=obj.ext_kit)
logger.debug(f"We have the extraction kit: {extraction_kit.name}") logger.debug(f"We have the extraction kit: {extraction_kit.name}")
# TODO replace below with function in KitType object. Update Kittype associations.
# excel_map = extraction_kit.used_for[obj.current_submission_type.replace('_', ' ')]
excel_map = extraction_kit.construct_xl_map_for_use(obj.current_submission_type) excel_map = extraction_kit.construct_xl_map_for_use(obj.current_submission_type)
logger.debug(f"Extraction kit map:\n\n{pprint.pformat(excel_map)}") logger.debug(f"Extraction kit map:\n\n{pprint.pformat(excel_map)}")
# excel_map.update(extraction_kit.used_for[obj.current_submission_type.replace('_', ' ').title()])
input_reagents = [item.to_reagent_dict(extraction_kit=base_submission.extraction_kit) for item in parsed_reagents] input_reagents = [item.to_reagent_dict(extraction_kit=base_submission.extraction_kit) for item in parsed_reagents]
logger.debug(f"Parsed reagents going into autofile: {pprint.pformat(input_reagents)}") logger.debug(f"Parsed reagents going into autofile: {pprint.pformat(input_reagents)}")
autofill_excel(obj=obj, xl_map=excel_map, reagents=input_reagents, missing_reagents=obj.missing_reagents, info=info, missing_info=obj.missing_info) autofill_excel(obj=obj, xl_map=excel_map, reagents=input_reagents, missing_reagents=obj.missing_reagents, info=info, missing_info=obj.missing_info)
@@ -881,17 +874,8 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re
# pare down reagents to only what's missing # pare down reagents to only what's missing
logger.debug(f"Checking {[item['type'] for item in reagents]} against {[reagent.type for reagent in missing_reagents]}") logger.debug(f"Checking {[item['type'] for item in reagents]} against {[reagent.type for reagent in missing_reagents]}")
relevant_reagents = [item for item in reagents if item['type'] in [reagent.type for reagent in missing_reagents]] relevant_reagents = [item for item in reagents if item['type'] in [reagent.type for reagent in missing_reagents]]
# relevant_reagents = []
# for item in reagents:
# logger.debug(f"Checking {item['type']} in {[reagent.type for reagent in missing_reagents]}")
# if item['type'] in [reagent.type for reagent in missing_reagents]:
# logger.debug("Hit!")
# relevant_reagents.append(item)
# else:
# logger.debug('Miss.')
logger.debug(f"Here are the relevant reagents: {pprint.pformat(relevant_reagents)}") logger.debug(f"Here are the relevant reagents: {pprint.pformat(relevant_reagents)}")
# hacky manipulation of submission type so it looks better. # hacky manipulation of submission type so it looks better.
# info['submission_type'] = info['submission_type'].replace("_", " ").title()
# pare down info to just what's missing # pare down info to just what's missing
relevant_info_map = {k:v for k,v in xl_map['info'].items() if k in missing_info and k != 'samples'} relevant_info_map = {k:v for k,v in xl_map['info'].items() if k in missing_info and k != 'samples'}
relevant_info = {k:v for k,v in info.items() if k in missing_info} relevant_info = {k:v for k,v in info.items() if k in missing_info}
@@ -910,9 +894,9 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re
# name is only present for Bacterial Culture # name is only present for Bacterial Culture
try: try:
new_reagent['name'] = relevant_reagent_map[new_reagent['type']]['name'] new_reagent['name'] = relevant_reagent_map[new_reagent['type']]['name']
new_reagent['name']['value'] = reagent['type'] new_reagent['name']['value'] = reagent['name']
except: except Exception as e:
pass logger.error(f"Couldn't get name due to {e}")
new_reagents.append(new_reagent) new_reagents.append(new_reagent)
# construct new info objects to put into excel sheets # construct new info objects to put into excel sheets
new_info = [] new_info = []
@@ -936,21 +920,98 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re
# Get relevant reagents for that sheet # Get relevant reagents for that sheet
sheet_reagents = [item for item in new_reagents if sheet in item['sheet']] sheet_reagents = [item for item in new_reagents if sheet in item['sheet']]
for reagent in sheet_reagents: for reagent in sheet_reagents:
logger.debug(f"Attempting: {reagent['type']}:") logger.debug(f"Attempting to write lot {reagent['lot']['value']} in: row {reagent['lot']['row']}, column {reagent['lot']['column']}")
worksheet.cell(row=reagent['lot']['row'], column=reagent['lot']['column'], value=reagent['lot']['value']) worksheet.cell(row=reagent['lot']['row'], column=reagent['lot']['column'], value=reagent['lot']['value'])
logger.debug(f"Attempting to write expiry {reagent['expiry']['value']} in: row {reagent['expiry']['row']}, column {reagent['expiry']['column']}")
worksheet.cell(row=reagent['expiry']['row'], column=reagent['expiry']['column'], value=reagent['expiry']['value']) worksheet.cell(row=reagent['expiry']['row'], column=reagent['expiry']['column'], value=reagent['expiry']['value'])
try: try:
worksheet.cell(row=reagent['name']['row'], column=reagent['name']['column'], value=reagent['name']['value'].replace("_", " ").upper()) logger.debug(f"Attempting to write name {reagent['name']['value']} in: row {reagent['name']['row']}, column {reagent['name']['column']}")
except: worksheet.cell(row=reagent['name']['row'], column=reagent['name']['column'], value=reagent['name']['value'])
pass except Exception as e:
logger.error(f"Could not write name {reagent['name']['value']} due to {e}")
# Get relevant info for that sheet # Get relevant info for that sheet
sheet_info = [item for item in new_info if sheet in item['location']['sheets']] sheet_info = [item for item in new_info if sheet in item['location']['sheets']]
for item in sheet_info: for item in sheet_info:
logger.debug(f"Attempting: {item['type']}") logger.debug(f"Attempting: {item['type']}")
worksheet.cell(row=item['location']['row'], column=item['location']['column'], value=item['value']) worksheet.cell(row=item['location']['row'], column=item['location']['column'], value=item['value'])
# Hacky way to # Hacky way to pop in 'signed by'
if info['submission_type'] == "Bacterial Culture": if info['submission_type'] == "Bacterial Culture":
workbook["Sample List"].cell(row=14, column=2, value=getuser()[0:2].upper()) workbook["Sample List"].cell(row=14, column=2, value=getuser()[0:2].upper())
fname = select_save_file(obj=obj, default_name=info['rsl_plate_num'], extension="xlsx") fname = select_save_file(obj=obj, default_name=info['rsl_plate_num'], extension="xlsx")
workbook.save(filename=fname.__str__()) workbook.save(filename=fname.__str__())
def construct_first_strand_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
def get_plates(input_sample_number:str, plates:list) -> Tuple[int, str]:
logger.debug(f"Looking up {input_sample_number} in {plates}")
samp = lookup_ww_sample_by_processing_number(ctx=obj.ctx, processing_number=input_sample_number)
if samp == None:
samp = lookup_sample_by_submitter_id(ctx=obj.ctx, submitter_id=input_sample_number)
logger.debug(f"Got sample: {samp}")
# if samp != None:
new_plates = [(iii+1, lookup_sub_samp_association_by_plate_sample(ctx=obj.ctx, rsl_sample_num=samp, rsl_plate_num=lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=plate))) for iii, plate in enumerate(plates)]
# for iii, plate in enumerate(plates):
# lplate = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=plate)
# if lplate == None:
# continue
# else:
# logger.debug(f"Got a plate: {lplate}")
# new_plates.append((iii, lookup_sub_samp_association_by_plate_sample(ctx=obj.ctx, rsl_sample_num=samp, rsl_plate_num=lplate)))
logger.debug(f"Associations: {pprint.pformat(new_plates)}")
try:
plate_num, plate = next(assoc for assoc in new_plates if assoc[1] is not None)
except StopIteration:
plate_num, plate = None, None
logger.debug(f"Plate number {plate_num} is {plate}")
return plate_num, plate
fname = select_open_file(obj=obj, file_extension="xlsx")
xl = pd.ExcelFile(fname)
sprsr = SampleParser(ctx=obj.ctx, xl=xl, submission_type="First Strand")
_, samples = sprsr.parse_samples(generate=False)
plates = sprsr.grab_plates()
output_samples = []
logger.debug(f"Samples: {pprint.pformat(samples)}")
old_plate_number = 1
for item in samples:
new_dict = {}
new_dict['sample'] = item['submitter_id']
if item['submitter_id'] == "NTC1":
new_dict['destination_row'] = 8
new_dict['destination_column'] = 2
new_dict['plate_number'] = 'control'
elif item['submitter_id'] == "NTC2":
new_dict['destination_row'] = 8
new_dict['destination_column'] = 5
new_dict['plate_number'] = 'control'
else:
new_dict['destination_row'] = item['row']
new_dict['destination_column'] = item['column']
# assocs = [(iii, lookup_ww_sample_by_processing_number_and_plate(ctx=obj.ctx, processing_number=new_dict['sample'], plate_number=plate)) for iii, plate in enumerate(plates)]
plate_num, plate = get_plates(input_sample_number=new_dict['sample'], plates=plates)
if plate_num == None:
plate_num = str(old_plate_number) + "*"
else:
old_plate_number = plate_num
logger.debug(f"Got plate number: {plate_num}, plate: {plate}")
if plate == None:
try:
new_dict['source_row'], new_dict['source_column'] = convert_well_to_row_column(item['well'])
new_dict['plate_number'] = plate_num
except KeyError:
pass
else:
new_dict['plate_number'] = plate_num
new_dict['plate'] = plate.submission.rsl_plate_num
new_dict['source_row'] = plate.row
new_dict['source_column'] = plate.column
output_samples.append(new_dict)
df = pd.DataFrame.from_records(output_samples)
df.sort_values(by=['destination_column', 'destination_row'], ascending=True, inplace=True)
columnsTitles = ['sample', 'destination_column', 'destination_row', 'plate_number', 'plate', "source_column", 'source_row']
df = df.reindex(columns=columnsTitles)
ofname = select_save_file(obj=obj, default_name=f"First Strand {date.today()}", extension="csv")
df.to_csv(ofname, index=False)
return obj, None

View File

@@ -622,3 +622,20 @@ def check_if_app(ctx:Settings=None) -> bool:
else: else:
return False return False
def convert_well_to_row_column(input_str:str) -> Tuple[int, int]:
"""
Converts typical alphanumeric (i.e. "A2") to row, column
Args:
input_str (str): Input string. Ex. "A2"
Returns:
Tuple[int, int]: row, column
"""
row_keys = dict(A=1, B=2, C=3, D=4, E=5, F=6, G=7, H=8)
try:
row = int(row_keys[input_str[0].upper()])
column = int(input_str[1:])
except IndexError:
return None, None
return row, column