Autofill excel sheet

This commit is contained in:
Landon Wark
2023-07-21 13:55:15 -05:00
parent ba35696055
commit f6e270f52d
10 changed files with 189 additions and 67 deletions

View File

@@ -1,3 +1,8 @@
## 202307.03
- Auto-filling of some empty cells in excel file.
- Better pydantic validations of missing data.
## 202307.02 ## 202307.02
- Better column counting for cost recovery purposes. - Better column counting for cost recovery purposes.

View File

@@ -1,3 +1,4 @@
- [x] Put in SN controls I guess.
- [x] Code clean-up and refactor (2023-07). - [x] Code clean-up and refactor (2023-07).
- [ ] Migrate context settings to pydantic-settings model. - [ ] Migrate context settings to pydantic-settings model.
- [x] Insert column into reagent type to indicate if reagent is required for kit. - [x] Insert column into reagent type to indicate if reagent is required for kit.

View File

@@ -4,7 +4,7 @@ from pathlib import Path
# Version of the realpython-reader package # Version of the realpython-reader package
__project__ = "submissions" __project__ = "submissions"
__version__ = "202307.2b" __version__ = "202307.3b"
__author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"} __author__ = {"name":"Landon Wark", "email":"Landon.Wark@phac-aspc.gc.ca"}
__copyright__ = "2022-2023, Government of Canada" __copyright__ = "2022-2023, Government of Canada"

View File

@@ -45,7 +45,7 @@ def store_submission(ctx:dict, base_submission:models.BasicSubmission) -> None|d
from tools import RSLNamer from tools import RSLNamer
logger.debug(f"Hello from store_submission") logger.debug(f"Hello from store_submission")
# Add all samples to sample table # Add all samples to sample table
typer = RSLNamer(base_submission.rsl_plate_num) typer = RSLNamer(ctx=ctx, instr=base_submission.rsl_plate_num)
base_submission.rsl_plate_num = typer.parsed_name base_submission.rsl_plate_num = typer.parsed_name
for sample in base_submission.samples: for sample in base_submission.samples:
logger.debug(f"Typer: {typer.submission_type}") logger.debug(f"Typer: {typer.submission_type}")
@@ -116,7 +116,7 @@ def construct_submission_info(ctx:dict, info_dict:dict) -> models.BasicSubmissio
return instance, {'code': 2, 'message': "A proper RSL plate number is required."} return instance, {'code': 2, 'message': "A proper RSL plate number is required."}
else: else:
# enforce conventions on the rsl plate number from the form # enforce conventions on the rsl plate number from the form
info_dict['rsl_plate_num'] = RSLNamer(info_dict["rsl_plate_num"]).parsed_name info_dict['rsl_plate_num'] = RSLNamer(ctx=ctx, instr=info_dict["rsl_plate_num"]).parsed_name
# check database for existing object # check database for existing object
instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first() instance = ctx['database_session'].query(models.BasicSubmission).filter(models.BasicSubmission.rsl_plate_num==info_dict['rsl_plate_num']).first()
# get model based on submission type converted above # get model based on submission type converted above

View File

