Removed Kittype.

This commit is contained in:
lwark
2025-08-12 11:14:33 -05:00
parent 087bf9bcb7
commit 6380f1e2a9
22 changed files with 1129 additions and 2231 deletions

View File

@@ -612,8 +612,8 @@ class BaseClass(Base):
relevant = {k: v for k, v in self.__class__.__dict__.items() if
isinstance(v, InstrumentedAttribute) or isinstance(v, AssociationProxy)}
output = OrderedDict()
output['excluded'] = ["excluded", "misc_info", "_misc_info", "id"]
# output = OrderedDict()
output = dict(excluded = ["excluded", "misc_info", "_misc_info", "id"])
for k, v in relevant.items():
try:
check = v.foreign_keys
@@ -625,11 +625,16 @@ class BaseClass(Base):
value = getattr(self, k)
except AttributeError:
continue
# try:
# logger.debug(f"Setting {k} to {value} for details dict.")
# except AttributeError as e:
# logger.error(f"Can't log {k} value due to {type(e)}")
# continue
output[k.strip("_")] = value
if self._misc_info:
for key, value in self._misc_info.items():
# logger.debug(f"Misc info key {key}")
output[key] = value
return output
@classmethod
@@ -750,7 +755,7 @@ class ConfigItem(BaseClass):
from .controls import *
# NOTE: import order must go: orgs, kittype, run due to circular import issues
from .organizations import *
from .kits import *
from .procedures import *
from .submissions import *
from .audit import AuditLog

View File

