Mid documentation and code clean-up.
This commit is contained in:
@@ -112,7 +112,7 @@ class BaseClass(Base):
|
||||
@classmethod
|
||||
def execute_query(cls, query: Query = None, model=None, limit: int = 0, **kwargs) -> Any | List[Any]:
|
||||
"""
|
||||
Execute sqlalchemy query.
|
||||
Execute sqlalchemy query with relevant defaults.
|
||||
|
||||
Args:
|
||||
model (Any, optional): model to be queried. Defaults to None
|
||||
@@ -137,6 +137,7 @@ class BaseClass(Base):
|
||||
except (ArgumentError, AttributeError) as e:
|
||||
logger.error(f"Attribute {k} unavailable due to:\n\t{e}\nSkipping.")
|
||||
if k in singles:
|
||||
logger.warning(f"{k} is in singles. Returning only one value.")
|
||||
limit = 1
|
||||
with query.session.no_autoflush:
|
||||
match limit:
|
||||
@@ -165,8 +166,8 @@ class ConfigItem(BaseClass):
|
||||
Key:JSON objects to store config settings in database.
|
||||
"""
|
||||
id = Column(INTEGER, primary_key=True)
|
||||
key = Column(String(32))
|
||||
value = Column(JSON)
|
||||
key = Column(String(32)) #: Name of the configuration item.
|
||||
value = Column(JSON) #: Value associated with the config item.
|
||||
|
||||
def __repr__(self):
|
||||
return f"ConfigItem({self.key} : {self.value})"
|
||||
|
||||
@@ -55,7 +55,7 @@ class ControlType(BaseClass):
|
||||
|
||||
def get_subtypes(self, mode: str) -> List[str]:
|
||||
"""
|
||||
Get subtypes associated with this controltype
|
||||
Get subtypes associated with this controltype (currently used only for Kraken)
|
||||
|
||||
Args:
|
||||
mode (str): analysis mode name
|
||||
@@ -140,15 +140,11 @@ class Control(BaseClass):
|
||||
kraken = {}
|
||||
# logger.debug("calculating kraken count total to use in percentage")
|
||||
kraken_cnt_total = sum([kraken[item]['kraken_count'] for item in kraken])
|
||||
new_kraken = []
|
||||
for item in kraken:
|
||||
# logger.debug("calculating kraken percent (overwrites what's already been scraped)")
|
||||
kraken_percent = kraken[item]['kraken_count'] / kraken_cnt_total
|
||||
new_kraken.append({'name': item, 'kraken_count': kraken[item]['kraken_count'],
|
||||
'kraken_percent': "{0:.0%}".format(kraken_percent)})
|
||||
# logger.debug("Creating new kraken.")
|
||||
new_kraken = [dict(name=item, kraken_count=kraken[item]['kraken_count'], kraken_percent="{0:.0%}".format(kraken[item]['kraken_count'] / kraken_cnt_total)) for item in kraken]
|
||||
new_kraken = sorted(new_kraken, key=itemgetter('kraken_count'), reverse=True)
|
||||
# logger.debug("setting targets")
|
||||
if self.controltype.targets == []:
|
||||
if not self.controltype.targets:
|
||||
targets = ["None"]
|
||||
else:
|
||||
targets = self.controltype.targets
|
||||
|
||||
@@ -70,6 +70,7 @@ kittypes_processes = Table(
|
||||
extend_existing=True
|
||||
)
|
||||
|
||||
# logger.debug("Table for TipRole/Tips relations")
|
||||
tiproles_tips = Table(
|
||||
"_tiproles_tips",
|
||||
Base.metadata,
|
||||
@@ -78,6 +79,7 @@ tiproles_tips = Table(
|
||||
extend_existing=True
|
||||
)
|
||||
|
||||
# logger.debug("Table for Process/TipRole relations")
|
||||
process_tiprole = Table(
|
||||
"_process_tiprole",
|
||||
Base.metadata,
|
||||
@@ -86,6 +88,7 @@ process_tiprole = Table(
|
||||
extend_existing=True
|
||||
)
|
||||
|
||||
# logger.debug("Table for Equipment/Tips relations")
|
||||
equipment_tips = Table(
|
||||
"_equipment_tips",
|
||||
Base.metadata,
|
||||
@@ -410,6 +413,7 @@ class Reagent(BaseClass):
|
||||
except (TypeError, AttributeError) as e:
|
||||
place_holder = date.today()
|
||||
logger.error(f"We got a type error setting {self.lot} expiry: {e}. setting to today for testing")
|
||||
# NOTE: The notation for not having an expiry is 1970.1.1
|
||||
if self.expiry.year == 1970:
|
||||
place_holder = "NA"
|
||||
else:
|
||||
@@ -700,7 +704,13 @@ class SubmissionType(BaseClass):
|
||||
output[item.equipment_role.name] = emap
|
||||
return output
|
||||
|
||||
def construct_tips_map(self):
|
||||
def construct_tips_map(self) -> dict:
|
||||
"""
|
||||
Constructs map of tips to excel cells.
|
||||
|
||||
Returns:
|
||||
dict: Tip locations in the excel sheet.
|
||||
"""
|
||||
output = {}
|
||||
for item in self.submissiontype_tiprole_associations:
|
||||
tmap = item.uses
|
||||
@@ -1142,6 +1152,7 @@ class Equipment(BaseClass):
|
||||
processes = [process for process in processes if extraction_kit in process.kit_types]
|
||||
case _:
|
||||
pass
|
||||
# NOTE: Convert to strings
|
||||
processes = [process.name for process in processes]
|
||||
assert all([isinstance(process, str) for process in processes])
|
||||
if len(processes) == 0:
|
||||
@@ -1193,7 +1204,8 @@ class Equipment(BaseClass):
|
||||
return cls.execute_query(query=query, limit=limit)
|
||||
|
||||
def to_pydantic(self, submission_type: SubmissionType,
|
||||
extraction_kit: str | KitType | None = None) -> "PydEquipment":
|
||||
extraction_kit: str | KitType | None = None,
|
||||
role: str = None) -> "PydEquipment":
|
||||
"""
|
||||
Creates PydEquipment of this Equipment
|
||||
|
||||
@@ -1206,7 +1218,7 @@ class Equipment(BaseClass):
|
||||
"""
|
||||
from backend.validators.pydant import PydEquipment
|
||||
return PydEquipment(
|
||||
processes=self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit), role=None,
|
||||
processes=self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit), role=role,
|
||||
**self.to_dict(processes=False))
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -17,7 +17,6 @@ orgs_contacts = Table(
|
||||
Base.metadata,
|
||||
Column("org_id", INTEGER, ForeignKey("_organization.id")),
|
||||
Column("contact_id", INTEGER, ForeignKey("_contact.id")),
|
||||
# __table_args__ = {'extend_existing': True}
|
||||
extend_existing=True
|
||||
)
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ from pathlib import Path
|
||||
from jinja2.exceptions import TemplateNotFound
|
||||
from jinja2 import Template
|
||||
from docxtpl import InlineImage
|
||||
from io import BytesIO
|
||||
from docx.shared import Inches
|
||||
|
||||
logger = logging.getLogger(f"submissions.{__name__}")
|
||||
|
||||
@@ -229,6 +229,15 @@ class BasicSubmission(BaseClass):
|
||||
|
||||
@classmethod
|
||||
def finalize_details(cls, input_dict: dict) -> dict:
|
||||
"""
|
||||
Make final adjustments to the details dictionary before display.
|
||||
|
||||
Args:
|
||||
input_dict (dict): Incoming dictionary.
|
||||
|
||||
Returns:
|
||||
dict: Final details dictionary.
|
||||
"""
|
||||
del input_dict['id']
|
||||
return input_dict
|
||||
|
||||
@@ -610,6 +619,12 @@ class BasicSubmission(BaseClass):
|
||||
|
||||
@classmethod
|
||||
def get_regex(cls) -> str:
|
||||
"""
|
||||
Dummy for inheritence.
|
||||
|
||||
Returns:
|
||||
str: Regex for submission type.
|
||||
"""
|
||||
return cls.construct_regex()
|
||||
|
||||
# Polymorphic functions
|
||||
@@ -733,7 +748,16 @@ class BasicSubmission(BaseClass):
|
||||
|
||||
@classmethod
|
||||
def custom_docx_writer(cls, input_dict:dict, tpl_obj=None):
|
||||
"""
|
||||
Adds custom fields to docx template writer for exported details.
|
||||
|
||||
Args:
|
||||
input_dict (dict): Incoming default dictionary.
|
||||
tpl_obj (_type_, optional): Template object. Defaults to None.
|
||||
|
||||
Returns:
|
||||
dict: Dictionary with information added.
|
||||
"""
|
||||
return input_dict
|
||||
|
||||
@classmethod
|
||||
@@ -785,6 +809,7 @@ class BasicSubmission(BaseClass):
|
||||
repeat = ""
|
||||
outstr = re.sub(r"(-\dR)\d?", rf"\1 {repeat}", outstr).replace(" ", "")
|
||||
abb = cls.get_default_info('abbreviation')
|
||||
outstr = re.sub(rf"RSL{abb}", rf"RSL-{abb}", outstr)
|
||||
return re.sub(rf"{abb}(\d)", rf"{abb}-\1", outstr)
|
||||
|
||||
@classmethod
|
||||
@@ -924,7 +949,7 @@ class BasicSubmission(BaseClass):
|
||||
if submission_type is not None:
|
||||
model = cls.find_polymorphic_subclass(polymorphic_identity=submission_type)
|
||||
elif len(kwargs) > 0:
|
||||
# find the subclass containing the relevant attributes
|
||||
# NOTE: find the subclass containing the relevant attributes
|
||||
# logger.debug(f"Attributes for search: {kwargs}")
|
||||
model = cls.find_polymorphic_subclass(attrs=kwargs)
|
||||
else:
|
||||
@@ -1241,7 +1266,7 @@ class BacterialCulture(BasicSubmission):
|
||||
plate_map (dict | None, optional): _description_. Defaults to None.
|
||||
|
||||
Returns:
|
||||
dict: _description_
|
||||
dict: Updated dictionary.
|
||||
"""
|
||||
from . import ControlType
|
||||
input_dict = super().finalize_parse(input_dict, xl, info_map)
|
||||
@@ -1495,7 +1520,17 @@ class Wastewater(BasicSubmission):
|
||||
# self.report.add_result(Result(msg=f"We added PCR info to {sub.rsl_plate_num}.", status='Information'))
|
||||
|
||||
@classmethod
|
||||
def custom_docx_writer(cls, input_dict:dict, tpl_obj=None):
|
||||
def custom_docx_writer(cls, input_dict:dict, tpl_obj=None) -> dict:
|
||||
"""
|
||||
Adds custom fields to docx template writer for exported details. Extends parent.
|
||||
|
||||
Args:
|
||||
input_dict (dict): Incoming default dictionary.
|
||||
tpl_obj (_type_, optional): Template object. Defaults to None.
|
||||
|
||||
Returns:
|
||||
dict: Dictionary with information added.
|
||||
"""
|
||||
from backend.excel.writer import DocxWriter
|
||||
input_dict = super().custom_docx_writer(input_dict)
|
||||
well_24 = []
|
||||
@@ -1611,6 +1646,7 @@ class WastewaterArtic(BasicSubmission):
|
||||
instr = re.sub(r"^(\d{6})", f"RSL-AR-\\1", instr)
|
||||
# logger.debug(f"name coming out of Artic namer: {instr}")
|
||||
outstr = super().enforce_name(instr=instr, data=data)
|
||||
outstr = outstr.replace("RSLAR", "RSL-AR")
|
||||
return outstr
|
||||
|
||||
@classmethod
|
||||
@@ -1788,7 +1824,6 @@ class WastewaterArtic(BasicSubmission):
|
||||
input_excel = super().custom_info_writer(input_excel, info, backup)
|
||||
# logger.debug(f"Info:\n{pformat(info)}")
|
||||
# NOTE: check for source plate information
|
||||
# check = 'source_plates' in info.keys() and info['source_plates'] is not None
|
||||
if check_key_or_attr(key='source_plates', interest=info, check_none=True):
|
||||
worksheet = input_excel['First Strand List']
|
||||
start_row = 8
|
||||
@@ -1805,15 +1840,11 @@ class WastewaterArtic(BasicSubmission):
|
||||
except TypeError:
|
||||
pass
|
||||
# NOTE: check for gel information
|
||||
# check = 'gel_info' in info.keys() and info['gel_info']['value'] is not None
|
||||
if check_key_or_attr(key='gel_info', interest=info, check_none=True):
|
||||
# logger.debug(f"Gel info check passed.")
|
||||
# if info['gel_info'] is not None:
|
||||
# logger.debug(f"Gel info not none.")
|
||||
# NOTE: print json field gel results to Egel results
|
||||
worksheet = input_excel['Egel results']
|
||||
# TODO: Move all this into a seperate function?
|
||||
#
|
||||
start_row = 21
|
||||
start_column = 15
|
||||
for row, ki in enumerate(info['gel_info']['value'], start=1):
|
||||
@@ -1831,13 +1862,11 @@ class WastewaterArtic(BasicSubmission):
|
||||
worksheet.cell(row=row, column=column, value=kj['value'])
|
||||
except AttributeError:
|
||||
logger.error(f"Failed {kj['name']} with value {kj['value']} to row {row}, column {column}")
|
||||
# check = 'gel_image' in info.keys() and info['gel_image']['value'] is not None
|
||||
if check_key_or_attr(key='gel_image', interest=info, check_none=True):
|
||||
# if info['gel_image'] is not None:
|
||||
if check_key_or_attr(key='gel_image_path', interest=info, check_none=True):
|
||||
worksheet = input_excel['Egel results']
|
||||
# logger.debug(f"We got an image: {info['gel_image']}")
|
||||
with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped:
|
||||
z = zipped.extract(info['gel_image']['value'], Path(TemporaryDirectory().name))
|
||||
z = zipped.extract(info['gel_image_path']['value'], Path(TemporaryDirectory().name))
|
||||
img = OpenpyxlImage(z)
|
||||
img.height = 400 # insert image height in pixels as float or int (e.g. 305.5)
|
||||
img.width = 600
|
||||
@@ -1860,36 +1889,16 @@ class WastewaterArtic(BasicSubmission):
|
||||
base_dict['excluded'] += ['gel_info', 'gel_image', 'headers', "dna_core_submission_number", "source_plates",
|
||||
"gel_controls, gel_image_path"]
|
||||
base_dict['DNA Core ID'] = base_dict['dna_core_submission_number']
|
||||
# check = 'gel_info' in base_dict.keys() and base_dict['gel_info'] is not None
|
||||
if check_key_or_attr(key='gel_info', interest=base_dict, check_none=True):
|
||||
headers = [item['name'] for item in base_dict['gel_info'][0]['values']]
|
||||
base_dict['headers'] = [''] * (4 - len(headers))
|
||||
base_dict['headers'] += headers
|
||||
# logger.debug(f"Gel info: {pformat(base_dict['headers'])}")
|
||||
# check = 'gel_image' in base_dict.keys() and base_dict['gel_image'] is not None
|
||||
if check_key_or_attr(key='gel_image_path', interest=base_dict, check_none=True):
|
||||
with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped:
|
||||
base_dict['gel_image'] = base64.b64encode(zipped.read(base_dict['gel_image_path'])).decode('utf-8')
|
||||
return base_dict, template
|
||||
|
||||
# def adjust_to_dict_samples(self, backup: bool = False) -> List[dict]:
|
||||
# """
|
||||
# Updates sample dictionaries with custom values
|
||||
#
|
||||
# Args:
|
||||
# backup (bool, optional): Whether to perform backup. Defaults to False.
|
||||
#
|
||||
# Returns:
|
||||
# List[dict]: Updated dictionaries
|
||||
# """
|
||||
# # logger.debug(f"Hello from {self.__class__.__name__} dictionary sample adjuster.")
|
||||
# output = []
|
||||
#
|
||||
# for assoc in self.submission_sample_associations:
|
||||
# dicto = assoc.to_sub_dict()
|
||||
# output.append(dicto)
|
||||
# return output
|
||||
|
||||
def custom_context_events(self) -> dict:
|
||||
"""
|
||||
Creates dictionary of str:function to be passed to context menu. Extends parent
|
||||
@@ -1902,6 +1911,13 @@ class WastewaterArtic(BasicSubmission):
|
||||
return events
|
||||
|
||||
def set_attribute(self, key: str, value):
|
||||
"""
|
||||
Performs custom attribute setting based on values. Extends parent
|
||||
|
||||
Args:
|
||||
key (str): name of attribute
|
||||
value (_type_): value of attribute
|
||||
"""
|
||||
super().set_attribute(key=key, value=value)
|
||||
if key == 'gel_info':
|
||||
if len(self.gel_info) > 3:
|
||||
@@ -1938,7 +1954,17 @@ class WastewaterArtic(BasicSubmission):
|
||||
self.save()
|
||||
|
||||
@classmethod
|
||||
def custom_docx_writer(cls, input_dict:dict, tpl_obj=None):
|
||||
def custom_docx_writer(cls, input_dict:dict, tpl_obj=None) -> dict:
|
||||
"""
|
||||
Adds custom fields to docx template writer for exported details.
|
||||
|
||||
Args:
|
||||
input_dict (dict): Incoming default dictionary/
|
||||
tpl_obj (_type_, optional): Template object. Defaults to None.
|
||||
|
||||
Returns:
|
||||
dict: Dictionary with information added.
|
||||
"""
|
||||
input_dict = super().custom_docx_writer(input_dict)
|
||||
if check_key_or_attr(key='gel_image_path', interest=input_dict, check_none=True):
|
||||
with ZipFile(cls.__directory_path__.joinpath("submission_imgs.zip")) as zipped:
|
||||
@@ -1946,12 +1972,11 @@ class WastewaterArtic(BasicSubmission):
|
||||
with tempfile.TemporaryFile(mode="wb", suffix=".jpg", delete=False) as tmp:
|
||||
tmp.write(img)
|
||||
logger.debug(f"Tempfile: {tmp.name}")
|
||||
img = InlineImage(tpl_obj, image_descriptor=tmp.name)#, width=5.5)#, height=400)
|
||||
img = InlineImage(tpl_obj, image_descriptor=tmp.name, width=Inches(5.5))#, width=5.5)#, height=400)
|
||||
input_dict['gel_image'] = img
|
||||
return input_dict
|
||||
|
||||
|
||||
|
||||
# Sample Classes
|
||||
|
||||
class BasicSample(BaseClass):
|
||||
@@ -2009,6 +2034,12 @@ class BasicSample(BaseClass):
|
||||
|
||||
@classmethod
|
||||
def timestamps(cls) -> List[str]:
|
||||
"""
|
||||
Constructs a list of all attributes stored as SQL Timestamps
|
||||
|
||||
Returns:
|
||||
List[str]: Attribute list
|
||||
"""
|
||||
output = [item.name for item in cls.__table__.columns if isinstance(item.type, TIMESTAMP)]
|
||||
if issubclass(cls, BasicSample) and not cls.__name__ == "BasicSample":
|
||||
output += BasicSample.timestamps()
|
||||
@@ -2082,10 +2113,6 @@ class BasicSample(BaseClass):
|
||||
logger.info(f"Recruiting model: {model}")
|
||||
return model
|
||||
|
||||
@classmethod
|
||||
def sql_enforcer(cls, pyd_sample: "PydSample"):
|
||||
return pyd_sample
|
||||
|
||||
@classmethod
|
||||
def parse_sample(cls, input_dict: dict) -> dict:
|
||||
f"""
|
||||
@@ -2194,6 +2221,15 @@ class BasicSample(BaseClass):
|
||||
# limit: int = 0,
|
||||
**kwargs
|
||||
) -> List[BasicSample]:
|
||||
"""
|
||||
Allows for fuzzy search of samples. (Experimental)
|
||||
|
||||
Args:
|
||||
sample_type (str | BasicSample | None, optional): Type of sample. Defaults to None.
|
||||
|
||||
Returns:
|
||||
List[BasicSample]: List of samples that match kwarg search parameters.
|
||||
"""
|
||||
match sample_type:
|
||||
case str():
|
||||
model = cls.find_polymorphic_subclass(polymorphic_identity=sample_type)
|
||||
@@ -2220,26 +2256,20 @@ class BasicSample(BaseClass):
|
||||
return [dict(label="Submitter ID", field="submitter_id")]
|
||||
|
||||
@classmethod
|
||||
def samples_to_df(cls, sample_type: str | None | BasicSample = None, **kwargs):
|
||||
# def samples_to_df(cls, sample_type:str|None|BasicSample=None, searchables:dict={}):
|
||||
logger.debug(f"Checking {sample_type} with type {type(sample_type)}")
|
||||
match sample_type:
|
||||
case str():
|
||||
model = BasicSample.find_polymorphic_subclass(polymorphic_identity=sample_type)
|
||||
case _:
|
||||
try:
|
||||
check = issubclass(sample_type, BasicSample)
|
||||
except TypeError:
|
||||
check = False
|
||||
if check:
|
||||
model = sample_type
|
||||
else:
|
||||
model = cls
|
||||
q_out = model.fuzzy_search(sample_type=sample_type, **kwargs)
|
||||
if not isinstance(q_out, list):
|
||||
q_out = [q_out]
|
||||
def samples_to_df(cls, sample_list:List[BasicSample], **kwargs) -> pd.DataFrame:
|
||||
"""
|
||||
Runs a fuzzy search and converts into a dataframe.
|
||||
|
||||
Args:
|
||||
sample_list (List[BasicSample]): List of samples to be parsed. Defaults to None.
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: Dataframe all samples
|
||||
"""
|
||||
if not isinstance(sample_list, list):
|
||||
sample_list = [sample_list]
|
||||
try:
|
||||
samples = [sample.to_sub_dict() for sample in q_out]
|
||||
samples = [sample.to_sub_dict() for sample in sample_list]
|
||||
except TypeError as e:
|
||||
logger.error(f"Couldn't find any samples with data: {kwargs}\nDue to {e}")
|
||||
return None
|
||||
@@ -2287,6 +2317,12 @@ class WastewaterSample(BasicSample):
|
||||
|
||||
@classmethod
|
||||
def get_default_info(cls, *args):
|
||||
"""
|
||||
Returns default info for a model. Extends BaseClass method.
|
||||
|
||||
Returns:
|
||||
dict | list | str: Output of key:value dict or single (list, str) desired variable
|
||||
"""
|
||||
dicto = super().get_default_info(*args)
|
||||
match dicto:
|
||||
case dict():
|
||||
|
||||
Reference in New Issue
Block a user