@@ -38,7 +38,7 @@ class SheetParser(object):
self.filepath = filepath self.filepath = filepath
# Open excel file # Open excel file
try: try:
self.xl = pd.ExcelFile(filepath.__str__()) self.xl = pd.ExcelFile(filepath)
except ValueError as e: except ValueError as e:
logger.error(f"Incorrect value: {e}") logger.error(f"Incorrect value: {e}")
self.xl = None self.xl = None
@@ -95,7 +95,7 @@ class SheetParser(object):
submission_info = self.xl.parse(sheet_name=sheet_name, dtype=object) submission_info = self.xl.parse(sheet_name=sheet_name, dtype=object)
self.sub['submitter_plate_num'] = submission_info.iloc[0][1] self.sub['submitter_plate_num'] = submission_info.iloc[0][1]
if check_not_nan(submission_info.iloc[10][1]): if check_not_nan(submission_info.iloc[10][1]):
self.sub['rsl_plate_num'] = RSLNamer(submission_info.iloc[10][1]).parsed_name self.sub['rsl_plate_num'] = RSLNamer(ctx=self.ctx, instr=submission_info.iloc[10][1]).parsed_name
else: else:
# self.sub['rsl_plate_num'] = RSLNamer(self.filepath).parsed_name # self.sub['rsl_plate_num'] = RSLNamer(self.filepath).parsed_name
self.sub['rsl_plate_num'] = None self.sub['rsl_plate_num'] = None
@@ -103,6 +103,10 @@ class SheetParser(object):
self.sub['submitting_lab'] = submission_info.iloc[0][3] self.sub['submitting_lab'] = submission_info.iloc[0][3]
self.sub['sample_count'] = submission_info.iloc[2][3] self.sub['sample_count'] = submission_info.iloc[2][3]
self.sub['extraction_kit'] = submission_info.iloc[3][3] self.sub['extraction_kit'] = submission_info.iloc[3][3]
if check_not_nan(submission_info.iloc[1][3]):
self.sub['submission_type'] = dict(value=submission_info.iloc[1][3], parsed=True)
else:
self.sub['submission_type'] = dict(value=self.sub['submission_type'], parsed=False)
return submission_info return submission_info
def parse_bacterial_culture(self) -> None: def parse_bacterial_culture(self) -> None:
@@ -170,7 +174,7 @@ class SheetParser(object):
parse_reagents(reagent_range) parse_reagents(reagent_range)
# get individual sample info # get individual sample info
sample_parser = SampleParser(self.ctx, submission_info.iloc[16:112]) sample_parser = SampleParser(self.ctx, submission_info.iloc[16:112])
sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type'].lower()}_samples") sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type']['value'].replace(' ', '_').lower()}_samples")
logger.debug(f"Parser result: {self.sub}") logger.debug(f"Parser result: {self.sub}")
self.sample_result, self.sub['samples'] = sample_parse() self.sample_result, self.sub['samples'] = sample_parse()
@@ -234,14 +238,18 @@ class SheetParser(object):
# set qpcr reagent range # set qpcr reagent range
pcr_reagent_range = qprc_info.iloc[0:5, 9:20] pcr_reagent_range = qprc_info.iloc[0:5, 9:20]
# compile technician info from all sheets # compile technician info from all sheets
self.sub['technician'] = f"Enr: {enrichment_info.columns[2]}, Ext: {extraction_info.columns[2]}, PCR: {qprc_info.columns[2]}" if all(map(check_not_nan, [enrichment_info.columns[2], extraction_info.columns[2], qprc_info.columns[2]])):
parsed = True
else:
parsed = False
self.sub['technician'] = dict(value=f"Enr: {enrichment_info.columns[2]}, Ext: {extraction_info.columns[2]}, PCR: {qprc_info.columns[2]}", parsed=parsed)
self.sub['reagents'] = [] self.sub['reagents'] = []
parse_reagents(enr_reagent_range) parse_reagents(enr_reagent_range)
parse_reagents(ext_reagent_range) parse_reagents(ext_reagent_range)
parse_reagents(pcr_reagent_range) parse_reagents(pcr_reagent_range)
# parse samples # parse samples
sample_parser = SampleParser(self.ctx, submission_info.iloc[16:], elution_map=retrieve_elution_map()) sample_parser = SampleParser(self.ctx, submission_info.iloc[16:], elution_map=retrieve_elution_map())
sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type'].lower()}_samples") sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type']['value'].lower()}_samples")
self.sample_result, self.sub['samples'] = sample_parse() self.sample_result, self.sub['samples'] = sample_parse()
self.sub['csv'] = self.xl.parse("Copy to import file", dtype=object) self.sub['csv'] = self.xl.parse("Copy to import file", dtype=object)
@@ -249,6 +257,7 @@ class SheetParser(object):
""" """
pulls info specific to wastewater_arctic submission type pulls info specific to wastewater_arctic submission type
""" """
self.sub['submission_type'] = dict(value=self.sub['submission_type'], parsed=True)
def parse_reagents(df:pd.DataFrame): def parse_reagents(df:pd.DataFrame):
logger.debug(df) logger.debug(df)
for ii, row in df.iterrows(): for ii, row in df.iterrows():
@@ -306,7 +315,7 @@ class SheetParser(object):
sub_reagent_range = submission_info.iloc[56:, 1:4].dropna(how='all') sub_reagent_range = submission_info.iloc[56:, 1:4].dropna(how='all')
biomek_reagent_range = biomek_info.iloc[60:, 0:3].dropna(how='all') biomek_reagent_range = biomek_info.iloc[60:, 0:3].dropna(how='all')
self.sub['submitter_plate_num'] = "" self.sub['submitter_plate_num'] = ""
self.sub['rsl_plate_num'] = RSLNamer(self.filepath.__str__()).parsed_name self.sub['rsl_plate_num'] = RSLNamer(ctx=self.ctx, instr=self.filepath.__str__()).parsed_name
self.sub['submitted_date'] = biomek_info.iloc[1][1] self.sub['submitted_date'] = biomek_info.iloc[1][1]
self.sub['submitting_lab'] = "Enterics Wastewater Genomics" self.sub['submitting_lab'] = "Enterics Wastewater Genomics"
self.sub['sample_count'] = submission_info.iloc[4][6] self.sub['sample_count'] = submission_info.iloc[4][6]
@@ -317,7 +326,7 @@ class SheetParser(object):
parse_reagents(biomek_reagent_range) parse_reagents(biomek_reagent_range)
samples = massage_samples(biomek_info.iloc[22:31, 0:]) samples = massage_samples(biomek_info.iloc[22:31, 0:])
sample_parser = SampleParser(self.ctx, pd.DataFrame.from_records(samples)) sample_parser = SampleParser(self.ctx, pd.DataFrame.from_records(samples))
sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type'].lower()}_samples") sample_parse = getattr(sample_parser, f"parse_{self.sub['submission_type']['value'].lower()}_samples")
self.sample_result, self.sub['samples'] = sample_parse() self.sample_result, self.sub['samples'] = sample_parse()
def to_pydantic(self) -> PydSubmission: def to_pydantic(self) -> PydSubmission:
@@ -497,7 +506,7 @@ class PCRParser(object):
return return
# self.pcr = OrderedDict() # self.pcr = OrderedDict()
self.pcr = {} self.pcr = {}
namer = RSLNamer(filepath.__str__()) namer = RSLNamer(ctx=self.ctx, instr=filepath.__str__())
self.plate_num = namer.parsed_name self.plate_num = namer.parsed_name
self.submission_type = namer.submission_type self.submission_type = namer.submission_type
logger.debug(f"Set plate number to {self.plate_num} and type to {self.submission_type}") logger.debug(f"Set plate number to {self.plate_num} and type to {self.submission_type}")