@@ -20,9 +20,7 @@ from pandas import DataFrame
from sqlalchemy.ext.hybrid import hybrid_property
from frontend.widgets.functions import select_save_file
from . import Base, BaseClass, Reagent, SubmissionType, KitType, ClientLab, Contact, LogMixin, Procedure, \
kittype_procedure
from . import Base, BaseClass, Reagent, SubmissionType, ClientLab, Contact, LogMixin, Procedure
from sqlalchemy import Column, String, TIMESTAMP, INTEGER, ForeignKey, JSON, FLOAT, case, func, Table, Sequence
from sqlalchemy.orm import relationship, validates, Query
from sqlalchemy.orm.attributes import flag_modified
@@ -42,7 +40,7 @@ from jinja2 import Template
from PIL import Image
if TYPE_CHECKING:
from backend.db.models.kits import ProcedureType, Procedure
from backend.db.models.procedures import ProcedureType, Procedure
logger = logging.getLogger(f"submissions.{__name__}")
@@ -871,8 +869,8 @@ class Run(BaseClass, LogMixin):
value (_type_): value of attribute
"""
match key:
case "kittype":
field_value = KitType.query(name=value)
# case "kittype":
# field_value = KitType.query(name=value)
case "clientlab":
field_value = ClientLab.query(name=value)
case "contact":
@@ -1241,25 +1239,12 @@ class Run(BaseClass, LogMixin):
def add_procedure(self, obj, proceduretype_name: str):
from frontend.widgets.procedure_creation import ProcedureCreation
procedure_type = next(
procedure_type: ProcedureType = next(
(proceduretype for proceduretype in self.allowed_procedures if proceduretype.name == proceduretype_name))
logger.debug(f"Got ProcedureType: {procedure_type}")
dlg = ProcedureCreation(parent=obj, procedure=procedure_type.construct_dummy_procedure(run=self))
if dlg.exec():
sql, _ = dlg.return_sql(new=True)
# logger.debug(f"Output run samples:\n{pformat(sql.run.sample)}")
# previous = [proc for proc in self.procedure if proc.proceduretype == procedure_type]
# repeats = len([proc for proc in previous if proc.repeat])
# if sql.repeat:
# repeats += 1
# if repeats > 0:
# suffix = f"-{str(len(previous))}R{repeats}"
# else:
# suffix = f"-{str(len(previous)+1)}"
# sql.name = f"{sql.repeat}{suffix}"
# else:
# suffix = f"-{str(len(previous)+1)}"
# sql.name = f"{self.name}-{proceduretype_name}{suffix}"
sql.save()
obj.set_data()

View File

@@ -2,7 +2,7 @@
Contains pandas and openpyxl convenience functions for interacting with excel workbooks
'''
from .parser import *
from backend.excel.parsers.clientsubmission_parser import *
from .reports import *
from .writer import *
# from .parser import *
from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser, ClientSubmissionSampleParser
# from .reports import *
# from .writer import *

View File

@@ -1,698 +0,0 @@
"""
contains clientsubmissionparser objects for pulling values from client generated procedure sheets.
"""
import logging
from copy import copy
from getpass import getuser
from pprint import pformat
from typing import List
from openpyxl import load_workbook, Workbook
from pathlib import Path
from backend.db.models import *
from backend.validators import PydRun, RSLNamer
from collections import OrderedDict
from tools import check_not_nan, is_missing, check_key_or_attr
logger = logging.getLogger(f"submissions.{__name__}")
class SheetParser(object):
"""
object to pull and contain data from excel file
"""
def __init__(self, filepath: Path | None = None):
"""
Args:
filepath (Path | None, optional): file path to excel sheet. Defaults to None.
"""
logger.info(f"\n\nParsing {filepath.__str__()}\n\n")
match filepath:
case Path():
self.filepath = filepath
case str():
self.filepath = Path(filepath)
case _:
logger.error(f"No filepath given.")
raise ValueError("No filepath given.")
try:
self.xl = load_workbook(filepath, data_only=True)
except ValueError as e:
logger.error(f"Incorrect value: {e}")
raise FileNotFoundError(f"Couldn't parse file {self.filepath}")
self.sub = OrderedDict()
# NOTE: make decision about type of sample we have
self.sub['proceduretype'] = dict(value=RSLNamer.retrieve_submission_type(filename=self.filepath),
missing=True)
self.submission_type = SubmissionType.query(name=self.sub['proceduretype'])
self.sub_object = Run.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
# NOTE: grab the info map from the procedure type in database
self.parse_info()
self.import_kit_validation_check()
self.parse_reagents()
self.parse_samples()
self.parse_equipment()
self.parse_tips()
def parse_info(self):
"""
Pulls basic information from the excel sheet
"""
parser = InfoParser(xl=self.xl, submission_type=self.submission_type, sub_object=self.sub_object)
self.info_map = parser.info_map
# NOTE: in order to accommodate generic procedure types we have to check for the type in the excel sheet and rerun accordingly
try:
check = parser.parsed_info['proceduretype']['value'] not in [None, "None", "", " "]
except KeyError as e:
logger.error(f"Couldn't check procedure type due to KeyError: {e}")
return
logger.info(
f"Checking for updated procedure type: {self.submission_type.name} against new: {parser.parsed_info['proceduretype']['value']}")
if self.submission_type.name != parser.parsed_info['proceduretype']['value']:
if check:
# NOTE: If initial procedure type doesn't match parsed procedure type, defer to parsed procedure type.
self.submission_type = SubmissionType.query(name=parser.parsed_info['proceduretype']['value'])
logger.info(f"Updated self.proceduretype to {self.submission_type}. Rerunning parse.")
self.parse_info()
else:
self.submission_type = RSLNamer.retrieve_submission_type(filename=self.filepath)
self.parse_info()
for k, v in parser.parsed_info.items():
self.sub.__setitem__(k, v)
def parse_reagents(self, extraction_kit: str | None = None):
"""
Calls reagent clientsubmissionparser class to pull info from the excel sheet
Args:
extraction_kit (str | None, optional): Relevant extraction kittype for reagent map. Defaults to None.
"""
if extraction_kit is None:
extraction_kit = self.sub['kittype']
parser = ReagentParser(xl=self.xl, submission_type=self.submission_type,
extraction_kit=extraction_kit)
self.sub['reagents'] = parser.parsed_reagents
def parse_samples(self):
"""
Calls sample clientsubmissionparser to pull info from the excel sheet
"""
parser = SampleParser(xl=self.xl, submission_type=self.submission_type)
self.sub['sample'] = parser.parsed_samples
def parse_equipment(self):
"""
Calls equipment clientsubmissionparser to pull info from the excel sheet
"""
parser = EquipmentParser(xl=self.xl, submission_type=self.submission_type)
self.sub['equipment'] = parser.parsed_equipment
def parse_tips(self):
"""
Calls tips clientsubmissionparser to pull info from the excel sheet
"""
parser = TipParser(xl=self.xl, submission_type=self.submission_type)
self.sub['tips'] = parser.parsed_tips
def import_kit_validation_check(self):
"""
Enforce that the clientsubmissionparser has an extraction kittype
"""
if 'kittype' not in self.sub.keys() or not check_not_nan(self.sub['kittype']['value']):
from frontend.widgets.pop_ups import ObjectSelector
dlg = ObjectSelector(title="Kit Needed", message="At minimum a kittype is needed. Please select one.",
obj_type=KitType)
if dlg.exec():
self.sub['kittype'] = dict(value=dlg.parse_form(), missing=True)
else:
raise ValueError("Extraction kittype needed.")
else:
if isinstance(self.sub['kittype'], str):
self.sub['kittype'] = dict(value=self.sub['kittype'], missing=True)
def to_pydantic(self) -> PydRun:
"""
Generates a pydantic model of scraped data for validation
Returns:
PydSubmission: output pydantic model
"""
return PydRun(filepath=self.filepath, run_custom=True, **self.sub)
class InfoParser(object):
"""
Object to parse generic info from excel sheet.
"""
def __init__(self, xl: Workbook, submission_type: str | SubmissionType, sub_object: Run | None = None):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (str | SubmissionType): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
sub_object (BasicRun | None, optional): Submission object holding methods. Defaults to None.
"""
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
if sub_object is None:
sub_object = Run.find_polymorphic_subclass(polymorphic_identity=submission_type.name)
self.submission_type_obj = submission_type
self.submission_type = dict(value=self.submission_type_obj.name, missing=True)
self.sub_object = sub_object
self.xl = xl
@property
def info_map(self) -> dict:
"""
Gets location of basic info from the proceduretype object in the database.
Returns:
dict: Location map of all info for this procedure type
"""
# NOTE: Get the parse_info method from the procedure type specified
return self.sub_object.construct_info_map(submission_type=self.submission_type_obj, mode="read")
@property
def parsed_info(self) -> dict:
"""
Pulls basic info from the excel sheet.
Returns:
dict: key:value of basic info
"""
dicto = {}
# NOTE: This loop parses generic info
for sheet in self.xl.sheetnames:
ws = self.xl[sheet]
relevant = []
for k, v in self.info_map.items():
# NOTE: If the value is hardcoded put it in the dictionary directly. Ex. Artic kittype
if k == "custom":
continue
if isinstance(v, str):
dicto[k] = dict(value=v, missing=False)
continue
for location in v:
try:
check = location['sheet'] == sheet
except TypeError:
logger.warning(f"Location is likely a string, skipping")
dicto[k] = dict(value=location, missing=False)
check = False
if check:
new = location
new['name'] = k
relevant.append(new)
# NOTE: make sure relevant is not an empty list.
if not relevant:
continue
for item in relevant:
# NOTE: Get cell contents at this location
value = ws.cell(row=item['row'], column=item['column']).value
match item['name']:
case "proceduretype":
value, missing = is_missing(value)
value = value.title()
case "submitted_date":
value, missing = is_missing(value)
# NOTE: is field a JSON? Includes: Extraction info, PCR info, comment, custom
case thing if thing in self.sub_object.jsons:
value, missing = is_missing(value)
if missing: continue
value = dict(name=f"Parser_{sheet}", text=value, time=datetime.now())
try:
dicto[item['name']]['value'] += value
continue
except KeyError:
logger.error(f"New value for {item['name']}")
case _:
value, missing = is_missing(value)
if item['name'] not in dicto.keys():
try:
dicto[item['name']] = dict(value=value, missing=missing)
except (KeyError, IndexError):
continue
# NOTE: Return after running the clientsubmissionparser components held in procedure object.
return self.sub_object.custom_info_parser(input_dict=dicto, xl=self.xl, custom_fields=self.info_map['custom'])
class ReagentParser(object):
"""
Object to pull reagents from excel sheet.
"""
def __init__(self, xl: Workbook, submission_type: str | SubmissionType, extraction_kit: str,
run_object: Run | None = None):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (str|SubmissionType): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
extraction_kit (str): Extraction kittype used.
run_object (BasicRun | None, optional): Submission object holding methods. Defaults to None.
"""
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
self.submission_type_obj = submission_type
if not run_object:
run_object = submission_type.submission_class
self.run_object = run_object
if isinstance(extraction_kit, dict):
extraction_kit = extraction_kit['value']
self.kit_object = KitType.query(name=extraction_kit)
self.xl = xl
@property
def kit_map(self) -> dict:
"""
Gets location of kittype reagents from database
Args:
proceduretype (str): Name of procedure type.
Returns:
dict: locations of reagent info for the kittype.
"""
associations, self.kit_object = self.kit_object.construct_xl_map_for_use(
proceduretype=self.submission_type_obj)
reagent_map = {k: v for k, v in associations.items() if k != 'info'}
try:
del reagent_map['info']
except KeyError:
pass
return reagent_map
@property
def parsed_reagents(self) -> Generator[dict, None, None]:
"""
Extracts reagent information from the Excel form.
Returns:
List[PydReagent]: List of parsed reagents.
"""
for sheet in self.xl.sheetnames:
ws = self.xl[sheet]
relevant = {k.strip(): v for k, v in self.kit_map.items() if sheet in self.kit_map[k]['sheet']}
if not relevant:
continue
for item in relevant:
try:
reagent = relevant[item]
name = ws.cell(row=reagent['name']['row'], column=reagent['name']['column']).value
lot = ws.cell(row=reagent['lot']['row'], column=reagent['lot']['column']).value
expiry = ws.cell(row=reagent['expiry']['row'], column=reagent['expiry']['column']).value
if 'comment' in relevant[item].keys():
comment = ws.cell(row=reagent['comment']['row'], column=reagent['comment']['column']).value
else:
comment = ""
except (KeyError, IndexError):
yield dict(role=item.strip(), lot=None, expiry=None, name=None, comment="", missing=True)
# NOTE: If the cell is blank tell the PydReagent
if check_not_nan(lot):
missing = False
else:
missing = True
lot = str(lot)
try:
check = name.lower() != "not applicable"
except AttributeError:
logger.warning(f"name is not a string.")
check = True
if check:
yield dict(role=item.strip(), lot=lot, expiry=expiry, name=name, comment=comment,
missing=missing)
class SampleParser(object):
"""
Object to pull data for sample in excel sheet and construct individual sample objects
"""
def __init__(self, xl: Workbook, submission_type: SubmissionType, sample_map: dict | None = None,
sub_object: Run | None = None) -> None:
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (SubmissionType): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
sample_map (dict | None, optional): Locations in database where sample are found. Defaults to None.
sub_object (BasicRun | None, optional): Submission object holding methods. Defaults to None.
"""
self.samples = []
self.xl = xl
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
self.submission_type = submission_type.name
self.submission_type_obj = submission_type
if sub_object is None:
logger.warning(
f"Sample clientsubmissionparser attempting to fetch procedure class with polymorphic identity: {self.submission_type}")
sub_object = Run.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
self.sub_object = sub_object
self.sample_type = self.sub_object.get_default_info("sampletype", submission_type=submission_type)
self.samp_object = Sample.find_polymorphic_subclass(polymorphic_identity=self.sample_type)
@property
def sample_map(self) -> dict:
"""
Gets info locations in excel book for procedure type.
Args:
proceduretype (str): procedure type
Returns:
dict: Info locations.
"""
return self.sub_object.construct_sample_map(submission_type=self.submission_type_obj)
@property
def plate_map_samples(self) -> List[dict]:
"""
Parse sample location/name from plate map
Returns:
List[dict]: List of sample ids and locations.
"""
invalids = [0, "0", "EMPTY"]
smap = self.sample_map['plate_map']
ws = self.xl[smap['sheet']]
plate_map_samples = []
for ii, row in enumerate(range(smap['start_row'], smap['end_row'] + 1), start=1):
for jj, column in enumerate(range(smap['start_column'], smap['end_column'] + 1), start=1):
id = str(ws.cell(row=row, column=column).value)
if check_not_nan(id):
if id not in invalids:
sample_dict = dict(id=id, row=ii, column=jj)
sample_dict['sampletype'] = self.sample_type
plate_map_samples.append(sample_dict)
else:
pass
else:
pass
return plate_map_samples
@property
def lookup_samples(self) -> List[dict]:
"""
Parse misc info from lookup table.
Returns:
List[dict]: List of basic sample info.
"""
lmap = self.sample_map['lookup_table']
ws = self.xl[lmap['sheet']]
lookup_samples = []
for ii, row in enumerate(range(lmap['start_row'], lmap['end_row'] + 1), start=1):
row_dict = {k: ws.cell(row=row, column=v).value for k, v in lmap['sample_columns'].items()}
try:
row_dict[lmap['merge_on_id']] = str(row_dict[lmap['merge_on_id']])
except KeyError:
pass
row_dict['sampletype'] = self.sample_type
row_dict['submission_rank'] = ii
try:
check = check_not_nan(row_dict[lmap['merge_on_id']])
except KeyError:
check = False
if check:
lookup_samples.append(self.samp_object.parse_sample(row_dict))
return lookup_samples
@property
def parsed_samples(self) -> Generator[dict, None, None]:
"""
Merges sample info from lookup table and plate map.
Returns:
List[dict]: Reconciled sample
"""
if not self.plate_map_samples or not self.lookup_samples:
logger.warning(f"No separate sample")
samples = self.lookup_samples or self.plate_map_samples
for new in samples:
if not check_key_or_attr(key='sample_id', interest=new, check_none=True):
new['sample_id'] = new['id']
new = self.sub_object.parse_samples(new)
try:
del new['id']
except KeyError:
pass
yield new
else:
merge_on_id = self.sample_map['lookup_table']['merge_on_id']
logger.info(f"Merging sample info using {merge_on_id}")
plate_map_samples = sorted(copy(self.plate_map_samples), key=itemgetter('id'))
lookup_samples = sorted(copy(self.lookup_samples), key=itemgetter(merge_on_id))
for ii, psample in enumerate(plate_map_samples):
# NOTE: See if we can do this the easy way and just use the same list index.
try:
check = psample['id'] == lookup_samples[ii][merge_on_id]
except (KeyError, IndexError):
check = False
if check:
new = lookup_samples[ii] | psample
lookup_samples[ii] = {}
else:
logger.warning(f"Match for {psample['id']} not direct, running search.")
searchables = [(jj, sample) for jj, sample in enumerate(lookup_samples)
if merge_on_id in sample.keys()]
jj, new = next(((jj, lsample | psample) for jj, lsample in searchables
if lsample[merge_on_id] == psample['id']), (-1, psample))
if jj >= 0:
lookup_samples[jj] = {}
if not check_key_or_attr(key='sample_id', interest=new, check_none=True):
new['sample_id'] = psample['id']
new = self.sub_object.parse_samples(new)
try:
del new['id']
except KeyError:
pass
yield new
class EquipmentParser(object):
"""
Object to pull data for equipment in excel sheet
"""
def __init__(self, xl: Workbook, submission_type: str | SubmissionType) -> None:
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (str | SubmissionType): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
"""
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
self.submission_type = submission_type
self.xl = xl
@property
def equipment_map(self) -> dict:
"""
Gets the map of equipment locations in the procedure type's spreadsheet
Returns:
List[dict]: List of locations
"""
return {k: v for k, v in self.submission_type.construct_field_map("equipment")}
def get_asset_number(self, input: str) -> str:
"""
Pulls asset number from string.
Args:
input (str): String to be scraped
Returns:
str: asset number
"""
regex = Equipment.manufacturer_regex
try:
return regex.search(input).group().strip("-")
except AttributeError as e:
logger.error(f"Error getting asset number for {input}: {e}")
return input
@property
def parsed_equipment(self) -> Generator[dict, None, None]:
"""
Scrapes equipment from xl sheet
Returns:
List[dict]: list of equipment
"""
for sheet in self.xl.sheetnames:
ws = self.xl[sheet]
try:
relevant = {k: v for k, v in self.equipment_map.items() if v['sheet'] == sheet}
except (TypeError, KeyError) as e:
logger.error(f"Error creating relevant equipment list: {e}")
continue
previous_asset = ""
for k, v in relevant.items():
asset = ws.cell(v['name']['row'], v['name']['column']).value
if not check_not_nan(asset):
asset = previous_asset
else:
previous_asset = asset
asset = self.get_asset_number(input=asset)
eq = Equipment.query(asset_number=asset)
if eq is None:
eq = Equipment.query(name=asset)
process = ws.cell(row=v['process']['row'], column=v['process']['column']).value
try:
yield dict(name=eq.name, processes=[process], role=k, asset_number=eq.asset_number,
nickname=eq.nickname)
except AttributeError:
logger.error(f"Unable to add {eq} to list.")
continue
class TipParser(object):
"""
Object to pull data for tips in excel sheet
"""
def __init__(self, xl: Workbook, submission_type: str | SubmissionType) -> None:
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (str | SubmissionType): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
"""
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
self.submission_type = submission_type
self.xl = xl
@property
def tip_map(self) -> dict:
"""
Gets the map of equipment locations in the procedure type's spreadsheet
Returns:
List[dict]: List of locations
"""
return {k: v for k, v in self.submission_type.construct_field_map("tip")}
@property
def parsed_tips(self) -> Generator[dict, None, None]:
"""
Scrapes equipment from xl sheet
Returns:
List[dict]: list of equipment
"""
for sheet in self.xl.sheetnames:
ws = self.xl[sheet]
try:
relevant = {k: v for k, v in self.tip_map.items() if v['sheet'] == sheet}
except (TypeError, KeyError) as e:
logger.error(f"Error creating relevant equipment list: {e}")
continue
previous_asset = ""
for k, v in relevant.items():
asset = ws.cell(v['name']['row'], v['name']['column']).value
if "lot" in v.keys():
lot = ws.cell(v['lot']['row'], v['lot']['column']).value
else:
lot = None
if not check_not_nan(asset):
asset = previous_asset
else:
previous_asset = asset
eq = Tips.query(lot=lot, name=asset, limit=1)
try:
yield dict(name=eq.name, role=k, lot=lot)
except AttributeError:
logger.error(f"Unable to add {eq} to PydTips list.")
class PCRParser(object):
"""Object to pull data from Design and Analysis PCR export file."""
def __init__(self, filepath: Path | None = None, submission: Run | None = None) -> None:
"""
Args:
filepath (Path | None, optional): file to parse. Defaults to None.
submission (BasicRun | None, optional): Submission parsed data to be added to.
"""
if filepath is None:
logger.error('No filepath given.')
self.xl = None
else:
try:
self.xl = load_workbook(filepath)
except ValueError as e:
logger.error(f'Incorrect value: {e}')
self.xl = None
except PermissionError:
logger.error(f"Couldn't get permissions for {filepath.__str__()}. Operation might have been cancelled.")
return None
if submission is None:
self.submission_obj = Wastewater
rsl_plate_number = None
else:
self.submission_obj = submission
rsl_plate_number = self.submission_obj.rsl_plate_number
self.samples = self.submission_obj.parse_pcr(xl=self.xl, rsl_plate_number=rsl_plate_number)
self.controls = self.submission_obj.parse_pcr_controls(xl=self.xl, rsl_plate_number=rsl_plate_number)
@property
def pcr_info(self) -> dict:
"""
Parse general info rows for all types of PCR results
"""
info_map = self.submission_obj.get_submission_type().sample_map['pcr_general_info']
sheet = self.xl[info_map['sheet']]
iter_rows = sheet.iter_rows(min_row=info_map['start_row'], max_row=info_map['end_row'])
pcr = {}
for row in iter_rows:
try:
key = row[0].value.lower().replace(' ', '_')
except AttributeError as e:
logger.error(f"No key: {row[0].value} due to {e}")
continue
value = row[1].value or ""
pcr[key] = value
pcr['imported_by'] = getuser()
return pcr
class ConcentrationParser(object):
def __init__(self, filepath: Path | None = None, run: Run | None = None) -> None:
if filepath is None:
logger.error('No filepath given.')
self.xl = None
else:
try:
self.xl = load_workbook(filepath)
except ValueError as e:
logger.error(f'Incorrect value: {e}')
self.xl = None
except PermissionError:
logger.error(f"Couldn't get permissions for {filepath.__str__()}. Operation might have been cancelled.")
return None
if run is None:
self.submission_obj = Run()
rsl_plate_number = None
else:
self.submission_obj = run
rsl_plate_number = self.submission_obj.rsl_plate_number
self.samples = self.submission_obj.parse_concentration(xl=self.xl, rsl_plate_number=rsl_plate_number)
# NOTE: Generified parsers below
class InfoParserV2(object):
"""
Object for retrieving submitter info from sample list sheet
"""
default_range = dict(
start_row=2,
end_row=18,
start_column=7,
end_column=8,
sheet="Sample List"
)

