diff --git a/CHANGELOG.md b/CHANGELOG.md index c8994f7..e0575cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 202309.03 + +- Autofill now adds name of reagent instead of type. + ## 202309.02 - Massive restructure of app and database to allow better relationships between kits/reagenttypes & submissions/samples. diff --git a/TODO.md b/TODO.md index 7752aa4..db476cc 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,7 @@ +- [ ] Make kits easier to add. - [ ] Clean up & document code... again. - Including paring down the logging.debugs + - Also including reducing number of functions in db.functions - [ ] Fix Tests... again. - [x] Rebuild database - [x] Provide more generic names for reagenttypes in kits and move specific names to reagents. diff --git a/src/submissions/__init__.py b/src/submissions/__init__.py index 136be07..22a5b51 100644 --- a/src/submissions/__init__.py +++ b/src/submissions/__init__.py @@ -4,7 +4,7 @@ from pathlib import Path # Version of the realpython-reader package __project__ = "submissions" -__version__ = "202309.2b" +__version__ = "202309.3b" __author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"} __copyright__ = "2022-2023, Government of Canada" diff --git a/src/submissions/backend/db/functions.py b/src/submissions/backend/db/functions.py index c07c0c5..63600c1 100644 --- a/src/submissions/backend/db/functions.py +++ b/src/submissions/backend/db/functions.py @@ -18,7 +18,7 @@ import numpy as np import yaml from pathlib import Path from tools import Settings, check_regex_match, RSLNamer -from typing import List +from typing import List, Tuple @@ -27,6 +27,14 @@ logger = logging.getLogger(f"submissions.{__name__}") # The below _should_ allow automatic creation of foreign keys in the database @event.listens_for(Engine, "connect") def set_sqlite_pragma(dbapi_connection, connection_record): + """ + *should* allow automatic creation of foreign keys in the database + I have no idea how it actually works. + + Args: + dbapi_connection (_type_): _description_ + connection_record (_type_): _description_ + """ cursor = dbapi_connection.cursor() cursor.execute("PRAGMA foreign_keys=ON") cursor.close() @@ -79,7 +87,7 @@ def store_reagent(ctx:Settings, reagent:models.Reagent) -> None|dict: return {"message":"The database is locked for editing."} return None -def construct_submission_info(ctx:Settings, info_dict:dict) -> models.BasicSubmission: +def construct_submission_info(ctx:Settings, info_dict:dict) -> Tuple[models.BasicSubmission, dict]: """ Construct submission object from dictionary @@ -273,7 +281,7 @@ def lookup_reagenttype_by_name(ctx:Settings, rt_name:str) -> models.ReagentType: Returns: models.ReagentType: looked up reagent type """ - logger.debug(f"Looking up ReagentType by name: {rt_name.title()}") + logger.debug(f"Looking up ReagentType by name: {rt_name}") lookedup = ctx.database_session.query(models.ReagentType).filter(models.ReagentType.name==rt_name).first() logger.debug(f"Found ReagentType: {lookedup}") return lookedup @@ -302,7 +310,7 @@ def lookup_kittype_by_name(ctx:Settings, name:str|dict) -> models.KitType: Args: ctx (Settings): settings object passed from bui - name (str): name of kit to query + name (str|dict): name of kit to query, or parsed object containing value=name Returns: models.KitType: retrieved kittype @@ -989,25 +997,6 @@ def lookup_reagent(ctx:Settings, reagent_lot:str, type_name:str|None=None) -> mo # return ctx['database_session'].query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() return ctx.database_session.query(models.Reagent).filter(models.Reagent.lot==reagent_lot).first() -def lookup_last_used_reagenttype_lot(ctx:Settings, type_name:str) -> models.Reagent: - """ - Look up the last used reagent of the reagent type - - Args: - ctx (Settings): Settings object passed down from gui - type_name (str): Name of reagent type - - Returns: - models.Reagent: Reagent object with last used lot. - """ - # rt = ctx['database_session'].query(models.ReagentType).filter(models.ReagentType.name==type_name).first() - rt = ctx.database_session.query(models.ReagentType).filter(models.ReagentType.name==type_name).first() - logger.debug(f"Reagent type looked up for {type_name}: {rt.__str__()}") - try: - return lookup_reagent(ctx=ctx, reagent_lot=rt.last_used, type_name=type_name) - except AttributeError: - return None - def check_kit_integrity(sub:models.BasicSubmission|models.KitType, reagenttypes:list|None=None) -> dict|None: """ Ensures all reagents expected in kit are listed in Submission @@ -1120,7 +1109,7 @@ def lookup_subsamp_association_by_plate_sample(ctx:Settings, rsl_plate_num:str, .filter(models.BasicSample.submitter_id==rsl_sample_num)\ .first() -def lookup_sub_wwsamp_association_by_plate_sample(ctx:Settings, rsl_plate_num:str, rsl_sample_num:str) -> models.WastewaterAssociation: +def lookup_sub_samp_association_by_plate_sample(ctx:Settings, rsl_plate_num:str|models.BasicSample, rsl_sample_num:str|models.BasicSubmission) -> models.WastewaterAssociation: """ _summary_ @@ -1132,12 +1121,36 @@ def lookup_sub_wwsamp_association_by_plate_sample(ctx:Settings, rsl_plate_num:st Returns: models.SubmissionSampleAssociation: _description_ """ - return ctx.database_session.query(models.WastewaterAssociation)\ - .join(models.Wastewater)\ - .join(models.WastewaterSample)\ - .filter(models.BasicSubmission.rsl_plate_num==rsl_plate_num)\ - .filter(models.BasicSample.submitter_id==rsl_sample_num)\ - .first() + # logger.debug(f"{type(rsl_plate_num)}, {type(rsl_sample_num)}") + match rsl_plate_num: + case models.BasicSubmission()|models.Wastewater(): + # logger.debug(f"Model for rsl_plate_num: {rsl_plate_num}") + first_query = ctx.database_session.query(models.SubmissionSampleAssociation)\ + .filter(models.SubmissionSampleAssociation.submission==rsl_plate_num) + case str(): + # logger.debug(f"String for rsl_plate_num: {rsl_plate_num}") + first_query = ctx.database_session.query(models.SubmissionSampleAssociation)\ + .join(models.BasicSubmission)\ + .filter(models.BasicSubmission.rsl_plate_num==rsl_plate_num) + case _: + logger.error(f"Unknown case for rsl_plate_num {rsl_plate_num}") + match rsl_sample_num: + case models.BasicSample()|models.WastewaterSample(): + # logger.debug(f"Model for rsl_sample_num: {rsl_sample_num}") + second_query = first_query.filter(models.SubmissionSampleAssociation.sample==rsl_sample_num) + # case models.WastewaterSample: + # second_query = first_query.filter(models.SubmissionSampleAssociation.sample==rsl_sample_num) + case str(): + # logger.debug(f"String for rsl_sample_num: {rsl_sample_num}") + second_query = first_query.join(models.BasicSample)\ + .filter(models.BasicSample.submitter_id==rsl_sample_num) + case _: + logger.error(f"Unknown case for rsl_sample_num {rsl_sample_num}") + try: + return second_query.first() + except UnboundLocalError: + logger.error(f"Couldn't construct second query") + return None def lookup_all_reagent_names_by_role(ctx:Settings, role_name:str) -> List[str]: """ @@ -1183,7 +1196,7 @@ def add_reagenttype_to_kit(ctx:Settings, rt_name:str, kit_name:str, eol:int=0): kit = lookup_kittype_by_name(ctx=ctx, name=kit_name) rt = lookup_reagenttype_by_name(ctx=ctx, rt_name=rt_name) if rt == None: - rt = models.ReagentType(name=rt_name.strip(), eol_ext=timedelta(30*eol), last_used="") + rt = models.ReagentType(name=rt_name.strip(), eol_ext=timedelta(30*eol)) ctx.database_session.add(rt) assoc = models.KitTypeReagentTypeAssociation(kit_type=kit, reagent_type=rt, uses={}) kit.kit_reagenttype_associations.append(assoc) @@ -1203,4 +1216,69 @@ def update_subsampassoc_with_pcr(ctx:Settings, submission:models.BasicSubmission except AttributeError: logger.error(f"Can't set {k} to {v}") ctx.database_session.add(assoc) - ctx.database_session.commit() \ No newline at end of file + ctx.database_session.commit() + +def lookup_ww_sample_by_processing_number(ctx:Settings, processing_number:str): + return ctx.database_session.query(models.WastewaterSample).filter(models.WastewaterSample.ww_processing_num==processing_number).first() + +def lookup_kitreagentassoc_by_kit_and_reagent(ctx:Settings, kit:models.KitType|str, reagent_type:models.ReagentType|str) -> models.KitTypeReagentTypeAssociation: + """ + _summary_ + + Args: + ctx (Settings): _description_ + kit (models.KitType | str): _description_ + reagent_type (models.ReagentType | str): _description_ + + Returns: + models.KitTypeReagentTypeAssociation: _description_ + """ + base_query = ctx.database_session.query(models.KitTypeReagentTypeAssociation) + match kit: + case models.KitType(): + query1 = base_query.filter(models.KitTypeReagentTypeAssociation.kit_type==kit) + case str(): + query1 = base_query.join(models.KitType).filter(models.KitType.name==kit) + case _: + query1 = base_query + match reagent_type: + case models.ReagentType(): + query2 = query1.filter(models.KitTypeReagentTypeAssociation.reagent_type==reagent_type) + case str(): + query2 = query1.join(models.ReagentType).filter(models.ReagentType.name==reagent_type) + case _: + query2 = query1 + return query2.first() + +def lookup_last_used_reagenttype_lot(ctx:Settings, type_name:str, extraction_kit:str|None=None) -> models.Reagent: + """ + Look up the last used reagent of the reagent type + + Args: + ctx (Settings): Settings object passed down from gui + type_name (str): Name of reagent type + + Returns: + models.Reagent: Reagent object with last used lot. + """ + assoc = lookup_kitreagentassoc_by_kit_and_reagent(ctx=ctx, kit=extraction_kit, reagent_type=type_name) + return lookup_reagent(ctx=ctx, reagent_lot=assoc.last_used) + +def update_last_used(ctx:Settings, reagent:models.Reagent, kit:models.KitType): + """ + _summary_ + + Args: + ctx (Settings): _description_ + reagent (models.ReagentType): _description_ + reagent_lot (str): _description_ + """ + rt = list(set(reagent.type).intersection(kit.reagent_types))[0] + if rt != None: + assoc = lookup_kitreagentassoc_by_kit_and_reagent(ctx=ctx, kit=kit, reagent_type=rt) + if assoc != None: + if assoc.last_used != reagent.lot: + logger.debug(f"Updating {assoc} last used to {reagent.lot}") + assoc.last_used = reagent.lot + ctx.database_session.merge(assoc) + ctx.database_session.commit() diff --git a/src/submissions/backend/db/models/kits.py b/src/submissions/backend/db/models/kits.py index f293aaa..7650351 100644 --- a/src/submissions/backend/db/models/kits.py +++ b/src/submissions/backend/db/models/kits.py @@ -97,39 +97,7 @@ class KitType(Base): map['info'] = {} return map -class KitTypeReagentTypeAssociation(Base): - """ - table containing reagenttype/kittype associations - DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html - """ - __tablename__ = "_reagenttypes_kittypes" - reagent_types_id = Column(INTEGER, ForeignKey("_reagent_types.id"), primary_key=True) - kits_id = Column(INTEGER, ForeignKey("_kits.id"), primary_key=True) - uses = Column(JSON) - required = Column(INTEGER) - kit_type = relationship(KitType, back_populates="kit_reagenttype_associations") - - # reference to the "ReagentType" object - reagent_type = relationship("ReagentType") - - def __init__(self, kit_type=None, reagent_type=None, uses=None, required=1): - self.kit_type = kit_type - self.reagent_type = reagent_type - self.uses = uses - self.required = required - - @validates('required') - def validate_age(self, key, value): - if not 0 <= value < 2: - raise ValueError(f'Invalid required value {value}. Must be 0 or 1.') - return value - - @validates('reagenttype') - def validate_reagenttype(self, key, value): - if not isinstance(value, ReagentType): - raise ValueError(f'{value} is not a reagenttype') - return value class ReagentType(Base): """ @@ -141,7 +109,16 @@ class ReagentType(Base): name = Column(String(64)) #: name of reagent type instances = relationship("Reagent", back_populates="type", secondary=reagenttypes_reagents) #: concrete instances of this reagent type eol_ext = Column(Interval()) #: extension of life interval - last_used = Column(String(32)) #: last used lot number of this type of reagent + + reagenttype_kit_associations = relationship( + "KitTypeReagentTypeAssociation", + back_populates="reagent_type", + cascade="all, delete-orphan", + ) + + # association proxy of "user_keyword_associations" collection + # to "keyword" attribute + kit_types = association_proxy("kit_reagenttype_associations", "kit_type") @validates('required') def validate_age(self, key, value): @@ -160,6 +137,44 @@ class ReagentType(Base): def __repr__(self): return f"ReagentType({self.name})" + +class KitTypeReagentTypeAssociation(Base): + """ + table containing reagenttype/kittype associations + DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html + """ + __tablename__ = "_reagenttypes_kittypes" + reagent_types_id = Column(INTEGER, ForeignKey("_reagent_types.id"), primary_key=True) + kits_id = Column(INTEGER, ForeignKey("_kits.id"), primary_key=True) + uses = Column(JSON) + required = Column(INTEGER) + last_used = Column(String(32)) #: last used lot number of this type of reagent + + kit_type = relationship(KitType, back_populates="kit_reagenttype_associations") + + # reference to the "ReagentType" object + reagent_type = relationship(ReagentType, back_populates="reagenttype_kit_associations") + + def __init__(self, kit_type=None, reagent_type=None, uses=None, required=1): + self.kit_type = kit_type + self.reagent_type = reagent_type + self.uses = uses + self.required = required + + def __repr__(self) -> str: + return f"" + + @validates('required') + def validate_age(self, key, value): + if not 0 <= value < 2: + raise ValueError(f'Invalid required value {value}. Must be 0 or 1.') + return value + + @validates('reagenttype') + def validate_reagenttype(self, key, value): + if not isinstance(value, ReagentType): + raise ValueError(f'{value} is not a reagenttype') + return value class Reagent(Base): """ @@ -247,10 +262,12 @@ class Reagent(Base): except AttributeError: rtype = "Unknown" return { + "name":self.name, "type": rtype, "lot": self.lot, "expiry": self.expiry.strftime("%Y-%m-%d") } + class Discount(Base): """ @@ -266,6 +283,9 @@ class Discount(Base): name = Column(String(128)) amount = Column(FLOAT(2)) + def __repr__(self) -> str: + return f"" + class SubmissionType(Base): """ Abstract of types of submissions. diff --git a/src/submissions/backend/db/models/organizations.py b/src/submissions/backend/db/models/organizations.py index d3dde7a..a492355 100644 --- a/src/submissions/backend/db/models/organizations.py +++ b/src/submissions/backend/db/models/organizations.py @@ -47,3 +47,6 @@ class Contact(Base): phone = Column(String(32)) #: contact phone number organization = relationship("Organization", back_populates="contacts", uselist=True, secondary=orgs_contacts) #: relationship to joined organization + def __repr__(self) -> str: + return f"" + diff --git a/src/submissions/backend/db/models/submissions.py b/src/submissions/backend/db/models/submissions.py index 84ab791..22610a8 100644 --- a/src/submissions/backend/db/models/submissions.py +++ b/src/submissions/backend/db/models/submissions.py @@ -13,8 +13,6 @@ from sqlalchemy.ext.associationproxy import association_proxy import uuid from pandas import Timestamp from dateutil.parser import parse -import pprint -from tools import check_not_nan logger = logging.getLogger(f"submissions.{__name__}") @@ -348,7 +346,7 @@ class BasicSample(Base): return value def __repr__(self) -> str: - return f"<{self.sample_type.replace('_', ' ').title(). replace(' ', '')}({self.submitter_id})>" + return f"<{self.sample_type.replace('_', ' ').title().replace(' ', '')}({self.submitter_id})>" def set_attribute(self, name, value): # logger.debug(f"Setting {name} to {value}") @@ -417,56 +415,36 @@ class WastewaterSample(BasicSample): logger.debug(f"Validating {key}: {value}") return value or self.submitter_id - # def __init__(self, **kwargs): - # # Had a problem getting collection date from excel as text only. - # if 'collection_date' in kwargs.keys(): - # logger.debug(f"Got collection_date: {kwargs['collection_date']}. Attempting parse.") - # if isinstance(kwargs['collection_date'], str): - # logger.debug(f"collection_date is a string...") - # kwargs['collection_date'] = parse(kwargs['collection_date']) - # logger.debug(f"output is {kwargs['collection_date']}") - # # Due to the plate map being populated with RSL numbers, we have to do some shuffling. - # try: - # kwargs['rsl_number'] = kwargs['submitter_id'] - # except KeyError as e: - # logger.error(f"Error using {kwargs} for submitter_id") - # try: - # check = check_not_nan(kwargs['ww_full_sample_id']) - # except KeyError: - # logger.error(f"Error using {kwargs} for ww_full_sample_id") - # check = False - # if check: - # kwargs['submitter_id'] = kwargs["ww_full_sample_id"] - # super().__init__(**kwargs) - def set_attribute(self, name:str, value): """ Set an attribute of this object. Extends parent. Args: - name (str): _description_ - value (_type_): _description_ + name (str): name of the attribute + value (_type_): value to be set """ # Due to the plate map being populated with RSL numbers, we have to do some shuffling. - # logger.debug(f"Input - {name}:{value}") match name: case "submitter_id": + # If submitter_id already has a value, stop if self.submitter_id != None: return + # otherwise also set rsl_number to the same value else: super().set_attribute("rsl_number", value) case "ww_full_sample_id": + # If value present, set ww_full_sample_id and make this the submitter_id if value != None: super().set_attribute(name, value) name = "submitter_id" case 'collection_date': + # If this is a string use dateutils to parse into date() if isinstance(value, str): logger.debug(f"collection_date {value} is a string. Attempting parse...") value = parse(value) case "rsl_number": if value == None: value = self.submitter_id - # logger.debug(f"Output - {name}:{value}") super().set_attribute(name, value) diff --git a/src/submissions/backend/excel/parser.py b/src/submissions/backend/excel/parser.py index b704074..a3e551d 100644 --- a/src/submissions/backend/excel/parser.py +++ b/src/submissions/backend/excel/parser.py @@ -14,7 +14,7 @@ import re import numpy as np from datetime import date from dateutil.parser import parse, ParserError -from tools import check_not_nan, RSLNamer, convert_nans_to_nones, Settings +from tools import check_not_nan, RSLNamer, convert_nans_to_nones, Settings, convert_well_to_row_column from frontend.custom_widgets.pop_ups import SubmissionTypeSelector, KitSelector logger = logging.getLogger(f"submissions.{__name__}") @@ -48,14 +48,12 @@ class SheetParser(object): # make decision about type of sample we have self.sub['submission_type'] = self.type_decider() # # grab the info map from the submission type in database - # self.info_map = self.fetch_kit_info_map() self.parse_info() self.import_kit_validation_check() self.parse_reagents() self.import_reagent_validation_check() self.parse_samples() - # self.sub['sample_count'] = len(self.sub['samples']) - + def type_decider(self) -> str: """ @@ -91,7 +89,7 @@ class SheetParser(object): def parse_info(self): """ - _summary_ + Pulls basic information from the excel sheet """ info = InfoParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type']['value']).parse_info() parser_query = f"parse_{self.sub['submission_type']['value'].replace(' ', '_').lower()}" @@ -101,14 +99,23 @@ class SheetParser(object): except AttributeError: logger.error(f"Couldn't find submission parser: {parser_query}") for k,v in info.items(): - if k != "sample": - self.sub[k] = v + match k: + case "sample": + pass + case _: + self.sub[k] = v logger.debug(f"Parser.sub after info scrape: {pprint.pformat(self.sub)}") def parse_reagents(self): + """ + Pulls reagent info from the excel sheet + """ self.sub['reagents'] = ReagentParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type'], extraction_kit=self.sub['extraction_kit']).parse_reagents() def parse_samples(self): + """ + Pulls sample info from the excel sheet + """ self.sample_result, self.sub['samples'] = SampleParser(ctx=self.ctx, xl=self.xl, submission_type=self.sub['submission_type']['value']).parse_samples() def parse_bacterial_culture(self, input_dict) -> dict: @@ -159,7 +166,7 @@ class SheetParser(object): Returns: List[PydReagent]: List of reagents """ - if not check_not_nan(self.sub['extraction_kit']): + if not check_not_nan(self.sub['extraction_kit']['value']): dlg = KitSelector(ctx=self.ctx, title="Kit Needed", message="At minimum a kit is needed. Please select one.") if dlg.exec(): self.sub['extraction_kit'] = dict(value=dlg.getValues(), parsed=False) @@ -197,7 +204,16 @@ class InfoParser(object): self.xl = xl logger.debug(f"Info map for InfoParser: {pprint.pformat(self.map)}") - def fetch_submission_info_map(self, submission_type:dict) -> dict: + def fetch_submission_info_map(self, submission_type:str|dict) -> dict: + """ + Gets location of basic info from the submission_type object in the database. + + Args: + submission_type (str|dict): name of the submission type or parsed object with value=submission_type + + Returns: + dict: Location map of all info for this submission type + """ if isinstance(submission_type, str): submission_type = dict(value=submission_type, parsed=False) logger.debug(f"Looking up submission type: {submission_type['value']}") @@ -206,6 +222,12 @@ class InfoParser(object): return info_map def parse_info(self) -> dict: + """ + Pulls basic info from the excel sheet. + + Returns: + dict: key:value of basic info + """ dicto = {} for sheet in self.xl.sheet_names: df = self.xl.parse(sheet, header=None) @@ -302,6 +324,8 @@ class SampleParser(object): sample_info_map = self.fetch_sample_info_map(submission_type=submission_type) self.plate_map = self.construct_plate_map(plate_map_location=sample_info_map['plate_map']) self.lookup_table = self.construct_lookup_table(lookup_table_location=sample_info_map['lookup_table']) + if "plates" in sample_info_map: + self.plates = sample_info_map['plates'] self.excel_to_db_map = sample_info_map['xl_db_translation'] self.create_basic_dictionaries_from_plate_map() if isinstance(self.lookup_table, pd.DataFrame): @@ -383,7 +407,7 @@ class SampleParser(object): sample[k] = v logger.debug(f"Output sample dict: {sample}") - def parse_samples(self) -> List[dict]: + def parse_samples(self, generate:bool=True) -> List[dict]: result = None new_samples = [] for ii, sample in enumerate(self.samples): @@ -414,7 +438,10 @@ class SampleParser(object): translated_dict = custom_parser(translated_dict) except AttributeError: logger.error(f"Couldn't get custom parser: {parser_query}") - new_samples.append(self.generate_sample_object(translated_dict)) + if generate: + new_samples.append(self.generate_sample_object(translated_dict)) + else: + new_samples.append(translated_dict) return result, new_samples def generate_sample_object(self, input_dict) -> models.BasicSample: @@ -464,6 +491,7 @@ class SampleParser(object): dict: Updated sample dictionary """ logger.debug(f"Called wastewater sample parser") + return input_dict def parse_wastewater_artic_sample(self, input_dict:dict) -> dict: """ @@ -481,6 +509,24 @@ class SampleParser(object): # at the end, this has to be done here. No moving to sqlalchemy object :( input_dict['submitter_id'] = re.sub(r"\s\(.+\)$", "", str(input_dict['submitter_id'])).strip() return input_dict + + def parse_first_strand_sample(self, input_dict:dict) -> dict: + logger.debug("Called first strand sample parser") + input_dict['well'] = re.search(r"\s\((.*)\)$", input_dict['submitter_id']).groups()[0] + input_dict['submitter_id'] = re.sub(r"\s\(.*\)$", "", str(input_dict['submitter_id'])).strip() + return input_dict + + def grab_plates(self): + plates = [] + for plate in self.plates: + df = self.xl.parse(plate['sheet'], header=None) + if isinstance(df.iat[plate['row']-1, plate['column']-1], str): + output = RSLNamer(ctx=self.ctx, instr=df.iat[plate['row']-1, plate['column']-1]).parsed_name + else: + continue + plates.append(output) + return plates + class PCRParser(object): """ diff --git a/src/submissions/backend/pydant/__init__.py b/src/submissions/backend/pydant/__init__.py index 127d83a..2b525cb 100644 --- a/src/submissions/backend/pydant/__init__.py +++ b/src/submissions/backend/pydant/__init__.py @@ -200,17 +200,25 @@ class PydSubmission(BaseModel, extra=Extra.allow): @field_validator("extraction_kit", mode='before') @classmethod def rescue_kit(cls, value): - # from frontend.custom_widgets.pop_ups import KitSelector - # if check_not_nan(value): - # if isinstance(value, str): - # return dict(value=value, parsed=True) - # elif isinstance(value, dict): - # return value - # else: - # raise ValueError(f"No extraction kit found.") + + if check_not_nan(value): + if isinstance(value, str): + return dict(value=value, parsed=True) + elif isinstance(value, dict): + return value + else: + raise ValueError(f"No extraction kit found.") if value == None: return dict(value=None, parsed=False) return value + + # @field_validator("extraction_kit") + # @classmethod + # def enforce_kit(cls, value, values): + # from frontend.custom_widgets.pop_ups import KitSelector + # if value['value'] == None: + # return dict(value=KitSelector(values.data['ctx'], title="Select Extraction Kit", message="No extraction kit was found, please select from below.")) + # return value @field_validator("submission_type", mode='before') @classmethod diff --git a/src/submissions/frontend/__init__.py b/src/submissions/frontend/__init__.py index e2daf25..84568fb 100644 --- a/src/submissions/frontend/__init__.py +++ b/src/submissions/frontend/__init__.py @@ -64,6 +64,7 @@ class App(QMainWindow): fileMenu = menuBar.addMenu("&File") # Creating menus using a title editMenu = menuBar.addMenu("&Edit") + methodsMenu = menuBar.addMenu("&Methods") reportMenu = menuBar.addMenu("&Reports") maintenanceMenu = menuBar.addMenu("&Monthly") helpMenu = menuBar.addMenu("&Help") @@ -71,6 +72,7 @@ class App(QMainWindow): helpMenu.addAction(self.docsAction) fileMenu.addAction(self.importAction) fileMenu.addAction(self.importPCRAction) + methodsMenu.addAction(self.constructFS) reportMenu.addAction(self.generateReportAction) maintenanceMenu.addAction(self.joinControlsAction) maintenanceMenu.addAction(self.joinExtractionAction) @@ -103,6 +105,7 @@ class App(QMainWindow): self.joinPCRAction = QAction("Link PCR Logs") self.helpAction = QAction("&About", self) self.docsAction = QAction("&Docs", self) + self.constructFS = QAction("Make First Strand", self) def _connectActions(self): @@ -125,6 +128,7 @@ class App(QMainWindow): self.joinPCRAction.triggered.connect(self.linkPCR) self.helpAction.triggered.connect(self.showAbout) self.docsAction.triggered.connect(self.openDocs) + self.constructFS.triggered.connect(self.construct_first_strand) def showAbout(self): """ @@ -290,6 +294,14 @@ class App(QMainWindow): self, result = import_pcr_results_function(self) self.result_reporter(result) + def construct_first_strand(self): + """ + Converts first strand excel sheet to Biomek CSV + """ + from .main_window_functions import construct_first_strand_function + self, result = construct_first_strand_function(self) + self.result_reporter(result) + class AddSubForm(QWidget): def __init__(self, parent): diff --git a/src/submissions/frontend/custom_widgets/misc.py b/src/submissions/frontend/custom_widgets/misc.py index ad00683..964b660 100644 --- a/src/submissions/frontend/custom_widgets/misc.py +++ b/src/submissions/frontend/custom_widgets/misc.py @@ -31,7 +31,7 @@ class AddReagentForm(QDialog): super().__init__() self.ctx = ctx if reagent_lot == None: - reagent_lot = "" + reagent_lot = reagent_type self.setWindowTitle("Add Reagent") @@ -257,7 +257,7 @@ class ControlsDatePicker(QWidget): class ImportReagent(QComboBox): - def __init__(self, ctx:dict, reagent:PydReagent): + def __init__(self, ctx:dict, reagent:PydReagent, extraction_kit:str): super().__init__() self.setEditable(True) # Ensure that all reagenttypes have a name that matches the items in the excel parser @@ -289,7 +289,7 @@ class ImportReagent(QComboBox): relevant_reagents.insert(0, str(reagent.lot)) else: # TODO: look up the last used reagent of this type in the database - looked_up_reg = lookup_last_used_reagenttype_lot(ctx=ctx, type_name=reagent.type) + looked_up_reg = lookup_last_used_reagenttype_lot(ctx=ctx, type_name=reagent.type, extraction_kit=extraction_kit) logger.debug(f"Because there was no reagent listed for {reagent}, we will insert the last lot used: {looked_up_reg}") if looked_up_reg != None: relevant_reagents.remove(str(looked_up_reg.lot)) diff --git a/src/submissions/frontend/main_window_functions.py b/src/submissions/frontend/main_window_functions.py index 96f4ef1..4ea0d9b 100644 --- a/src/submissions/frontend/main_window_functions.py +++ b/src/submissions/frontend/main_window_functions.py @@ -26,12 +26,13 @@ from backend.db.functions import ( construct_submission_info, lookup_reagent, store_submission, lookup_submissions_by_date_range, create_kit_from_yaml, create_org_from_yaml, get_control_subtypes, get_all_controls_by_type, lookup_all_submissions_by_type, get_all_controls, lookup_submission_by_rsl_num, update_subsampassoc_with_pcr, - check_kit_integrity + check_kit_integrity, lookup_sub_samp_association_by_plate_sample, lookup_ww_sample_by_processing_number, + lookup_sample_by_submitter_id, update_last_used ) -from backend.excel.parser import SheetParser, PCRParser +from backend.excel.parser import SheetParser, PCRParser, SampleParser from backend.excel.reports import make_report_html, make_report_xlsx, convert_data_list_to_df from backend.pydant import PydReagent -from tools import check_not_nan +from tools import check_not_nan, convert_well_to_row_column from .custom_widgets.pop_ups import AlertPop, QuestionAsker from .custom_widgets import ReportDatePicker from .custom_widgets.misc import ImportReagent, ParsedQLabel @@ -182,7 +183,7 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None] # reg_label.setObjectName(f"lot_{reagent['type']}_label") reg_label.setObjectName(f"lot_{reagent['value'].type}_label") # create reagent choice widget - add_widget = ImportReagent(ctx=obj.ctx, reagent=reagent['value']) + add_widget = ImportReagent(ctx=obj.ctx, reagent=reagent['value'], extraction_kit=pyd.extraction_kit['value']) add_widget.setObjectName(f"lot_{reagent['value'].type}") logger.debug(f"Widget name set to: {add_widget.objectName()}") obj.table_widget.formlayout.addWidget(reg_label) @@ -277,7 +278,7 @@ def kit_integrity_completion_function(obj:QMainWindow) -> Tuple[QMainWindow, dic for item in obj.missing_reagents: obj.table_widget.formlayout.addWidget(ParsedQLabel({'parsed':False}, item.type, title=False)) reagent = dict(type=item.type, lot=None, exp=date.today(), name=None) - add_widget = ImportReagent(ctx=obj.ctx, reagent=PydReagent(**reagent))#item=item) + add_widget = ImportReagent(ctx=obj.ctx, reagent=PydReagent(**reagent), extraction_kit=obj.ext_kit)#item=item) obj.table_widget.formlayout.addWidget(add_widget) submit_btn = QPushButton("Submit") submit_btn.setObjectName("lot_submit_btn") @@ -310,7 +311,6 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: # Lookup any existing reagent of this type with this lot number wanted_reagent = lookup_reagent(ctx=obj.ctx, reagent_lot=reagents[reagent], type_name=reagent) logger.debug(f"Looked up reagent: {wanted_reagent}") - # logger.debug(f"\n\nLooking for {reagent} in {obj.reagents}\n\n") # if reagent not found offer to add to database if wanted_reagent == None: r_lot = reagents[reagent] @@ -328,15 +328,11 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: else: # In this case we will have an empty reagent and the submission will fail kit integrity check logger.debug("Will not add reagent.") - # obj.ctx.database_session.rollback() return obj, dict(message="Failed integrity check", status="critical") - # if wanted_reagent != None: parsed_reagents.append(wanted_reagent) - wanted_reagent.type.last_used = reagents[reagent] # move samples into preliminary submission dict info['samples'] = obj.samples info['uploaded_by'] = getuser() - # info['columns'] = obj.column_count # construct submission object logger.debug(f"Here is the info_dict: {pprint.pformat(info)}") base_submission, result = construct_submission_info(ctx=obj.ctx, info_dict=info) @@ -359,6 +355,7 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: # add reagents to submission object for reagent in parsed_reagents: base_submission.reagents.append(reagent) + update_last_used(ctx=obj.ctx, reagent=reagent, kit=base_submission.extraction_kit) logger.debug(f"Parsed reagents: {pprint.pformat(parsed_reagents)}") logger.debug("Checking kit integrity...") kit_integrity = check_kit_integrity(base_submission) @@ -377,12 +374,8 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: logger.debug(f"We have blank reagents in the excel sheet.\n\tLet's try to fill them in.") extraction_kit = lookup_kittype_by_name(obj.ctx, name=obj.ext_kit) logger.debug(f"We have the extraction kit: {extraction_kit.name}") - - # TODO replace below with function in KitType object. Update Kittype associations. - # excel_map = extraction_kit.used_for[obj.current_submission_type.replace('_', ' ')] excel_map = extraction_kit.construct_xl_map_for_use(obj.current_submission_type) logger.debug(f"Extraction kit map:\n\n{pprint.pformat(excel_map)}") - # excel_map.update(extraction_kit.used_for[obj.current_submission_type.replace('_', ' ').title()]) input_reagents = [item.to_reagent_dict(extraction_kit=base_submission.extraction_kit) for item in parsed_reagents] logger.debug(f"Parsed reagents going into autofile: {pprint.pformat(input_reagents)}") autofill_excel(obj=obj, xl_map=excel_map, reagents=input_reagents, missing_reagents=obj.missing_reagents, info=info, missing_info=obj.missing_info) @@ -881,17 +874,8 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re # pare down reagents to only what's missing logger.debug(f"Checking {[item['type'] for item in reagents]} against {[reagent.type for reagent in missing_reagents]}") relevant_reagents = [item for item in reagents if item['type'] in [reagent.type for reagent in missing_reagents]] - # relevant_reagents = [] - # for item in reagents: - # logger.debug(f"Checking {item['type']} in {[reagent.type for reagent in missing_reagents]}") - # if item['type'] in [reagent.type for reagent in missing_reagents]: - # logger.debug("Hit!") - # relevant_reagents.append(item) - # else: - # logger.debug('Miss.') logger.debug(f"Here are the relevant reagents: {pprint.pformat(relevant_reagents)}") # hacky manipulation of submission type so it looks better. - # info['submission_type'] = info['submission_type'].replace("_", " ").title() # pare down info to just what's missing relevant_info_map = {k:v for k,v in xl_map['info'].items() if k in missing_info and k != 'samples'} relevant_info = {k:v for k,v in info.items() if k in missing_info} @@ -910,9 +894,9 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re # name is only present for Bacterial Culture try: new_reagent['name'] = relevant_reagent_map[new_reagent['type']]['name'] - new_reagent['name']['value'] = reagent['type'] - except: - pass + new_reagent['name']['value'] = reagent['name'] + except Exception as e: + logger.error(f"Couldn't get name due to {e}") new_reagents.append(new_reagent) # construct new info objects to put into excel sheets new_info = [] @@ -936,21 +920,98 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re # Get relevant reagents for that sheet sheet_reagents = [item for item in new_reagents if sheet in item['sheet']] for reagent in sheet_reagents: - logger.debug(f"Attempting: {reagent['type']}:") + logger.debug(f"Attempting to write lot {reagent['lot']['value']} in: row {reagent['lot']['row']}, column {reagent['lot']['column']}") worksheet.cell(row=reagent['lot']['row'], column=reagent['lot']['column'], value=reagent['lot']['value']) + logger.debug(f"Attempting to write expiry {reagent['expiry']['value']} in: row {reagent['expiry']['row']}, column {reagent['expiry']['column']}") worksheet.cell(row=reagent['expiry']['row'], column=reagent['expiry']['column'], value=reagent['expiry']['value']) try: - worksheet.cell(row=reagent['name']['row'], column=reagent['name']['column'], value=reagent['name']['value'].replace("_", " ").upper()) - except: - pass + logger.debug(f"Attempting to write name {reagent['name']['value']} in: row {reagent['name']['row']}, column {reagent['name']['column']}") + worksheet.cell(row=reagent['name']['row'], column=reagent['name']['column'], value=reagent['name']['value']) + except Exception as e: + logger.error(f"Could not write name {reagent['name']['value']} due to {e}") # Get relevant info for that sheet sheet_info = [item for item in new_info if sheet in item['location']['sheets']] for item in sheet_info: logger.debug(f"Attempting: {item['type']}") worksheet.cell(row=item['location']['row'], column=item['location']['column'], value=item['value']) - # Hacky way to + # Hacky way to pop in 'signed by' if info['submission_type'] == "Bacterial Culture": workbook["Sample List"].cell(row=14, column=2, value=getuser()[0:2].upper()) fname = select_save_file(obj=obj, default_name=info['rsl_plate_num'], extension="xlsx") workbook.save(filename=fname.__str__()) +def construct_first_strand_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]: + + def get_plates(input_sample_number:str, plates:list) -> Tuple[int, str]: + logger.debug(f"Looking up {input_sample_number} in {plates}") + samp = lookup_ww_sample_by_processing_number(ctx=obj.ctx, processing_number=input_sample_number) + if samp == None: + samp = lookup_sample_by_submitter_id(ctx=obj.ctx, submitter_id=input_sample_number) + logger.debug(f"Got sample: {samp}") + # if samp != None: + new_plates = [(iii+1, lookup_sub_samp_association_by_plate_sample(ctx=obj.ctx, rsl_sample_num=samp, rsl_plate_num=lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=plate))) for iii, plate in enumerate(plates)] + # for iii, plate in enumerate(plates): + # lplate = lookup_submission_by_rsl_num(ctx=obj.ctx, rsl_num=plate) + # if lplate == None: + # continue + # else: + # logger.debug(f"Got a plate: {lplate}") + # new_plates.append((iii, lookup_sub_samp_association_by_plate_sample(ctx=obj.ctx, rsl_sample_num=samp, rsl_plate_num=lplate))) + logger.debug(f"Associations: {pprint.pformat(new_plates)}") + try: + plate_num, plate = next(assoc for assoc in new_plates if assoc[1] is not None) + except StopIteration: + plate_num, plate = None, None + logger.debug(f"Plate number {plate_num} is {plate}") + return plate_num, plate + + + fname = select_open_file(obj=obj, file_extension="xlsx") + xl = pd.ExcelFile(fname) + sprsr = SampleParser(ctx=obj.ctx, xl=xl, submission_type="First Strand") + _, samples = sprsr.parse_samples(generate=False) + plates = sprsr.grab_plates() + output_samples = [] + logger.debug(f"Samples: {pprint.pformat(samples)}") + old_plate_number = 1 + for item in samples: + new_dict = {} + new_dict['sample'] = item['submitter_id'] + if item['submitter_id'] == "NTC1": + new_dict['destination_row'] = 8 + new_dict['destination_column'] = 2 + new_dict['plate_number'] = 'control' + elif item['submitter_id'] == "NTC2": + new_dict['destination_row'] = 8 + new_dict['destination_column'] = 5 + new_dict['plate_number'] = 'control' + else: + new_dict['destination_row'] = item['row'] + new_dict['destination_column'] = item['column'] + # assocs = [(iii, lookup_ww_sample_by_processing_number_and_plate(ctx=obj.ctx, processing_number=new_dict['sample'], plate_number=plate)) for iii, plate in enumerate(plates)] + plate_num, plate = get_plates(input_sample_number=new_dict['sample'], plates=plates) + if plate_num == None: + plate_num = str(old_plate_number) + "*" + else: + old_plate_number = plate_num + logger.debug(f"Got plate number: {plate_num}, plate: {plate}") + if plate == None: + try: + new_dict['source_row'], new_dict['source_column'] = convert_well_to_row_column(item['well']) + new_dict['plate_number'] = plate_num + except KeyError: + pass + else: + new_dict['plate_number'] = plate_num + new_dict['plate'] = plate.submission.rsl_plate_num + new_dict['source_row'] = plate.row + new_dict['source_column'] = plate.column + output_samples.append(new_dict) + df = pd.DataFrame.from_records(output_samples) + df.sort_values(by=['destination_column', 'destination_row'], ascending=True, inplace=True) + columnsTitles = ['sample', 'destination_column', 'destination_row', 'plate_number', 'plate', "source_column", 'source_row'] + df = df.reindex(columns=columnsTitles) + ofname = select_save_file(obj=obj, default_name=f"First Strand {date.today()}", extension="csv") + df.to_csv(ofname, index=False) + return obj, None + diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py index 42d4e0a..ee468c2 100644 --- a/src/submissions/tools/__init__.py +++ b/src/submissions/tools/__init__.py @@ -621,4 +621,21 @@ def check_if_app(ctx:Settings=None) -> bool: return True else: return False - \ No newline at end of file + +def convert_well_to_row_column(input_str:str) -> Tuple[int, int]: + """ + Converts typical alphanumeric (i.e. "A2") to row, column + + Args: + input_str (str): Input string. Ex. "A2" + + Returns: + Tuple[int, int]: row, column + """ + row_keys = dict(A=1, B=2, C=3, D=4, E=5, F=6, G=7, H=8) + try: + row = int(row_keys[input_str[0].upper()]) + column = int(input_str[1:]) + except IndexError: + return None, None + return row, column \ No newline at end of file