View File

@@ -47,14 +47,14 @@ class PydReagent(BaseModel):
class PydSubmission(BaseModel, extra=Extra.allow): class PydSubmission(BaseModel, extra=Extra.allow):
ctx: dict ctx: dict
filepath: Path filepath: Path
submission_type: str submission_type: str|dict|None
submitter_plate_num: str|None submitter_plate_num: str|None
rsl_plate_num: str|dict|None rsl_plate_num: str|dict|None
submitted_date: date submitted_date: date
submitting_lab: str|None submitting_lab: str|None
sample_count: int sample_count: int
extraction_kit: str|dict|None extraction_kit: str|dict|None
technician: str|None technician: str|dict|None
reagents: List[PydReagent] = [] reagents: List[PydReagent] = []
samples: List[Any] samples: List[Any]
# missing_fields: List[str] = [] # missing_fields: List[str] = []
@@ -91,14 +91,20 @@ class PydSubmission(BaseModel, extra=Extra.allow):
else: else:
return value return value
else: else:
logger.debug(f"Pydant values:{type(values)}\n{values}") # logger.debug(f"Pydant values:{type(values)}\n{values}")
return dict(value=RSLNamer(values.data['filepath'].__str__()).parsed_name, parsed=False) return dict(value=RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__()).parsed_name, parsed=False)
@field_validator("technician") @field_validator("technician", mode="before")
@classmethod @classmethod
def enforce_tech(cls, value): def enforce_tech(cls, value):
if value == "nan" or value == "None": if check_not_nan(value):
value = "Unknown" if isinstance(value, dict):
value['value'] = re.sub(r"\: \d", "", value['value'])
return value
else:
return dict(value=re.sub(r"\: \d", "", value), parsed=True)
else:
return dict(value="Unnamed", parsed=False)
return value return value
@field_validator("reagents") @field_validator("reagents")
@@ -140,6 +146,19 @@ class PydSubmission(BaseModel, extra=Extra.allow):
return dict(value=dlg.getValues(), parsed=False) return dict(value=dlg.getValues(), parsed=False)
else: else:
raise ValueError("Extraction kit needed.") raise ValueError("Extraction kit needed.")
@field_validator("submission_type", mode='before')
@classmethod
def make_submission_type(cls, value, values):
if check_not_nan(value):
if isinstance(value, dict):
value['value'] = value['value'].title()
return value
elif isinstance(value, str):
return dict(value=value.title(), parsed=False)
else:
return dict(value=RSLNamer(ctx=values.data['ctx'], instr=values.data['filepath'].__str__()).submission_type.title(), parsed=False)
# @model_validator(mode="after") # @model_validator(mode="after")
# def ensure_kit(cls, values): # def ensure_kit(cls, values):