View File

@@ -77,8 +77,10 @@ class SubmissionTyperMixin(object):
SubmissionType: The determined submissiontype
"""
from backend.db.models import SubmissionType
parser = ClientSubmissionInfoParser(filepath=filepath, submissiontype=SubmissionType.query(name="Test"))
parser = ClientSubmissionInfoParser(filepath=filepath, submissiontype=SubmissionType.query(name="Default"))
sub_type = next((value for k, value in parser.parsed_info.items() if k == "submissiontype" or k == "submission_type"), None)
if isinstance(sub_type, dict):
sub_type = sub_type['value']
sub_type = SubmissionType.query(name=sub_type.title())
if isinstance(sub_type, list):
return

View File

@@ -1,500 +0,0 @@
"""
contains writer objects for pushing values to procedure sheet templates.
"""
import logging
from copy import copy
from datetime import datetime
from operator import itemgetter
from pprint import pformat
from typing import List, Generator, Tuple
from openpyxl import load_workbook, Workbook
from backend.db.models import SubmissionType, KitType, Run
from backend.validators.pydant import PydRun
from io import BytesIO
from collections import OrderedDict
logger = logging.getLogger(f"submissions.{__name__}")
class SheetWriter(object):
"""
object to manage data placement into excel file
"""
def __init__(self, submission: PydRun):
"""
Args:
submission (PydSubmission): Object containing procedure information.
"""
self.sub = OrderedDict(submission.improved_dict())
# NOTE: Set values from pydantic object.
for k, v in self.sub.items():
match k:
case 'filepath':
self.__setattr__(k, v)
case 'proceduretype':
self.sub[k] = v['value']
self.submission_type = SubmissionType.query(name=v['value'])
self.run_object = Run.find_polymorphic_subclass(
polymorphic_identity=self.submission_type)
case _:
if isinstance(v, dict):
self.sub[k] = v['value']
else:
self.sub[k] = v
template = self.submission_type.template_file
if not template:
logger.error(f"No template file found, falling back to Bacterial Culture")
template = SubmissionType.basic_template
workbook = load_workbook(BytesIO(template))
self.xl = workbook
self.write_info()
self.write_reagents()
self.write_samples()
self.write_equipment()
self.write_tips()
def write_info(self):
"""
Calls info writer
"""
disallowed = ['filepath', 'reagents', 'sample', 'equipment', 'control']
info_dict = {k: v for k, v in self.sub.items() if k not in disallowed}
writer = InfoWriter(xl=self.xl, submission_type=self.submission_type, info_dict=info_dict)
self.xl = writer.write_info()
def write_reagents(self):
"""
Calls reagent writer
"""
reagent_list = self.sub['reagents']
writer = ReagentWriter(xl=self.xl, submission_type=self.submission_type,
extraction_kit=self.sub['kittype'], reagent_list=reagent_list)
self.xl = writer.write_reagents()
def write_samples(self):
"""
Calls sample writer
"""
sample_list = self.sub['sample']
writer = SampleWriter(xl=self.xl, submission_type=self.submission_type, sample_list=sample_list)
self.xl = writer.write_samples()
def write_equipment(self):
"""
Calls equipment writer
"""
equipment_list = self.sub['equipment']
writer = EquipmentWriter(xl=self.xl, submission_type=self.submission_type, equipment_list=equipment_list)
self.xl = writer.write_equipment()
def write_tips(self):
"""
Calls tip writer
"""
tips_list = self.sub['tips']
writer = TipWriter(xl=self.xl, submission_type=self.submission_type, tips_list=tips_list)
self.xl = writer.write_tips()
class InfoWriter(object):
"""
object to write general procedure info into excel file
"""
def __init__(self, xl: Workbook, submission_type: SubmissionType | str, info_dict: dict,
sub_object: Run | None = None):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (SubmissionType | str): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
info_dict (dict): Dictionary of information to write.
sub_object (BasicRun | None, optional): Submission object containing methods. Defaults to None.
"""
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
if sub_object is None:
sub_object = Run.find_polymorphic_subclass(polymorphic_identity=submission_type.name)
self.submission_type = submission_type
self.sub_object = sub_object
self.xl = xl
self.info_map = submission_type.construct_info_map(mode='write')
self.info = self.reconcile_map(info_dict, self.info_map)
def reconcile_map(self, info_dict: dict, info_map: dict) -> Generator[(Tuple[str, dict]), None, None]:
"""
Merge info with its locations
Args:
info_dict (dict): dictionary of info items
info_map (dict): dictionary of info locations
Returns:
dict: merged dictionary
"""
for k, v in info_dict.items():
if v is None:
continue
if k == "custom":
continue
dicto = {}
try:
dicto['locations'] = info_map[k]
except KeyError:
pass
dicto['value'] = v
if len(dicto) > 0:
yield k, dicto
def write_info(self) -> Workbook:
"""
Performs write operations
Returns:
Workbook: workbook with info written.
"""
final_info = {}
for k, v in self.info:
match k:
case "custom":
continue
case "comment":
# NOTE: merge all comments to fit in single cell.
if isinstance(v['value'], list):
json_join = [item['text'] for item in v['value'] if 'text' in item.keys()]
v['value'] = "\n".join(json_join)
case thing if thing in self.sub_object.timestamps:
v['value'] = v['value'].date()
case _:
pass
final_info[k] = v
try:
locations = v['locations']
except KeyError:
logger.error(f"No locations for {k}, skipping")
continue
for loc in locations:
sheet = self.xl[loc['sheet']]
try:
# logger.debug(f"Writing {v['value']} to row {loc['row']} and column {loc['column']}")
sheet.cell(row=loc['row'], column=loc['column'], value=v['value'])
except AttributeError as e:
logger.error(f"Can't write {k} to that cell due to AttributeError: {e}")
except ValueError as e:
logger.error(f"Can't write {v} to that cell due to ValueError: {e}")
sheet.cell(row=loc['row'], column=loc['column'], value=v['value'].name)
return self.sub_object.custom_info_writer(self.xl, info=final_info, custom_fields=self.info_map['custom'])
class ReagentWriter(object):
"""
object to write reagent data into excel file
"""
def __init__(self, xl: Workbook, submission_type: SubmissionType | str, extraction_kit: KitType | str,
reagent_list: list):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (SubmissionType | str): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
extraction_kit (KitType | str): Extraction kittype used.
reagent_list (list): List of reagent dicts to be written to excel.
"""
self.xl = xl
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
self.submission_type_obj = submission_type
if isinstance(extraction_kit, str):
extraction_kit = KitType.query(name=extraction_kit)
self.kit_object = extraction_kit
associations, self.kit_object = self.kit_object.construct_xl_map_for_use(
proceduretype=self.submission_type_obj)
reagent_map = {k: v for k, v in associations.items()}
self.reagents = self.reconcile_map(reagent_list=reagent_list, reagent_map=reagent_map)
def reconcile_map(self, reagent_list: List[dict], reagent_map: dict) -> Generator[dict, None, None]:
"""
Merge reagents with their locations
Args:
reagent_list (List[dict]): List of reagent dictionaries
reagent_map (dict): Reagent locations
Returns:
List[dict]: merged dictionary
"""
filled_roles = [item['reagentrole'] for item in reagent_list]
for map_obj in reagent_map.keys():
if map_obj not in filled_roles:
reagent_list.append(dict(name="Not Applicable", role=map_obj, lot="Not Applicable", expiry="Not Applicable"))
for reagent in reagent_list:
try:
mp_info = reagent_map[reagent['reagentrole']]
except KeyError:
continue
placeholder = copy(reagent)
for k, v in reagent.items():
try:
dicto = dict(value=v, row=mp_info[k]['row'], column=mp_info[k]['column'])
except KeyError as e:
logger.error(f"KeyError: {e}")
dicto = v
placeholder[k] = dicto
placeholder['sheet'] = mp_info['sheet']
yield placeholder
def write_reagents(self) -> Workbook:
"""
Performs write operations
Returns:
Workbook: Workbook with reagents written
"""
for reagent in self.reagents:
sheet = self.xl[reagent['sheet']]
for v in reagent.values():
if not isinstance(v, dict):
continue
match v['value']:
case datetime():
v['value'] = v['value'].date()
case _:
pass
sheet.cell(row=v['row'], column=v['column'], value=v['value'])
return self.xl
class SampleWriter(object):
"""
object to write sample data into excel file
"""
def __init__(self, xl: Workbook, submission_type: SubmissionType | str, sample_list: list):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (SubmissionType | str): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
sample_list (list): List of sample dictionaries to be written to excel file.
"""
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
self.submission_type = submission_type
self.xl = xl
self.sample_map = submission_type.sample_map['lookup_table']
# NOTE: exclude any sample without a procedure rank.
samples = [item for item in self.reconcile_map(sample_list) if item['submission_rank'] > 0]
self.samples = sorted(samples, key=itemgetter('submission_rank'))
self.blank_lookup_table()
def reconcile_map(self, sample_list: list) -> Generator[dict, None, None]:
"""
Merge sample info with locations
Args:
sample_list (list): List of sample information
Returns:
List[dict]: List of merged dictionaries
"""
multiples = ['row', 'column', 'assoc_id', 'submission_rank']
for sample in sample_list:
sample = self.submission_type.submission_class.custom_sample_writer(sample)
for assoc in zip(sample['row'], sample['column'], sample['submission_rank']):
new = dict(row=assoc[0], column=assoc[1], submission_rank=assoc[2])
for k, v in sample.items():
if k in multiples:
continue
new[k] = v
yield new
def blank_lookup_table(self):
"""
Blanks out columns in the lookup table to ensure help values are removed before writing.
"""
sheet = self.xl[self.sample_map['sheet']]
for row in range(self.sample_map['start_row'], self.sample_map['end_row'] + 1):
for column in self.sample_map['sample_columns'].values():
if sheet.cell(row, column).data_type != 'f':
sheet.cell(row=row, column=column, value="")
def write_samples(self) -> Workbook:
"""
Performs writing operations.
Returns:
Workbook: Workbook with sample written
"""
sheet = self.xl[self.sample_map['sheet']]
columns = self.sample_map['sample_columns']
for sample in self.samples:
row = self.sample_map['start_row'] + (sample['submission_rank'] - 1)
for k, v in sample.items():
if isinstance(v, dict):
try:
v = v['value']
except KeyError:
logger.error(f"Cant convert {v} to single string.")
try:
column = columns[k]
except KeyError:
continue
sheet.cell(row=row, column=column, value=v)
return self.xl
class EquipmentWriter(object):
"""
object to write equipment data into excel file
"""
def __init__(self, xl: Workbook, submission_type: SubmissionType | str, equipment_list: list):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (SubmissionType | str): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
equipment_list (list): List of equipment dictionaries to write to excel file.
"""
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
self.submission_type = submission_type
self.xl = xl
equipment_map = {k: v for k, v in self.submission_type.construct_field_map("equipment")}
self.equipment = self.reconcile_map(equipment_list=equipment_list, equipment_map=equipment_map)
def reconcile_map(self, equipment_list: list, equipment_map: dict) -> Generator[dict, None, None]:
"""
Merges equipment with location data
Args:
equipment_list (list): List of equipment
equipment_map (dict): Dictionary of equipment locations
Returns:
List[dict]: List of merged dictionaries
"""
if equipment_list is None:
return
for ii, equipment in enumerate(equipment_list, start=1):
try:
mp_info = equipment_map[equipment['reagentrole']]
except KeyError:
logger.error(f"No {equipment['reagentrole']} in {pformat(equipment_map)}")
mp_info = None
placeholder = copy(equipment)
if not mp_info:
for jj, (k, v) in enumerate(equipment.items(), start=1):
dicto = dict(value=v, row=ii, column=jj)
placeholder[k] = dicto
else:
for jj, (k, v) in enumerate(equipment.items(), start=1):
try:
dicto = dict(value=v, row=mp_info[k]['row'], column=mp_info[k]['column'])
except KeyError as e:
continue
placeholder[k] = dicto
if "asset_number" not in mp_info.keys():
placeholder['name']['value'] = f"{equipment['name']} - {equipment['asset_number']}"
try:
placeholder['sheet'] = mp_info['sheet']
except KeyError:
placeholder['sheet'] = "Equipment"
yield placeholder
def write_equipment(self) -> Workbook:
"""
Performs write operations
Returns:
Workbook: Workbook with equipment written
"""
for equipment in self.equipment:
if not equipment['sheet'] in self.xl.sheetnames:
self.xl.create_sheet("Equipment")
sheet = self.xl[equipment['sheet']]
for k, v in equipment.items():
if not isinstance(v, dict):
continue
if isinstance(v['value'], list):
v['value'] = v['value'][0]
try:
sheet.cell(row=v['row'], column=v['column'], value=v['value'])
except AttributeError as e:
logger.error(f"Couldn't write to {equipment['sheet']}, row: {v['row']}, column: {v['column']}")
logger.error(e)
return self.xl
class TipWriter(object):
"""
object to write tips data into excel file
"""
def __init__(self, xl: Workbook, submission_type: SubmissionType | str, tips_list: list):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (SubmissionType | str): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
tips_list (list): List of tip dictionaries to write to the excel file.
"""
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
self.submission_type = submission_type
self.xl = xl
tips_map = {k: v for k, v in self.submission_type.construct_field_map("tip")}
self.tips = self.reconcile_map(tips_list=tips_list, tips_map=tips_map)
def reconcile_map(self, tips_list: List[dict], tips_map: dict) -> Generator[dict, None, None]:
"""
Merges tips with location data
Args:
tips_list (List[dict]): List of tips
tips_map (dict): Tips locations
Returns:
List[dict]: List of merged dictionaries
"""
if tips_list is None:
return
for ii, tips in enumerate(tips_list, start=1):
mp_info = tips_map[tips.role]
placeholder = {}
if mp_info == {}:
for jj, (k, v) in enumerate(tips.__dict__.items(), start=1):
dicto = dict(value=v, row=ii, column=jj)
placeholder[k] = dicto
else:
for jj, (k, v) in enumerate(tips.__dict__.items(), start=1):
try:
dicto = dict(value=v, row=mp_info[k]['row'], column=mp_info[k]['column'])
except KeyError as e:
continue
placeholder[k] = dicto
try:
placeholder['sheet'] = mp_info['sheet']
except KeyError:
placeholder['sheet'] = "Tips"
yield placeholder
def write_tips(self) -> Workbook:
"""
Performs write operations
Returns:
Workbook: Workbook with tips written
"""
for tips in self.tips:
if not tips['sheet'] in self.xl.sheetnames:
self.xl.create_sheet("Tips")
sheet = self.xl[tips['sheet']]
for k, v in tips.items():
if not isinstance(v, dict):
continue
if isinstance(v['value'], list):
v['value'] = v['value'][0]
try:
sheet.cell(row=v['row'], column=v['column'], value=v['value'])
except AttributeError as e:
logger.error(f"Couldn't write to {tips['sheet']}, row: {v['row']}, column: {v['column']}")
logger.error(e)
return self.xl

