Pre code cleanup
This commit is contained in:
@@ -14,7 +14,8 @@ from sqlalchemy.orm import relationship, validates, Query
|
||||
from sqlalchemy.ext.associationproxy import association_proxy
|
||||
from datetime import date
|
||||
import logging, re
|
||||
from tools import check_authorization, setup_lookup, Report, Result, jinja_template_loading, check_regex_match
|
||||
from tools import check_authorization, setup_lookup, Report, Result, jinja_template_loading, check_regex_match, \
|
||||
yaml_regex_creator
|
||||
from typing import List, Literal, Generator, Any
|
||||
from pandas import ExcelFile
|
||||
from pathlib import Path
|
||||
@@ -422,7 +423,7 @@ class Reagent(BaseClass):
|
||||
else:
|
||||
return f"<Reagent({self.role.name}-{self.lot})>"
|
||||
|
||||
def to_sub_dict(self, extraction_kit: KitType = None, full_data:bool=False) -> dict:
|
||||
def to_sub_dict(self, extraction_kit: KitType = None, full_data: bool = False) -> dict:
|
||||
"""
|
||||
dictionary containing values necessary for gui
|
||||
|
||||
@@ -873,17 +874,16 @@ class SubmissionType(BaseClass):
|
||||
|
||||
@classmethod
|
||||
@check_authorization
|
||||
def import_from_json(cls, filepath:Path|str):
|
||||
def import_from_json(cls, filepath: Path | str):
|
||||
yaml.add_constructor("!regex", yaml_regex_creator)
|
||||
if isinstance(filepath, str):
|
||||
filepath = Path(filepath)
|
||||
if not filepath.exists():
|
||||
logging.critical(f"Given file could not be found.")
|
||||
return None
|
||||
|
||||
with open(filepath, "r") as f:
|
||||
if filepath.suffix == ".json":
|
||||
import_dict = json.load(fp=f)
|
||||
elif filepath.suffix == ".yml":
|
||||
import_dict = yaml.safe_load(stream=f)
|
||||
import_dict = yaml.load(stream=f, Loader=yaml.Loader)
|
||||
else:
|
||||
raise Exception(f"Filetype {filepath.suffix} not supported.")
|
||||
logger.debug(pformat(import_dict))
|
||||
@@ -901,8 +901,14 @@ class SubmissionType(BaseClass):
|
||||
new_kit = KitType(name=kit['kit_type']['name'])
|
||||
for role in kit['kit_type']['reagent roles']:
|
||||
new_role = ReagentRole.query(name=role['role'])
|
||||
eol = datetime.timedelta(role['extension_of_life'])
|
||||
if new_role:
|
||||
check = input(f"Found existing role: {new_role.name}. Use this? [Y/n]: ")
|
||||
if check.lower() == "n":
|
||||
new_role = None
|
||||
else:
|
||||
pass
|
||||
if not new_role:
|
||||
eol = datetime.timedelta(role['extension_of_life'])
|
||||
new_role = ReagentRole(name=role['role'], eol_ext=eol)
|
||||
uses = dict(expiry=role['expiry'], lot=role['lot'], name=role['name'], sheet=role['sheet'])
|
||||
ktrr_assoc = KitTypeReagentRoleAssociation(kit_type=new_kit, reagent_role=new_role, uses=uses)
|
||||
@@ -917,9 +923,18 @@ class SubmissionType(BaseClass):
|
||||
)
|
||||
for role in kit['kit_type']['equipment roles']:
|
||||
new_role = EquipmentRole.query(name=role['role'])
|
||||
if new_role:
|
||||
check = input(f"Found existing role: {new_role.name}. Use this? [Y/n]: ")
|
||||
if check.lower() == "n":
|
||||
new_role = None
|
||||
else:
|
||||
pass
|
||||
if not new_role:
|
||||
new_role = EquipmentRole(name=role['role'])
|
||||
ster_assoc = SubmissionTypeEquipmentRoleAssociation(submission_type=submission_type, equipment_role=new_role)
|
||||
for equipment in Equipment.assign_equipment(equipment_role=new_role):
|
||||
new_role.instances.append(equipment)
|
||||
ster_assoc = SubmissionTypeEquipmentRoleAssociation(submission_type=submission_type,
|
||||
equipment_role=new_role)
|
||||
try:
|
||||
uses = dict(name=role['name'], process=role['process'], sheet=role['sheet'], static=role['static'])
|
||||
except KeyError:
|
||||
@@ -1160,6 +1175,7 @@ class KitTypeReagentRoleAssociation(BaseClass):
|
||||
for rel_reagent in relevant_reagents:
|
||||
yield rel_reagent
|
||||
|
||||
|
||||
class SubmissionReagentAssociation(BaseClass):
|
||||
"""
|
||||
table containing submission/reagent associations
|
||||
@@ -1400,6 +1416,22 @@ class Equipment(BaseClass):
|
||||
(?P<Labcon>\d{4}-\d{3}-\d{3}-\d$)""",
|
||||
re.VERBOSE)
|
||||
|
||||
@classmethod
|
||||
def assign_equipment(cls, equipment_role: EquipmentRole|str) -> List[Equipment]:
|
||||
if isinstance(equipment_role, str):
|
||||
equipment_role = EquipmentRole.query(name=equipment_role)
|
||||
equipment = cls.query()
|
||||
options = "\n".join([f"{ii}. {item.name}" for ii, item in enumerate(equipment)])
|
||||
choices = input(f"Enter equipment numbers to add to {equipment_role.name} (space seperated):\n{options}\n\n")
|
||||
output = []
|
||||
for choice in choices.split(" "):
|
||||
try:
|
||||
choice = int(choice)
|
||||
except (AttributeError, ValueError):
|
||||
continue
|
||||
output.append(equipment[choice])
|
||||
return output
|
||||
|
||||
|
||||
class EquipmentRole(BaseClass):
|
||||
"""
|
||||
|
||||
@@ -167,10 +167,10 @@ class BasicSubmission(BaseClass):
|
||||
dicto = dict(
|
||||
details_ignore=['excluded', 'reagents', 'samples',
|
||||
'extraction_info', 'comment', 'barcode',
|
||||
'platemap', 'export_map', 'equipment', 'tips'],
|
||||
'platemap', 'export_map', 'equipment', 'tips', 'custom'],
|
||||
# NOTE: Fields not placed in ui form
|
||||
form_ignore=['reagents', 'ctx', 'id', 'cost', 'extraction_info', 'signed_by', 'comment', 'namer',
|
||||
'submission_object', "tips", 'contact_phone'] + recover,
|
||||
'submission_object', "tips", 'contact_phone', 'custom'] + recover,
|
||||
# NOTE: Fields not placed in ui form to be moved to pydantic
|
||||
form_recover=recover
|
||||
)
|
||||
@@ -347,12 +347,14 @@ class BasicSubmission(BaseClass):
|
||||
logger.error(f"Error setting tips: {e}")
|
||||
tips = None
|
||||
cost_centre = self.cost_centre
|
||||
custom = self.custom
|
||||
else:
|
||||
reagents = None
|
||||
samples = None
|
||||
equipment = None
|
||||
tips = None
|
||||
cost_centre = None
|
||||
custom = None
|
||||
# logger.debug("Getting comments")
|
||||
try:
|
||||
comments = self.comment
|
||||
@@ -381,6 +383,7 @@ class BasicSubmission(BaseClass):
|
||||
# logger.debug(f"Setting contact to: {contact} of type: {type(contact)}")
|
||||
output["contact"] = contact
|
||||
output["contact_phone"] = contact_phone
|
||||
output["custom"] = custom
|
||||
return output
|
||||
|
||||
def calculate_column_count(self) -> int:
|
||||
@@ -549,26 +552,29 @@ class BasicSubmission(BaseClass):
|
||||
case "ctx" | "csv" | "filepath" | "equipment":
|
||||
return
|
||||
case item if item in self.jsons():
|
||||
# logger.debug(f"Setting JSON attribute.")
|
||||
existing = self.__getattribute__(key)
|
||||
if value is None or value in ['', 'null']:
|
||||
logger.error(f"No value given, not setting.")
|
||||
return
|
||||
if existing is None:
|
||||
existing = []
|
||||
if value in existing:
|
||||
logger.warning("Value already exists. Preventing duplicate addition.")
|
||||
return
|
||||
else:
|
||||
if isinstance(value, list):
|
||||
existing += value
|
||||
else:
|
||||
if value is not None:
|
||||
if key == "custom":
|
||||
existing = value
|
||||
match value:
|
||||
case list():
|
||||
# logger.debug(f"Setting JSON attribute.")
|
||||
existing = self.__getattribute__(key)
|
||||
if value is None or value in ['', 'null']:
|
||||
logger.error(f"No value given, not setting.")
|
||||
return
|
||||
if existing is None:
|
||||
existing = []
|
||||
if value in existing:
|
||||
logger.warning("Value already exists. Preventing duplicate addition.")
|
||||
return
|
||||
else:
|
||||
if isinstance(value, list):
|
||||
existing += value
|
||||
else:
|
||||
existing.append(value)
|
||||
|
||||
if value is not None:
|
||||
if key == "custom":
|
||||
existing = value
|
||||
else:
|
||||
existing.append(value)
|
||||
case _:
|
||||
existing = value
|
||||
self.__setattr__(key, existing)
|
||||
flag_modified(self, key)
|
||||
return
|
||||
@@ -749,9 +755,8 @@ class BasicSubmission(BaseClass):
|
||||
# logger.debug(f"Input dict: {input_dict}")
|
||||
# logger.debug(f"Custom fields: {custom_fields}")
|
||||
input_dict['custom'] = {}
|
||||
for k,v in custom_fields.items():
|
||||
logger.debug(f"Attempting custom parse of {k}: {v}")
|
||||
|
||||
for k, v in custom_fields.items():
|
||||
# logger.debug(f"Attempting custom parse of {k}: {v}")
|
||||
match v['type']:
|
||||
case "exempt":
|
||||
continue
|
||||
@@ -766,8 +771,9 @@ class BasicSubmission(BaseClass):
|
||||
if v['start_column'] != v['end_column']:
|
||||
v['end_column'] = v['end_column'] + 1
|
||||
for ii in range(v['start_row'], v['end_row']):
|
||||
for jj in range(v['start_column'], v['end_column']+1):
|
||||
input_dict['custom'][k].append(dict(value=ws.cell(row=ii, column=jj).value, row=ii, column=jj))
|
||||
for jj in range(v['start_column'], v['end_column'] + 1):
|
||||
input_dict['custom'][k].append(
|
||||
dict(value=ws.cell(row=ii, column=jj).value, row=ii, column=jj))
|
||||
return input_dict
|
||||
|
||||
@classmethod
|
||||
@@ -819,7 +825,7 @@ class BasicSubmission(BaseClass):
|
||||
logger.info(f"Hello from {cls.__mapper_args__['polymorphic_identity']} autofill")
|
||||
logger.debug(f"Input dict: {info}")
|
||||
logger.debug(f"Custom fields: {custom_fields}")
|
||||
for k,v in custom_fields.items():
|
||||
for k, v in custom_fields.items():
|
||||
try:
|
||||
assert v['type'] in ['exempt', 'range', 'cell']
|
||||
except (AssertionError, KeyError):
|
||||
@@ -1170,6 +1176,7 @@ class BasicSubmission(BaseClass):
|
||||
if "submitted_date" not in kwargs.keys():
|
||||
instance.submitted_date = date.today()
|
||||
else:
|
||||
logger.warning(f"Found existing instance: {instance}, asking to overwrite.")
|
||||
code = 1
|
||||
msg = "This submission already exists.\nWould you like to overwrite?"
|
||||
report.add_result(Result(msg=msg, code=code))
|
||||
@@ -1659,7 +1666,8 @@ class Wastewater(BasicSubmission):
|
||||
continue
|
||||
copy = dict(submitter_id=sample['submitter_id'], row=row, column=column)
|
||||
well_24.append(copy)
|
||||
input_dict['origin_plate'] = [item for item in DocxWriter.create_plate_map(sample_list=well_24, rows=4, columns=6)]
|
||||
input_dict['origin_plate'] = [item for item in
|
||||
DocxWriter.create_plate_map(sample_list=well_24, rows=4, columns=6)]
|
||||
return input_dict
|
||||
|
||||
|
||||
@@ -1728,14 +1736,22 @@ class WastewaterArtic(BasicSubmission):
|
||||
ws = wb[info_dict['sheet']]
|
||||
img_loader = SheetImageLoader(ws)
|
||||
for ii in range(info_dict['start_row'], info_dict['end_row'] + 1):
|
||||
logger.debug(f"Checking row: {ii}")
|
||||
# logger.debug(f"Checking row: {ii}")
|
||||
for jj in range(info_dict['start_column'], info_dict['end_column'] + 1):
|
||||
# logger.debug(f"Checking column: {jj}")
|
||||
cell_str = f"{row_map[jj]}{ii}"
|
||||
if img_loader.image_in(cell_str):
|
||||
return img_loader.get(cell_str)
|
||||
try:
|
||||
return img_loader.get(cell_str)
|
||||
except ValueError as e:
|
||||
logger.error(f"Could not open image from cell: {cell_str} due to {e}")
|
||||
return None
|
||||
return None
|
||||
|
||||
input_dict = super().custom_info_parser(input_dict)
|
||||
|
||||
input_dict['submission_type'] = dict(value="Wastewater Artic", missing=False)
|
||||
|
||||
logger.debug(f"Custom fields: {custom_fields}")
|
||||
egel_section = custom_fields['egel_controls']
|
||||
ws = xl[egel_section['sheet']]
|
||||
|
||||
Reference in New Issue
Block a user