View File

@@ -35,7 +35,10 @@ def select_save_file(obj:QMainWindow, default_name:str, extension:str) -> Path:
Returns: Returns:
Path: Path of file to be opened Path: Path of file to be opened
""" """
home_dir = Path(obj.ctx["directory_path"]).joinpath(default_name).resolve().__str__() try:
home_dir = Path(obj.ctx["directory_path"]).joinpath(default_name).resolve().__str__()
except FileNotFoundError:
home_dir = Path.home().resolve().__str__()
fname = Path(QFileDialog.getSaveFileName(obj, "Save File", home_dir, filter = f"{extension}(*.{extension})")[0]) fname = Path(QFileDialog.getSaveFileName(obj, "Save File", home_dir, filter = f"{extension}(*.{extension})")[0])
return fname return fname

View File

@@ -314,7 +314,12 @@ class SubmissionDetails(QDialog):
self.base_dict['platemap'] = base64.b64encode(image_io.getvalue()).decode('utf-8') self.base_dict['platemap'] = base64.b64encode(image_io.getvalue()).decode('utf-8')
logger.debug(self.base_dict) logger.debug(self.base_dict)
html = template.render(sub=self.base_dict) html = template.render(sub=self.base_dict)
home_dir = Path(self.ctx["directory_path"]).joinpath(f"Submission_Details_{self.base_dict['Plate Number']}.pdf").resolve().__str__() with open("test.html", "w") as f:
f.write(html)
try:
home_dir = Path(self.ctx["directory_path"]).joinpath(f"Submission_Details_{self.base_dict['Plate Number']}.pdf").resolve().__str__()
except FileNotFoundError:
home_dir = Path.home().resolve().__str__()
fname = Path(QFileDialog.getSaveFileName(self, "Save File", home_dir, filter=".pdf")[0]) fname = Path(QFileDialog.getSaveFileName(self, "Save File", home_dir, filter=".pdf")[0])
if fname.__str__() == ".": if fname.__str__() == ".":
logger.debug("Saving pdf was cancelled.") logger.debug("Saving pdf was cancelled.")

View File