View File

@@ -47,7 +47,7 @@ class ClientSubmissionNamer(DefaultNamer):
sub_type = self.get_subtype_from_regex()
if not sub_type:
logger.warning(f"Getting submissiontype from regex failed, using default submissiontype.")
sub_type = SubmissionType.query(name="Test")
sub_type = SubmissionType.query(name="Default")
logger.debug(f"Submission Type: {sub_type}")
sys.exit()
return sub_type

View File

@@ -250,7 +250,7 @@ class PydReagent(PydBaseClass):
report = Report()
if self.model_extra is not None:
self.__dict__.update(self.model_extra)
reagent, new = Reagent.query_or_create(lot=self.lot, name=self.name)
reagent, new = ReagentLot.query_or_create(lot=self.lot, name=self.name)
if new:
reagentrole = ReagentRole.query(name=self.reagentrole)
reagent.reagentrole = reagentrole
@@ -374,7 +374,7 @@ class PydEquipment(PydBaseClass):
name: str
nickname: str | None
# process: List[dict] | None
process: PydProcess | None
process: List[PydProcess] | PydProcess | None
equipmentrole: str | PydEquipmentRole | None
tips: List[PydTips] | PydTips | None = Field(default=[])
@@ -402,7 +402,7 @@ class PydEquipment(PydBaseClass):
value = convert_nans_to_nones(value)
if not value:
value = ['']
logger.debug(value)
# logger.debug(value)
try:
# value = [item.strip() for item in value]
value = next((PydProcess(**process.details_dict()) for process in value))
@@ -435,7 +435,7 @@ class PydEquipment(PydBaseClass):
return value
@report_result
def to_sql(self, procedure: Procedure | str = None, kittype: KitType | str = None) -> Tuple[
def to_sql(self, procedure: Procedure | str = None, proceduretype: ProcedureType | str = None) -> Tuple[
Equipment, ProcedureEquipmentAssociation]:
"""
Creates Equipment and SubmssionEquipmentAssociations for this PydEquipment
@@ -449,8 +449,8 @@ class PydEquipment(PydBaseClass):
report = Report()
if isinstance(procedure, str):
procedure = Procedure.query(name=procedure)
if isinstance(kittype, str):
kittype = KitType.query(name=kittype)
if isinstance(proceduretype, str):
proceduretype = ProcedureType.query(name=proceduretype)
logger.debug(f"Querying equipment: {self.asset_number}")
equipment = Equipment.query(asset_number=self.asset_number)
if equipment is None:
@@ -470,8 +470,7 @@ class PydEquipment(PydBaseClass):
# NOTE: It looks like the way fetching the process is done in the SQL model, this shouldn't be a problem, but I'll include a failsafe.
# NOTE: I need to find a way to filter this by the kittype involved.
if len(self.processes) > 1:
process = Process.query(proceduretype=procedure.get_submission_type(), kittype=kittype,
equipmentrole=self.role)
process = Process.query(proceduretype=procedure.get_submission_type(), equipmentrole=self.role)
else:
process = Process.query(name=self.processes[0])
if process is None:
@@ -879,15 +878,15 @@ class PydRun(PydBaseClass): #, extra='allow'):
pass
return SubmissionFormWidget(parent=parent, pyd=self, disable=disable)
def to_writer(self) -> "SheetWriter":
"""
Sends data here to the sheet writer.
Returns:
SheetWriter: Sheetwriter object that will perform writing.
"""
from backend.excel.writer import SheetWriter
return SheetWriter(self)
# def to_writer(self) -> "SheetWriter":
# """
# Sends data here to the sheet writer.
#
# Returns:
# SheetWriter: Sheetwriter object that will perform writing.
# """
# from backend.excel.writer import SheetWriter
# return SheetWriter(self)
def construct_filename(self) -> str:
"""
@@ -1166,10 +1165,10 @@ class PydProcess(PydBaseClass, extra="allow"):
proceduretype: List[str]
equipment: List[str]
equipmentrole: List[str]
kittype: List[str]
# kittype: List[str]
tiprole: List[str]
@field_validator("proceduretype", "equipment", "equipmentrole", "kittype", "tiprole", mode="before")
@field_validator("proceduretype", "equipment", "equipmentrole", "tiprole", mode="before")
@classmethod
def enforce_list(cls, value):
if not isinstance(value, list):
@@ -1249,8 +1248,8 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
technician: dict = Field(default=dict(value="NA", missing=True))
repeat: bool = Field(default=False)
repeat_of: str | None = Field(default=None)
kittype: dict = Field(default=dict(value="NA", missing=True))
possible_kits: list | None = Field(default=[], validate_default=True)
# kittype: dict = Field(default=dict(value="NA", missing=True))
# possible_kits: list | None = Field(default=[], validate_default=True)
plate_map: str | None = Field(default=None)
reagent: list | None = Field(default=[])
reagentrole: dict | None = Field(default={}, validate_default=True)
@@ -1258,7 +1257,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
equipment: List[PydEquipment] = Field(default=[])
result: List[PydResults] | List[dict] = Field(default=[])
@field_validator("name", "technician", "kittype", mode="before")
@field_validator("name", "technician", mode="before")#"kittype", mode="before")
@classmethod
def convert_to_dict(cls, value):
if not value:
@@ -1295,18 +1294,18 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
value['missing'] = True
return value
@field_validator("possible_kits")
@classmethod
def rescue_possible_kits(cls, value, values):
if not value:
try:
if values.data['proceduretype']:
value = [kittype.__dict__['name'] for kittype in values.data['proceduretype'].kittype]
except KeyError:
pass
return value
# @field_validator("possible_kits")
# @classmethod
# def rescue_possible_kits(cls, value, values):
# if not value:
# try:
# if values.data['proceduretype']:
# value = [kittype.__dict__['name'] for kittype in values.data['proceduretype'].kittype]
# except KeyError:
# pass
# return value
@field_validator("name", "technician", "kittype")
@field_validator("name", "technician")#, "kittype")
@classmethod
def set_colour(cls, value):
try:
@@ -1321,20 +1320,26 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
@field_validator("reagentrole")
@classmethod
def rescue_reagentrole(cls, value, values):
# if not value:
# match values.data['kittype']:
# case dict():
# if "value" in values.data['kittype'].keys():
# roi = values.data['kittype']['value']
# elif "name" in values.data['kittype'].keys():
# roi = values.data['kittype']['name']
# else:
# raise KeyError(f"Couldn't find kittype name in the dictionary: {values.data['kittype']}")
# case str():
# roi = values.data['kittype']
# if roi != cls.model_fields['kittype'].default['value']:
# kittype = KitType.query(name=roi)
# value = {item.name: item.reagent for item in kittype.reagentrole}
if not value:
match values.data['kittype']:
case dict():
if "value" in values.data['kittype'].keys():
roi = values.data['kittype']['value']
elif "name" in values.data['kittype'].keys():
roi = values.data['kittype']['name']
else:
raise KeyError(f"Couldn't find kittype name in the dictionary: {values.data['kittype']}")
case str():
roi = values.data['kittype']
if roi != cls.model_fields['kittype'].default['value']:
kittype = KitType.query(name=roi)
value = {item.name: item.reagent for item in kittype.reagentrole}
value = {}
for reagentrole in values.data['proceduretype'].reagentrole:
reagents = [reagent.lot_dicts for reagent in reagentrole.reagent]
value[reagentrole.name] = flatten_list(reagents)
# value = {item.name: item.reagent for item in values.data['proceduretype'].reagentrole}
return value
@field_validator("run")
@@ -1416,12 +1421,12 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
self.possible_kits.insert(0, self.possible_kits.pop(self.possible_kits.index(kittype)))
def update_samples(self, sample_list: List[dict]):
logger.debug(f"Incoming sample_list:\n{pformat(sample_list)}")
# logger.debug(f"Incoming sample_list:\n{pformat(sample_list)}")
for iii, sample_dict in enumerate(sample_list, start=1):
if sample_dict['sample_id'].startswith("blank_"):
sample_dict['sample_id'] = ""
row, column = self.proceduretype.ranked_plate[sample_dict['index']]
logger.debug(f"Row: {row}, Column: {column}")
# logger.debug(f"Row: {row}, Column: {column}")
try:
sample = next(
(item for item in self.sample if item.sample_id.upper() == sample_dict['sample_id'].upper()))
@@ -1440,7 +1445,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
sample.row = row
sample.column = column
sample.procedure_rank = sample_dict['index']
logger.debug(f"Sample of interest: {sample.improved_dict()}")
# logger.debug(f"Sample of interest: {sample.improved_dict()}")
# logger.debug(f"Updated samples:\n{pformat(self.sample)}")
def update_reagents(self, reagentrole: str, name: str, lot: str, expiry: str):
@@ -1456,7 +1461,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
idx = 0
insertable = PydReagent(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry)
self.reagent.insert(idx, insertable)
logger.debug(self.reagent)
# logger.debug(self.reagent)
@classmethod
def update_new_reagents(cls, reagent: PydReagent):
@@ -1492,24 +1497,24 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
if self.proceduretype:
sql.proceduretype = self.proceduretype
# Note: convert any new reagents to sql and save
for reagentrole, reagents in self.reagentrole.items():
for reagent in reagents:
if not reagent.lot or reagent.name == "--New--":
continue
self.update_new_reagents(reagent)
# for reagentrole, reagents in self.reagentrole.items():
for reagent in self.reagent:
if not reagent.lot or reagent.name == "--New--":
continue
self.update_new_reagents(reagent)
# NOTE: reset reagent associations.
sql.procedurereagentassociation = []
for reagent in self.reagent:
if isinstance(reagent, dict):
reagent = PydReagent(**reagent)
# logger.debug(reagent)
logger.debug(reagent)
reagentrole = reagent.reagentrole
reagent = reagent.to_sql()
# logger.debug(reagentrole)
if reagent not in sql.reagent:
if reagent not in sql.reagentlot:
# NOTE: Remove any previous association for this role.
if sql.id:
removable = ProcedureReagentAssociation.query(procedure=sql, reagentrole=reagentrole)
removable = ProcedureReagentLotAssociation.query(procedure=sql, reagentrole=reagentrole)
else:
removable = []
logger.debug(f"Removable: {removable}")
@@ -1520,7 +1525,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
else:
removable.delete()
# logger.debug(f"Adding {reagent} to {sql}")
reagent_assoc = ProcedureReagentAssociation(reagent=reagent, procedure=sql, reagentrole=reagentrole)
reagent_assoc = ProcedureReagentLotAssociation(reagentlot=reagent, procedure=sql, reagentrole=reagentrole)
try:
start_index = max([item.id for item in ProcedureSampleAssociation.query()]) + 1
except ValueError:
@@ -1543,10 +1548,6 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
proc_assoc = ProcedureSampleAssociation(new_id=assoc_id_range[iii], procedure=sql, sample=sample_sql,
row=sample.row, column=sample.column,
procedure_rank=sample.procedure_rank)
if self.kittype['value'] not in ["NA", None, ""]:
kittype = KitType.query(name=self.kittype['value'], limit=1)
if kittype:
sql.kittype = kittype
for equipment in self.equipment:
equip = Equipment.query(name=equipment.name)
if equip not in sql.equipment:
@@ -1729,7 +1730,7 @@ class PydClientSubmission(PydBaseClass):
case SubmissionType():
pass
case _:
sql.submissiontype = SubmissionType.query(name="Test")
sql.submissiontype = SubmissionType.query(name="Default")
for k in list(self.model_fields.keys()) + list(self.model_extra.keys()):
logger.debug(f"Running {k}")
attribute = getattr(self, k)

View File

@@ -13,7 +13,7 @@ from PyQt6.QtGui import QAction
from pathlib import Path
from markdown import markdown
from pandas import ExcelWriter
from backend.db.models import Reagent, KitType
from backend.db.models import Reagent
from tools import (
check_if_app, Settings, Report, jinja_template_loading, check_authorization, page_size, is_power_user,
under_development

View File

@@ -6,7 +6,7 @@ from PyQt6.QtWidgets import (
QWidget, QComboBox, QPushButton
)
from PyQt6.QtCore import QSignalBlocker
from backend import ChartReportMaker
from backend.excel.reports import ChartReportMaker
from backend.db import ControlType
import logging
from tools import Report, report_result

View File

@@ -2,7 +2,7 @@ from PyQt6.QtWidgets import (
QVBoxLayout, QDialog, QDialogButtonBox
)
from .misc import CheckableComboBox, StartEndDatePicker
from backend.db.models.kits import SubmissionType
from backend.db.models.procedures import SubmissionType
class DateTypePicker(QDialog):

View File

@@ -21,7 +21,7 @@ class EquipmentUsage(QDialog):
self.procedure = procedure
self.setWindowTitle(f"Equipment Checklist - {procedure.name}")
self.used_equipment = self.procedure.equipment
self.kit = self.procedure.kittype
# self.kit = self.procedure.kittype
self.opt_equipment = procedure.proceduretype.get_equipment()
self.layout = QVBoxLayout()
self.setLayout(self.layout)

View File

@@ -57,7 +57,7 @@ class EquipmentUsage(QDialog):
proceduretype = procedure.proceduretype
proceduretype_dict = proceduretype.details_dict()
run = procedure.run
proceduretype_dict['equipment_json'] = flatten_list([item['equipment_json'] for item in proceduretype_dict['equipment']])
# proceduretype_dict['equipment_json'] = flatten_list([item['equipment_json'] for item in proceduretype_dict['equipment']])
# proceduretype_dict['equipment_json'] = [
# {'name': 'Liquid Handler', 'equipment': [
# {'name': 'Other', 'asset_number': 'XXX', 'processes': [

View File

@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Any, List
if TYPE_CHECKING:
from backend.db.models import Run, Procedure
from backend.validators import PydProcedure
from tools import jinja_template_loading, get_application_from_parent, render_details_template
from tools import jinja_template_loading, get_application_from_parent, render_details_template, sanitize_object_for_json
logger = logging.getLogger(f"submissions.{__name__}")
@@ -32,10 +32,11 @@ class ProcedureCreation(QDialog):
self.edit = edit
self.run = procedure.run
self.procedure = procedure
logger.debug(f"procedure: {pformat(self.procedure.__dict__)}")
self.proceduretype = procedure.proceduretype
self.setWindowTitle(f"New {self.proceduretype.name} for {self.run.rsl_plate_number}")
# self.created_procedure = self.proceduretype.construct_dummy_procedure(run=self.run)
self.procedure.update_kittype_reagentroles(kittype=self.procedure.possible_kits[0])
# self.procedure.update_kittype_reagentroles(kittype=self.procedure.possible_kits[0])
# self.created_procedure.samples = self.run.constuct_sample_dicts_for_proceduretype(proceduretype=self.proceduretype)
# logger.debug(f"Samples to map\n{pformat(self.created_procedure.samples)}")
@@ -70,8 +71,13 @@ class ProcedureCreation(QDialog):
def set_html(self):
from .equipment_usage_2 import EquipmentUsage
logger.debug(f"Edit: {self.edit}")
# logger.debug(f"Edit: {self.edit}")
proceduretype_dict = self.proceduretype.details_dict()
logger.debug(f"Reagent roles: {self.procedure.reagentrole}")
logger.debug(f"Equipment roles: {pformat(proceduretype_dict['equipment'])}")
# NOTE: Add --New-- as an option for reagents.
for key, value in self.procedure.reagentrole.items():
value.append(dict(name="--New--"))
if self.procedure.equipment:
for equipmentrole in proceduretype_dict['equipment']:
# NOTE: Check if procedure equipment is present and move to head of the list if so.
@@ -85,6 +91,8 @@ class ProcedureCreation(QDialog):
equipmentrole['equipment'].insert(0, equipmentrole['equipment'].pop(
equipmentrole['equipment'].index(item_in_er_list)))
proceduretype_dict['equipment_section'] = EquipmentUsage.construct_html(procedure=self.procedure, child=True)
proceduretype_dict['equipment'] = [sanitize_object_for_json(object) for object in proceduretype_dict['equipment']]
self.update_equipment = EquipmentUsage.update_equipment
regex = re.compile(r".*R\d$")
proceduretype_dict['previous'] = [""] + [item.name for item in self.run.procedure if item.proceduretype == self.proceduretype and not bool(regex.match(item.name))]
@@ -94,12 +102,13 @@ class ProcedureCreation(QDialog):
js_in=["procedure_form", "grid_drag", "context_menu"],
proceduretype=proceduretype_dict,
run=self.run.details_dict(),
procedure=self.procedure.__dict__,
# procedure=self.procedure.__dict__,
procedure=self.procedure,
plate_map=self.plate_map,
edit=self.edit
)
# with open("procedure_creation_rendered.html", "w") as f:
# f.write(html)
with open("procedure_creation_rendered.html", "w") as f:
f.write(html)
self.webview.setHtml(html)
@pyqtSlot(str, str, str, str)
@@ -119,11 +128,13 @@ class ProcedureCreation(QDialog):
eoi.name = equipment.name
eoi.asset_number = equipment.asset_number
eoi.nickname = equipment.nickname
process = next((prcss for prcss in equipment.process if prcss.name == process))
eoi.process = process.to_pydantic()
tips = next((tps for tps in equipment.tips if tps.name == tips))
eoi.tips = tips.to_pydantic()
self.procedure.equipment.append(eoi)
process = next((prcss for prcss in equipment.process if prcss.name == process), None)
if process:
eoi.process = process.to_pydantic()
tips = next((tps for tps in equipment.tips if tps.name == tips), None)
if tips:
eoi.tips = tips.to_pydantic()
self.procedure.equipment.append(eoi)
logger.debug(f"Updated equipment: {self.procedure.equipment}")
@pyqtSlot(str, str)

View File

@@ -7,7 +7,7 @@ from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWebChannel import QWebChannel
from PyQt6.QtCore import Qt, pyqtSlot
from jinja2 import TemplateNotFound
from backend.db.models import Run, Sample, Reagent, KitType, Equipment, Process, Tips
from backend.db.models import Run, Sample, Reagent, ProcedureType, Equipment, Process, Tips
from tools import is_power_user, jinja_template_loading, timezone, get_application_from_parent, list_str_comparator
from .functions import select_save_file, save_pdf
from pathlib import Path
@@ -161,7 +161,7 @@ class SubmissionDetails(QDialog):
self.setWindowTitle(f"Sample Details - {sample.sample_id}")
@pyqtSlot(str, str)
def reagent_details(self, reagent: str | Reagent, kit: str | KitType):
def reagent_details(self, reagent: str | Reagent, proceduretype: str | ProcedureType):
"""
Changes details view to summary of Reagent
@@ -172,9 +172,9 @@ class SubmissionDetails(QDialog):
logger.debug(f"Reagent details.")
if isinstance(reagent, str):
reagent = Reagent.query(lot=reagent)
if isinstance(kit, str):
self.kit = KitType.query(name=kit)
base_dict = reagent.to_sub_dict(kittype=self.kit, full_data=True)
if isinstance(proceduretype, str):
self.proceduretype = ProcedureType.query(name=proceduretype)
base_dict = reagent.to_sub_dict(proceduretype=self.proceduretype, full_data=True)
env = jinja_template_loading()
temp_name = "reagent_details.html"
try:

View File

@@ -15,7 +15,7 @@ from tools import Report, Result, check_not_nan, main_form_style, report_result,
from backend.validators import PydReagent, PydClientSubmission, PydSample
from backend.db import (
ClientLab, SubmissionType, Reagent,
ReagentRole, KitTypeReagentRoleAssociation, Run, ClientSubmission
ReagentRole, ProcedureTypeReagentRoleAssociation, Run, ClientSubmission
)
from pprint import pformat
from .pop_ups import QuestionAsker, AlertPop
@@ -760,8 +760,8 @@ class SubmissionFormWidget(QWidget):
def __init__(self, scrollWidget, reagent, extraction_kit: str) -> None:
super().__init__(scrollWidget=scrollWidget)
self.setEditable(True)
looked_up_rt = KitTypeReagentRoleAssociation.query(reagentrole=reagent.equipmentrole,
kittype=extraction_kit)
looked_up_rt = ProcedureTypeReagentRoleAssociation.query(reagentrole=reagent.reagentrole,
proceduretype=extraction_kit)
relevant_reagents = [str(item.lot) for item in looked_up_rt.get_all_relevant_reagents()]
# NOTE: if reagent in sheet is not found insert it into the front of relevant reagents so it shows
if str(reagent.lot) not in relevant_reagents:

View File

@@ -4,7 +4,7 @@ Pane to hold information e.g. cost summary.
from .info_tab import InfoPane
from PyQt6.QtWidgets import QWidget, QLabel, QPushButton
from backend.db import ClientLab
from backend.excel import ReportMaker
from backend.excel.reports import ReportMaker
from .misc import CheckableComboBox
import logging

View File

@@ -1,9 +1,7 @@
document.getElementById("kittype").addEventListener("change", function() {
backend.update_kit(this.value);
})
var formchecks = document.getElementsByClassName('form_check');
@@ -46,7 +44,7 @@ var reagentRoles = document.getElementsByClassName("reagentrole");
for(let i = 0; i < reagentRoles.length; i++) {
reagentRoles[i].addEventListener("change", function() {
if (reagentRoles[i].value.includes("--New--")) {
alert("Create new reagent.")
// alert("Create new reagent.")
var br = document.createElement("br");
var new_reg = document.getElementById("new_" + reagentRoles[i].id);
var new_form = document.createElement("form");

View File

@@ -29,12 +29,7 @@
<option value="{{ previous }}">{{ previous }}</option>
{% endfor %}
</select><br><br>
<label>Kit Type:</label><br>
<select class="dropdown" id="kittype" background-colour="{{ procedure['kittype']['colour'] }}">
{% for kittype in procedure['possible_kits'] %}
<option value="{{ kittype }}">{{ kittype }}</option>
{% endfor %}
</select><br>
{% if procedure['reagentrole'] %}
<br>
<h1><u>Reagents</u></h1>
@@ -47,6 +42,7 @@
{% endfor %}
</select>
<div class="new_reagent" id="new_{{ key }}"></div>
<br>
{% endfor %}
{% endif %}
</form>
@@ -58,9 +54,10 @@
{% endif %}
</div>
</div>
{% if proceduretype['equipment_section'] %}
{{ proceduretype['equipment_section'] }}
{% endif %}
{% with equipmentroles=proceduretype['equipment'], child=True %}
{% include 'support/equipment_usage.html' %}
{% endwith %}
{% include 'support/context_menu.html' %}
{% endblock %}

View File

@@ -19,7 +19,7 @@
<h1><u>Equipment</u></h1>
<br><hr><br>
{% for equipmentrole in proceduretype['equipment_json'] %}
{% for equipmentrole in equipmentroles %}
<div class="grid-container">
<div>
<label for="{{ equipmentrole['name'] }}">{{ equipmentrole['name'] }}:</label><br>
@@ -38,6 +38,7 @@
</div>
</div>
<br>
{% endfor %}
{% if not child %}
@@ -48,7 +49,7 @@
{% endfor %}
{% endif %}
<script>
const equipment_json = {{ proceduretype['equipment_json'] }};
const equipment_json = {{ equipmentroles }};
window.addEventListener('load', function () {
equipment_json.forEach(startup);
@@ -98,10 +99,10 @@
dropdown_oi.json = equipmentrole;
var equipment_name = document.getElementById(equipmentrole.name).value;
var equipment = equipmentrole.equipment.filter(function(x){ return x.name == equipment_name })[0];
for (let iii = 0; iii < equipment.processes.length; iii++) {
for (let iii = 0; iii < equipment.process.length; iii++) {
var opt = document.createElement('option');
opt.value = equipment.processes[iii].name;
opt.innerHTML = equipment.processes[iii].name;
opt.value = equipment.process[iii].name;
opt.innerHTML = equipment.process[iii].name;
dropdown_oi.appendChild(opt);
}
updateTipChoices(equipmentrole);
@@ -117,7 +118,7 @@
console.log(process_name);
var equipment = equipmentrole.equipment.filter(function(x){ return x.name == equipment_name })[0];
console.log(equipment);
var process = equipment.processes.filter(function(x){ return x.name == process_name })[0];
var process = equipment.process.filter(function(x){ return x.name == process_name })[0];
console.log(process);
for (let iii = 0; iii < process.tips.length; iii++) {
var opt = document.createElement('option');

View File

@@ -9,6 +9,8 @@ from datetime import date, datetime, timedelta
from json import JSONDecodeError
from threading import Thread
from inspect import getmembers, isfunction, stack
from types import NoneType
from dateutil.easter import easter
from dateutil.parser import parse
from jinja2 import Environment, FileSystemLoader
@@ -58,6 +60,7 @@ main_form_style = '''
page_size = 250
# micro_char = uni_char = "\u03BC"
def divide_chunks(input_list: list, chunk_count: int) -> Generator[Any, Any, None]:
"""
@@ -464,7 +467,7 @@ def render_details_template(template_name:str, css_in:List[str]|str=[], js_in:Li
for js in js_in:
with open(js, "r") as f:
js_out.append(f.read())
logger.debug(f"Kwargs: {kwargs}")
# logger.debug(f"Kwargs: {kwargs}")
return template.render(css=css_out, js=js_out, **kwargs)
@@ -1019,6 +1022,35 @@ def flatten_list(input_list: list):
return list(itertools.chain.from_iterable(input_list))
def sanitize_object_for_json(input_dict: dict) -> dict:
if not isinstance(input_dict, dict):
match input_dict:
case int() | float() | bool():
pass
case _:
try:
js = json.dumps(input_dict)
except TypeError:
input_dict = str(input_dict)
return input_dict
# return input_dict
output = {}
for key, value in input_dict.items():
match value:
case list():
value = [sanitize_object_for_json(object) for object in value]
case dict():
value = sanitize_object_for_json(value)
case _:
try:
js = json.dumps(value)
except TypeError:
value = str(value)
output[key] = value
return output
def create_plate_grid(rows: int, columns: int):
matrix = np.array([[0 for yyy in range(1, columns + 1)] for xxx in range(1, rows + 1)])
return {iii: (item[0][1]+1, item[0][0]+1) for iii, item in enumerate(np.ndenumerate(matrix), start=1)}