@@ -87,7 +87,7 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
# logger.debug(f"Sample from import: {sample.elution_well}") # logger.debug(f"Sample from import: {sample.elution_well}")
# I don't remember why this is here. # I don't remember why this is here.
missing_info = [k for k,v in pyd if v == None] missing_info = [k for k,v in pyd if v == None]
obj.current_submission_type = pyd.submission_type obj.current_submission_type = pyd.submission_type['value']
# destroy any widgets from previous imports # destroy any widgets from previous imports
for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget): for item in obj.table_widget.formlayout.parentWidget().findChildren(QWidget):
item.setParent(None) item.setParent(None)
@@ -98,9 +98,12 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
for field in fields: for field in fields:
value = getattr(pyd, field) value = getattr(pyd, field)
logger.debug(f"Checking: {field}: {value}") logger.debug(f"Checking: {field}: {value}")
# No longer necessary with addition of pydantic validations # Get from pydantic model whether field was completed in the form
# if not check_not_nan(value): if isinstance(value, dict) and field != 'ctx':
# continue logger.debug(f"The field {field} is a dictionary: {value}")
if not value['parsed']:
missing_info.append(field)
value = value['value']
match field: match field:
case 'submitting_lab': case 'submitting_lab':
# create label # create label
@@ -120,19 +123,18 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
# create label # create label
label = QLabel(field.replace("_", " ").title()) label = QLabel(field.replace("_", " ").title())
# if extraction kit not available, all other values fail # if extraction kit not available, all other values fail
if not check_not_nan(value['value']): if not check_not_nan(value):
msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning") msg = AlertPop(message="Make sure to check your extraction kit in the excel sheet!", status="warning")
msg.exec() msg.exec()
if not value['parsed']:
missing_info.append(field)
# create combobox to hold looked up kits # create combobox to hold looked up kits
add_widget = QComboBox() add_widget = QComboBox()
# lookup existing kits by 'submission_type' decided on by sheetparser # lookup existing kits by 'submission_type' decided on by sheetparser
uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=pyd.submission_type)] uses = [item.__str__() for item in lookup_kittype_by_use(ctx=obj.ctx, used_by=pyd.submission_type['value'].lower())]
logger.debug(f"Kits received for {pyd.submission_type}: {uses}")
if check_not_nan(value): if check_not_nan(value):
logger.debug(f"The extraction kit in parser was: {value['value']}") logger.debug(f"The extraction kit in parser was: {value}")
uses.insert(0, uses.pop(uses.index(value['value']))) uses.insert(0, uses.pop(uses.index(value)))
obj.ext_kit = value['value'] obj.ext_kit = value
else: else:
logger.error(f"Couldn't find {prsr.sub['extraction_kit']}") logger.error(f"Couldn't find {prsr.sub['extraction_kit']}")
obj.ext_kit = uses[0] obj.ext_kit = uses[0]
@@ -173,13 +175,11 @@ def import_submission_function(obj:QMainWindow) -> Tuple[QMainWindow, dict|None]
obj.table_widget.formlayout.addWidget(add_widget) obj.table_widget.formlayout.addWidget(add_widget)
obj.reagents[reagent.type] = reagent obj.reagents[reagent.type] = reagent
continue continue
case "rsl_plate_num": # case "rsl_plate_num":
if not value['parsed']: # label = QLabel(field.replace("_", " ").title())
missing_info.append(field) # add_widget = QLineEdit()
label = QLabel(field.replace("_", " ").title()) # logger.debug(f"Setting widget text to {str(value['value']).replace('_', ' ')}")
add_widget = QLineEdit() # add_widget.setText(str(value['value']).replace("_", " "))
logger.debug(f"Setting widget text to {str(value['value']).replace('_', ' ')}")
add_widget.setText(str(value['value']).replace("_", " "))
case _: case _:
# anything else gets added in as a line edit # anything else gets added in as a line edit
label = QLabel(field.replace("_", " ").title()) label = QLabel(field.replace("_", " ").title())
@@ -352,8 +352,8 @@ def submit_new_sample_function(obj:QMainWindow) -> Tuple[QMainWindow, dict]:
logger.debug(f"We have blank reagents in the excel sheet.\n\tLet's try to fill them in.") logger.debug(f"We have blank reagents in the excel sheet.\n\tLet's try to fill them in.")
extraction_kit = lookup_kittype_by_name(obj.ctx, name=obj.ext_kit) extraction_kit = lookup_kittype_by_name(obj.ctx, name=obj.ext_kit)
logger.debug(f"We have the extraction kit: {extraction_kit.name}") logger.debug(f"We have the extraction kit: {extraction_kit.name}")
logger.debug(f"Extraction kit map:\n\n{extraction_kit.used_for[obj.current_submission_type]}") logger.debug(f"Extraction kit map:\n\n{extraction_kit.used_for[obj.current_submission_type.replace('_', ' ')]}")
excel_map = extraction_kit.used_for[obj.current_submission_type] excel_map = extraction_kit.used_for[obj.current_submission_type.replace('_', ' ')]
input_reagents = [item.to_reagent_dict() for item in parsed_reagents] input_reagents = [item.to_reagent_dict() for item in parsed_reagents]
autofill_excel(obj=obj, xl_map=excel_map, reagents=input_reagents, missing_reagents=obj.missing_reagents, info=info) autofill_excel(obj=obj, xl_map=excel_map, reagents=input_reagents, missing_reagents=obj.missing_reagents, info=info)
if hasattr(obj, 'csv'): if hasattr(obj, 'csv'):
@@ -823,6 +823,7 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re
xl_map (dict): Map of where each reagent goes in the excel workbook. xl_map (dict): Map of where each reagent goes in the excel workbook.
reagents (List[dict]): All reagents used in the kit. reagents (List[dict]): All reagents used in the kit.
missing_reagents (List[str]): Reagents that are required for the kit that were not present. missing_reagents (List[str]): Reagents that are required for the kit that were not present.
info (dict): Dictionary of misc info from submission
""" """
# logger.debug(reagents) # logger.debug(reagents)
@@ -831,6 +832,7 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re
relevant_map = {k:v for k,v in xl_map.items() if k in missing_reagents} relevant_map = {k:v for k,v in xl_map.items() if k in missing_reagents}
# logger.debug(relevant_map) # logger.debug(relevant_map)
relevant_reagents = [item for item in reagents if item['type'] in missing_reagents] relevant_reagents = [item for item in reagents if item['type'] in missing_reagents]
info['submission_type'] = info['submission_type'].replace("_", " ").title()
relevant_info = {k:v for k,v in info.items() if k in missing_reagents} relevant_info = {k:v for k,v in info.items() if k in missing_reagents}
logger.debug(f"Here is the relevant info: {pprint.pformat(relevant_info)}") logger.debug(f"Here is the relevant info: {pprint.pformat(relevant_info)}")
# logger.debug(f"Relevant reagents:\n{relevant_reagents}") # logger.debug(f"Relevant reagents:\n{relevant_reagents}")
@@ -844,6 +846,11 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re
new_reagent['expiry'] = relevant_map[new_reagent['type']]['expiry'] new_reagent['expiry'] = relevant_map[new_reagent['type']]['expiry']
new_reagent['expiry']['value'] = reagent['expiry'] new_reagent['expiry']['value'] = reagent['expiry']
new_reagent['sheet'] = relevant_map[new_reagent['type']]['sheet'] new_reagent['sheet'] = relevant_map[new_reagent['type']]['sheet']
try:
new_reagent['name'] = relevant_map[new_reagent['type']]['name']
new_reagent['name']['value'] = reagent['type']
except:
pass
new_reagents.append(new_reagent) new_reagents.append(new_reagent)
new_info = [] new_info = []
for item in relevant_info: for item in relevant_info:
@@ -858,14 +865,20 @@ def autofill_excel(obj:QMainWindow, xl_map:dict, reagents:List[dict], missing_re
logger.debug(workbook.sheetnames) logger.debug(workbook.sheetnames)
for sheet in sheets: for sheet in sheets:
worksheet=workbook[sheet] worksheet=workbook[sheet]
sheet_reagents = [item for item in new_reagents if item['sheet'] == sheet] sheet_reagents = [item for item in new_reagents if sheet in item['sheet']]
for reagent in sheet_reagents: for reagent in sheet_reagents:
logger.debug(f"Attempting: {reagent['type']}:") logger.debug(f"Attempting: {reagent['type']}:")
worksheet.cell(row=reagent['lot']['row'], column=reagent['lot']['column'], value=reagent['lot']['value']) worksheet.cell(row=reagent['lot']['row'], column=reagent['lot']['column'], value=reagent['lot']['value'])
worksheet.cell(row=reagent['expiry']['row'], column=reagent['expiry']['column'], value=reagent['expiry']['value']) worksheet.cell(row=reagent['expiry']['row'], column=reagent['expiry']['column'], value=reagent['expiry']['value'])
sheet_info = [item for item in new_info if item['location']['sheet'] == sheet] try:
worksheet.cell(row=reagent['name']['row'], column=reagent['name']['column'], value=reagent['name']['value'].replace("_", " ").upper())
except:
pass
sheet_info = [item for item in new_info if sheet in item['location']['sheets']]
for item in sheet_info: for item in sheet_info:
logger.debug(f"Attempting: {item['type']}") logger.debug(f"Attempting: {item['type']}")
worksheet.cell(row=item['location']['row'], column=item['location']['column'], value=item['value']) worksheet.cell(row=item['location']['row'], column=item['location']['column'], value=item['value'])
fname = select_save_file(obj=obj, default_name=Path(obj.xl).stem, extension="xlsx") if info['submission_type'] == "Bacterial Culture":
workbook["Sample List"].cell(row=14, column=2, value=getuser())
fname = select_save_file(obj=obj, default_name=info['rsl_plate_num'], extension="xlsx")
workbook.save(filename=fname.__str__()) workbook.save(filename=fname.__str__())

View File

@@ -10,6 +10,7 @@ import getpass
from backend.db.models import BasicSubmission, KitType from backend.db.models import BasicSubmission, KitType
import pandas as pd import pandas as pd
from typing import Tuple from typing import Tuple
from datetime import datetime
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
@@ -25,13 +26,16 @@ def check_not_nan(cell_contents) -> bool:
""" """
# check for nan as a string first # check for nan as a string first
try: try:
if "Unnamed:" in cell_contents:
cell_contents = np.nan
cell_contents = cell_contents.lower() cell_contents = cell_contents.lower()
except AttributeError: except (TypeError, AttributeError):
pass pass
if cell_contents == 'nan': if cell_contents == 'nan':
cell_contents = np.nan cell_contents = np.nan
if cell_contents == None: if cell_contents == None:
cell_contents = np.nan cell_contents = np.nan
try: try:
if pd.isnull(cell_contents): if pd.isnull(cell_contents):
cell_contents = np.nan cell_contents = np.nan
@@ -197,7 +201,8 @@ class RSLNamer(object):
""" """
Object that will enforce proper formatting on RSL plate names. Object that will enforce proper formatting on RSL plate names.
""" """
def __init__(self, instr:str): def __init__(self, ctx:dict, instr:str):
self.ctx = ctx
self.retrieve_rsl_number(in_str=instr) self.retrieve_rsl_number(in_str=instr)
if self.submission_type != None: if self.submission_type != None:
parser = getattr(self, f"enforce_{self.submission_type}") parser = getattr(self, f"enforce_{self.submission_type}")
@@ -205,7 +210,7 @@ class RSLNamer(object):
self.parsed_name = self.parsed_name.replace("_", "-") self.parsed_name = self.parsed_name.replace("_", "-")
def retrieve_rsl_number(self, in_str:str|Path) -> Tuple[str, str]: def retrieve_rsl_number(self, in_str:str|Path):
""" """
Uses regex to retrieve the plate number and submission type from an input string Uses regex to retrieve the plate number and submission type from an input string
@@ -215,37 +220,57 @@ class RSLNamer(object):
Returns: Returns:
Tuple[str, str]: tuple of (output rsl number, submission_type) Tuple[str, str]: tuple of (output rsl number, submission_type)
""" """
if isinstance(in_str, Path): if not isinstance(in_str, Path):
in_str = in_str.stem in_str = Path(in_str)
logger.debug(f"Attempting split of {in_str}") out_str = in_str.stem
try: # else:
in_str = in_str.split("\\")[-1] # in_str = Path(in_str)
except AttributeError: # logger.debug(f"Attempting split of {in_str}")
self.parsed_name = None # try:
self.submission_type = None # out_str = in_str.split("\\")[-1]
return # except AttributeError:
logger.debug(f"Attempting match of {in_str}") # self.parsed_name = None
logger.debug(f"The initial plate name is: {in_str}") # self.submission_type = None
# return
logger.debug(f"Attempting match of {out_str}")
logger.debug(f"The initial plate name is: {out_str}")
regex = re.compile(r""" regex = re.compile(r"""
# (?P<wastewater>RSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d?((?!\d)|R)?\d(?!\d))?)| # (?P<wastewater>RSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(?:_|-)\d?((?!\d)|R)?\d(?!\d))?)|
(?P<wastewater>RSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)| (?P<wastewater>RSL(?:-|_)?WW(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)|
(?P<bacterial_culture>RSL-?\d{2}-?\d{4})| (?P<bacterial_culture>RSL-?\d{2}-?\d{4})|
(?P<wastewater_artic>(\d{4}-\d{2}-\d{2}_(?:\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?)) (?P<wastewater_artic>(\d{4}-\d{2}-\d{2}_(?:\d_)?artic)|(RSL(?:-|_)?AR(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)\d?(\D|$)R?\d?)?))
""", flags = re.IGNORECASE | re.VERBOSE) """, flags = re.IGNORECASE | re.VERBOSE)
m = regex.search(in_str) m = regex.search(out_str)
try: if m != None:
self.parsed_name = m.group().upper().strip(".") try:
logger.debug(f"Got parsed submission name: {self.parsed_name}") self.parsed_name = m.group().upper().strip(".")
self.submission_type = m.lastgroup logger.debug(f"Got parsed submission name: {self.parsed_name}")
except AttributeError as e: self.submission_type = m.lastgroup
logger.critical("No RSL plate number found or submission type found!") except AttributeError as e:
logger.debug(f"The cause of the above error was: {e}") logger.critical("No RSL plate number found or submission type found!")
logger.debug(f"The cause of the above error was: {e}")
else:
logger.warning(f"We're going to have to create the submission type from the excel sheet properties...")
if in_str.exists():
my_xl = pd.ExcelFile(in_str)
if my_xl.book.properties.category != None:
categories = [item.strip().title() for item in my_xl.book.properties.category.split(";")]
self.submission_type = categories[0].replace(" ", "_").lower()
else:
raise AttributeError(f"File {in_str.__str__()} has no categories.")
else:
raise FileNotFoundError()
def enforce_wastewater(self): def enforce_wastewater(self):
""" """
Uses regex to enforce proper formatting of wastewater samples Uses regex to enforce proper formatting of wastewater samples
""" """
self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name) try:
self.parsed_name = re.sub(r"PCR(-|_)", "", self.parsed_name)
except AttributeError as e:
self.parsed_name = self.construct_wastewater()
self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW") self.parsed_name = self.parsed_name.replace("RSLWW", "RSL-WW")
self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE) self.parsed_name = re.sub(r"WW(\d{4})", r"WW-\1", self.parsed_name, flags=re.IGNORECASE)
self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name) self.parsed_name = re.sub(r"(\d{4})-(\d{2})-(\d{2})", r"\1\2\3", self.parsed_name)
@@ -267,13 +292,55 @@ class RSLNamer(object):
self.parsed_name = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", self.parsed_name).replace(" ", "") self.parsed_name = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", self.parsed_name).replace(" ", "")
def construct_wastewater(self):
today = datetime.now()
return f"RSL-WW-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}"
def enforce_bacterial_culture(self): def enforce_bacterial_culture(self):
""" """
Uses regex to enforce proper formatting of bacterial culture samples Uses regex to enforce proper formatting of bacterial culture samples
""" """
self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE) try:
self.parsed_name = re.sub(r"RSL(\d{2})", r"RSL-\1", self.parsed_name, flags=re.IGNORECASE)
except AttributeError as e:
self.parsed_name = self.construct_bacterial_culture_rsl()
# year = datetime.now().year
# self.parsed_name = f"RSL-{str(year)[-2:]}-0000"
self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE) self.parsed_name = re.sub(r"RSL-(\d{2})(\d{4})", r"RSL-\1-\2", self.parsed_name, flags=re.IGNORECASE)
def construct_bacterial_culture_rsl(self) -> str:
"""
DEPRECIATED due to slowness. Search for the largest rsl number and increment by 1
Returns:
str: new RSL number
"""
logger.debug(f"Attempting to construct RSL number from scratch...")
directory = Path(self.ctx['directory_path']).joinpath("Bacteria")
year = str(datetime.now().year)[-2:]
if directory.exists():
logger.debug(f"Year: {year}")
relevant_rsls = []
all_xlsx = [item.stem for item in directory.rglob("*.xlsx") if bool(re.search(r"RSL-\d{2}-\d{4}", item.stem)) and year in item.stem[4:6]]
logger.debug(f"All rsls: {all_xlsx}")
for item in all_xlsx:
try:
relevant_rsls.append(re.match(r"RSL-\d{2}-\d{4}", item).group(0))
except Exception as e:
logger.error(f"Regex error: {e}")
continue
logger.debug(f"Initial xlsx: {relevant_rsls}")
max_number = max([int(item[-4:]) for item in relevant_rsls])
logger.debug(f"The largest sample number is: {max_number}")
return f"RSL-{year}-{str(max_number+1).zfill(4)}"
else:
# raise FileNotFoundError(f"Unable to locate the directory: {directory.__str__()}")
return f"RSL-{year}-0000"
def enforce_wastewater_artic(self): def enforce_wastewater_artic(self):
""" """
Uses regex to enforce proper formatting of wastewater samples Uses regex to enforce proper formatting of wastewater samples