Second round of code cleanup.
This commit is contained in:
@@ -28,6 +28,8 @@ class BaseClass(Base):
|
||||
|
||||
__table_args__ = {'extend_existing': True} #: Will only add new columns
|
||||
|
||||
singles = ['id']
|
||||
|
||||
@classmethod
|
||||
@declared_attr
|
||||
def __tablename__(cls) -> str:
|
||||
@@ -92,17 +94,21 @@ class BaseClass(Base):
|
||||
Returns:
|
||||
dict | list | str: Output of key:value dict or single (list, str) desired variable
|
||||
"""
|
||||
dicto = dict(singles=['id'])
|
||||
output = {}
|
||||
for k, v in dicto.items():
|
||||
if len(args) > 0 and k not in args:
|
||||
# logger.debug(f"{k} not selected as being of interest.")
|
||||
continue
|
||||
else:
|
||||
output[k] = v
|
||||
if len(args) == 1:
|
||||
return output[args[0]]
|
||||
return output
|
||||
# if issubclass(cls, BaseClass) and cls.__name__ != "BaseClass":
|
||||
singles = list(set(cls.singles + BaseClass.singles))
|
||||
# else:
|
||||
# singles = cls.singles
|
||||
# output = dict(singles=singles)
|
||||
# output = {}
|
||||
# for k, v in dicto.items():
|
||||
# if len(args) > 0 and k not in args:
|
||||
# # logger.debug(f"{k} not selected as being of interest.")
|
||||
# continue
|
||||
# else:
|
||||
# output[k] = v
|
||||
# if len(args) == 1:
|
||||
# return output[args[0]]
|
||||
return dict(singles=singles)
|
||||
|
||||
@classmethod
|
||||
def query(cls, **kwargs) -> Any | List[Any]:
|
||||
@@ -190,10 +196,15 @@ class ConfigItem(BaseClass):
|
||||
Returns:
|
||||
ConfigItem|List[ConfigItem]: Config item(s)
|
||||
"""
|
||||
config_items = cls.__database_session__.query(cls).all()
|
||||
config_items = [item for item in config_items if item.key in args]
|
||||
if len(args) == 1:
|
||||
config_items = config_items[0]
|
||||
query = cls.__database_session__.query(cls)
|
||||
# config_items = [item for item in config_items if item.key in args]
|
||||
match len(args):
|
||||
case 0:
|
||||
config_items = query.all()
|
||||
case 1:
|
||||
config_items = query.filter(cls.key == args[0]).first()
|
||||
case _:
|
||||
config_items = query.filter(cls.key.in_(args)).all()
|
||||
return config_items
|
||||
|
||||
|
||||
|
||||
@@ -131,10 +131,8 @@ class Control(BaseClass):
|
||||
__mapper_args__ = {
|
||||
"polymorphic_identity": "Basic Control",
|
||||
"polymorphic_on": case(
|
||||
|
||||
(controltype_name == "PCR Control", "PCR Control"),
|
||||
(controltype_name == "Irida Control", "Irida Control"),
|
||||
|
||||
else_="Basic Control"
|
||||
),
|
||||
"with_polymorphic": "*",
|
||||
@@ -147,15 +145,15 @@ class Control(BaseClass):
|
||||
def find_polymorphic_subclass(cls, polymorphic_identity: str | ControlType | None = None,
|
||||
attrs: dict | None = None) -> Control:
|
||||
"""
|
||||
Find subclass based on polymorphic identity or relevant attributes.
|
||||
Find subclass based on polymorphic identity or relevant attributes.
|
||||
|
||||
Args:
|
||||
polymorphic_identity (str | None, optional): String representing polymorphic identity. Defaults to None.
|
||||
attrs (str | SubmissionType | None, optional): Attributes of the relevant class. Defaults to None.
|
||||
Args:
|
||||
polymorphic_identity (str | None, optional): String representing polymorphic identity. Defaults to None.
|
||||
attrs (str | SubmissionType | None, optional): Attributes of the relevant class. Defaults to None.
|
||||
|
||||
Returns:
|
||||
Control: Subclass of interest.
|
||||
"""
|
||||
Returns:
|
||||
Control: Subclass of interest.
|
||||
"""
|
||||
if isinstance(polymorphic_identity, dict):
|
||||
# logger.debug(f"Controlling for dict value")
|
||||
polymorphic_identity = polymorphic_identity['value']
|
||||
@@ -189,14 +187,11 @@ class Control(BaseClass):
|
||||
|
||||
Args:
|
||||
parent (QWidget): chart holding widget to add buttons to.
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
pass
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def make_chart(cls, parent, chart_settings: dict, ctx):
|
||||
def make_chart(cls, parent, chart_settings: dict, ctx) -> Tuple[Report, "CustomFigure" | None]:
|
||||
"""
|
||||
Dummy operation to be overridden by child classes.
|
||||
|
||||
@@ -307,6 +302,7 @@ class PCRControl(Control):
|
||||
return cls.execute_query(query=query, limit=limit)
|
||||
|
||||
@classmethod
|
||||
@report_result
|
||||
def make_chart(cls, parent, chart_settings: dict, ctx: Settings) -> Tuple[Report, "PCRFigure"]:
|
||||
"""
|
||||
Creates a PCRFigure. Overrides parent
|
||||
|
||||
@@ -4,6 +4,7 @@ All kit and reagent related models
|
||||
from __future__ import annotations
|
||||
import datetime
|
||||
import json
|
||||
import sys
|
||||
from pprint import pformat
|
||||
import yaml
|
||||
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB
|
||||
@@ -693,6 +694,9 @@ class SubmissionType(BaseClass):
|
||||
Returns:
|
||||
List[str]: List of sheet names
|
||||
"""
|
||||
# print(f"Getting template file from {self.__database_session__.get_bind()}")
|
||||
if "pytest" in sys.modules:
|
||||
return ExcelFile("C:\\Users\lwark\Documents\python\submissions\mytests\\test_assets\RSL-AR-20240513-1.xlsx").sheet_names
|
||||
return ExcelFile(BytesIO(self.template_file), engine="openpyxl").sheet_names
|
||||
|
||||
def set_template_file(self, filepath: Path | str):
|
||||
|
||||
@@ -6,7 +6,7 @@ import sys
|
||||
import types
|
||||
from copy import deepcopy
|
||||
from getpass import getuser
|
||||
import logging, uuid, tempfile, re, yaml, base64
|
||||
import logging, uuid, tempfile, re, base64
|
||||
from zipfile import ZipFile
|
||||
from tempfile import TemporaryDirectory, TemporaryFile
|
||||
from operator import itemgetter
|
||||
@@ -167,28 +167,24 @@ class BasicSubmission(BaseClass):
|
||||
|
||||
"""
|
||||
# NOTE: Create defaults for all submission_types
|
||||
parent_defs = super().get_default_info()
|
||||
# NOTE: Singles tells the query which fields to set limit to 1
|
||||
dicto = super().get_default_info()
|
||||
recover = ['filepath', 'samples', 'csv', 'comment', 'equipment']
|
||||
dicto = dict(
|
||||
dicto.update(dict(
|
||||
details_ignore=['excluded', 'reagents', 'samples',
|
||||
'extraction_info', 'comment', 'barcode',
|
||||
'platemap', 'export_map', 'equipment', 'tips', 'custom'],
|
||||
# NOTE: Fields not placed in ui form
|
||||
form_ignore=['reagents', 'ctx', 'id', 'cost', 'extraction_info', 'signed_by', 'comment', 'namer',
|
||||
'submission_object', "tips", 'contact_phone', 'custom'] + recover,
|
||||
'submission_object', "tips", 'contact_phone', 'custom', 'cost_centre'] + recover,
|
||||
# NOTE: Fields not placed in ui form to be moved to pydantic
|
||||
form_recover=recover
|
||||
)
|
||||
# NOTE: Singles tells the query which fields to set limit to 1
|
||||
dicto['singles'] = parent_defs['singles']
|
||||
))
|
||||
# NOTE: Grab mode_sub_type specific info.
|
||||
output = {}
|
||||
for k, v in dicto.items():
|
||||
if len(args) > 0 and k not in args:
|
||||
# logger.debug(f"Don't want {k}")
|
||||
continue
|
||||
else:
|
||||
output[k] = v
|
||||
if args:
|
||||
output = {k: v for k, v in dicto.items() if k in args}
|
||||
else:
|
||||
output = {k: v for k, v in dicto.items()}
|
||||
if isinstance(submission_type, SubmissionType):
|
||||
st = submission_type
|
||||
else:
|
||||
@@ -198,7 +194,7 @@ class BasicSubmission(BaseClass):
|
||||
else:
|
||||
output['submission_type'] = st.name
|
||||
for k, v in st.defaults.items():
|
||||
if len(args) > 0 and k not in args:
|
||||
if args and k not in args:
|
||||
# logger.debug(f"Don't want {k}")
|
||||
continue
|
||||
else:
|
||||
@@ -272,6 +268,7 @@ class BasicSubmission(BaseClass):
|
||||
field = self.__getattribute__(name)
|
||||
except AttributeError:
|
||||
return None
|
||||
# assert isinstance(field, list)
|
||||
for item in field:
|
||||
if extra:
|
||||
yield item.to_sub_dict(extra)
|
||||
@@ -1137,9 +1134,9 @@ class BasicSubmission(BaseClass):
|
||||
limit = 1
|
||||
case _:
|
||||
pass
|
||||
if chronologic:
|
||||
logger.debug("Attempting sort by date descending")
|
||||
query = query.order_by(cls.submitted_date.desc())
|
||||
# if chronologic:
|
||||
# logger.debug("Attempting sort by date descending")
|
||||
query = query.order_by(cls.submitted_date.desc())
|
||||
if page_size is not None:
|
||||
query = query.limit(page_size)
|
||||
page = page - 1
|
||||
@@ -2980,7 +2977,6 @@ class WastewaterArticAssociation(SubmissionSampleAssociation):
|
||||
Returns:
|
||||
dict: Updated dictionary with row, column and well updated
|
||||
"""
|
||||
|
||||
sample = super().to_sub_dict()
|
||||
sample['ct'] = self.ct
|
||||
sample['source_plate'] = self.source_plate
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
contains parser objects for pulling values from client generated submission sheets.
|
||||
'''
|
||||
import json
|
||||
import sys
|
||||
from copy import copy
|
||||
from getpass import getuser
|
||||
from pprint import pformat
|
||||
@@ -95,6 +96,7 @@ class SheetParser(object):
|
||||
parser = ReagentParser(xl=self.xl, submission_type=self.submission_type,
|
||||
extraction_kit=extraction_kit)
|
||||
self.sub['reagents'] = parser.parse_reagents()
|
||||
logger.debug(f"Reagents out of parser: {pformat(self.sub['reagents'])}")
|
||||
|
||||
def parse_samples(self):
|
||||
"""
|
||||
@@ -273,7 +275,8 @@ class ReagentParser(object):
|
||||
# logger.debug(f"Reagent Parser map: {self.map}")
|
||||
self.xl = xl
|
||||
|
||||
def fetch_kit_info_map(self, submission_type: str) -> dict:
|
||||
@report_result
|
||||
def fetch_kit_info_map(self, submission_type: str) -> Tuple[Report, dict]:
|
||||
"""
|
||||
Gets location of kit reagents from database
|
||||
|
||||
@@ -283,7 +286,7 @@ class ReagentParser(object):
|
||||
Returns:
|
||||
dict: locations of reagent info for the kit.
|
||||
"""
|
||||
|
||||
report = Report()
|
||||
if isinstance(submission_type, dict):
|
||||
submission_type = submission_type['value']
|
||||
reagent_map = {k: v for k, v in self.kit_object.construct_xl_map_for_use(submission_type)}
|
||||
@@ -291,7 +294,12 @@ class ReagentParser(object):
|
||||
del reagent_map['info']
|
||||
except KeyError:
|
||||
pass
|
||||
return reagent_map
|
||||
# logger.debug(f"Reagent map: {pformat(reagent_map)}")
|
||||
if not reagent_map.keys():
|
||||
report.add_result(Result(owner=__name__, code=0, msg=f"No kit map found for {self.kit_object.name}.\n\n"
|
||||
f"Are you sure you used the right kit?",
|
||||
status="Critical"))
|
||||
return report, reagent_map
|
||||
|
||||
def parse_reagents(self) -> Generator[dict, None, None]:
|
||||
"""
|
||||
@@ -401,6 +409,7 @@ class SampleParser(object):
|
||||
"""
|
||||
invalids = [0, "0", "EMPTY"]
|
||||
smap = self.sample_info_map['plate_map']
|
||||
print(smap)
|
||||
ws = self.xl[smap['sheet']]
|
||||
plate_map_samples = []
|
||||
for ii, row in enumerate(range(smap['start_row'], smap['end_row'] + 1), start=1):
|
||||
@@ -469,8 +478,10 @@ class SampleParser(object):
|
||||
yield new
|
||||
else:
|
||||
merge_on_id = self.sample_info_map['lookup_table']['merge_on_id']
|
||||
plate_map_samples = sorted(copy(self.plate_map_samples), key=lambda d: d['id'])
|
||||
lookup_samples = sorted(copy(self.lookup_samples), key=lambda d: d[merge_on_id])
|
||||
# plate_map_samples = sorted(copy(self.plate_map_samples), key=lambda d: d['id'])
|
||||
# lookup_samples = sorted(copy(self.lookup_samples), key=lambda d: d[merge_on_id])
|
||||
plate_map_samples = sorted(copy(self.plate_map_samples), key=itemgetter('id'))
|
||||
lookup_samples = sorted(copy(self.lookup_samples), key=itemgetter(merge_on_id))
|
||||
for ii, psample in enumerate(plate_map_samples):
|
||||
# NOTE: See if we can do this the easy way and just use the same list index.
|
||||
try:
|
||||
@@ -483,6 +494,8 @@ class SampleParser(object):
|
||||
lookup_samples[ii] = {}
|
||||
else:
|
||||
logger.warning(f"Match for {psample['id']} not direct, running search.")
|
||||
searchables = [(jj, sample) for jj, sample in enumerate(lookup_samples)
|
||||
if merge_on_id in sample.keys()]
|
||||
# for jj, lsample in enumerate(lookup_samples):
|
||||
# try:
|
||||
# check = lsample[merge_on_id] == psample['id']
|
||||
@@ -494,14 +507,18 @@ class SampleParser(object):
|
||||
# break
|
||||
# else:
|
||||
# new = psample
|
||||
jj, new = next(((jj, lsample) for jj, lsample in enumerate(lookup_samples) if lsample[merge_on_id] == psample['id']), (-1, psample))
|
||||
jj, new = next(((jj, lsample | psample) for jj, lsample in searchables
|
||||
if lsample[merge_on_id] == psample['id']), (-1, psample))
|
||||
logger.debug(f"Assigning from index {jj} - {new}")
|
||||
if jj >= 0:
|
||||
lookup_samples[jj] = {}
|
||||
if not check_key_or_attr(key='submitter_id', interest=new, check_none=True):
|
||||
new['submitter_id'] = psample['id']
|
||||
new = self.sub_object.parse_samples(new)
|
||||
del new['id']
|
||||
try:
|
||||
del new['id']
|
||||
except KeyError:
|
||||
pass
|
||||
yield new
|
||||
|
||||
|
||||
@@ -586,7 +603,7 @@ class EquipmentParser(object):
|
||||
nickname=eq.nickname)
|
||||
except AttributeError:
|
||||
logger.error(f"Unable to add {eq} to list.")
|
||||
|
||||
|
||||
|
||||
class TipParser(object):
|
||||
"""
|
||||
@@ -649,7 +666,7 @@ class TipParser(object):
|
||||
yield dict(name=eq.name, role=k, lot=lot)
|
||||
except AttributeError:
|
||||
logger.error(f"Unable to add {eq} to PydTips list.")
|
||||
|
||||
|
||||
|
||||
class PCRParser(object):
|
||||
"""Object to pull data from Design and Analysis PCR export file."""
|
||||
@@ -705,4 +722,3 @@ class PCRParser(object):
|
||||
pcr['imported_by'] = getuser()
|
||||
# logger.debug(f"PCR: {pformat(pcr)}")
|
||||
return pcr
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ contains writer objects for pushing values to submission sheet templates.
|
||||
"""
|
||||
import logging
|
||||
from copy import copy
|
||||
from operator import itemgetter
|
||||
from pprint import pformat
|
||||
from typing import List, Generator, Tuple
|
||||
from openpyxl import load_workbook, Workbook
|
||||
@@ -272,7 +273,8 @@ class SampleWriter(object):
|
||||
self.sample_map = submission_type.construct_sample_map()['lookup_table']
|
||||
# NOTE: exclude any samples without a submission rank.
|
||||
samples = [item for item in self.reconcile_map(sample_list) if item['submission_rank'] > 0]
|
||||
self.samples = sorted(samples, key=lambda k: k['submission_rank'])
|
||||
# self.samples = sorted(samples, key=lambda k: k['submission_rank'])
|
||||
self.samples = sorted(samples, key=itemgetter('submission_rank'))
|
||||
|
||||
def reconcile_map(self, sample_list: list) -> Generator[dict, None, None]:
|
||||
"""
|
||||
|
||||
@@ -11,7 +11,7 @@ from dateutil.parser import ParserError
|
||||
from typing import List, Tuple, Literal
|
||||
from . import RSLNamer
|
||||
from pathlib import Path
|
||||
from tools import check_not_nan, convert_nans_to_nones, Report, Result
|
||||
from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone
|
||||
from backend.db.models import *
|
||||
from sqlalchemy.exc import StatementError, IntegrityError
|
||||
from PyQt6.QtWidgets import QWidget
|
||||
@@ -148,7 +148,9 @@ class PydReagent(BaseModel):
|
||||
case "expiry":
|
||||
if isinstance(value, str):
|
||||
value = date(year=1970, month=1, day=1)
|
||||
reagent.expiry = value
|
||||
value = datetime.combine(value, datetime.min.time())
|
||||
logger.debug(f"Expiry date coming into sql: {value} with type {type(value)}")
|
||||
reagent.expiry = value.replace(tzinfo=timezone)
|
||||
case _:
|
||||
try:
|
||||
reagent.__setattr__(key, value)
|
||||
@@ -187,7 +189,11 @@ class PydSample(BaseModel, extra='allow'):
|
||||
for k, v in data.model_extra.items():
|
||||
if k in model.timestamps():
|
||||
if isinstance(v, str):
|
||||
# try:
|
||||
v = datetime.strptime(v, "%Y-%m-%d")
|
||||
# except ValueError:
|
||||
# logger.warning(f"Attribute {k} value {v} for sample {data.submitter_id} could not be coerced into date. Setting to None.")
|
||||
# v = None
|
||||
data.__setattr__(k, v)
|
||||
# logger.debug(f"Data coming out of validation: {pformat(data)}")
|
||||
return data
|
||||
@@ -678,6 +684,7 @@ class PydSubmission(BaseModel, extra='allow'):
|
||||
return value
|
||||
|
||||
def __init__(self, run_custom: bool = False, **data):
|
||||
logger.debug(f"{__name__} input data: {data}")
|
||||
super().__init__(**data)
|
||||
# NOTE: this could also be done with default_factory
|
||||
self.submission_object = BasicSubmission.find_polymorphic_subclass(
|
||||
@@ -833,6 +840,18 @@ class PydSubmission(BaseModel, extra='allow'):
|
||||
continue
|
||||
if association is not None and association not in instance.submission_tips_associations:
|
||||
instance.submission_tips_associations.append(association)
|
||||
case item if item in instance.timestamps():
|
||||
logger.warning(f"Incoming timestamp key: {item}, with value: {value}")
|
||||
# value = value.replace(tzinfo=timezone)
|
||||
if isinstance(value, date):
|
||||
value = datetime.combine(value, datetime.min.time())
|
||||
value = value.replace(tzinfo=timezone)
|
||||
elif isinstance(value, str):
|
||||
value: datetime = datetime.strptime(value, "%Y-%m-%d")
|
||||
value = value.replace(tzinfo=timezone)
|
||||
else:
|
||||
value = value
|
||||
instance.set_attribute(key=key, value=value)
|
||||
case item if item in instance.jsons():
|
||||
# logger.debug(f"{item} is a json.")
|
||||
try:
|
||||
@@ -941,7 +960,7 @@ class PydSubmission(BaseModel, extra='allow'):
|
||||
# NOTE: Exclude any reagenttype found in this pyd not expected in kit.
|
||||
expected_check = [item.role for item in ext_kit_rtypes]
|
||||
output_reagents = [rt for rt in self.reagents if rt.role in expected_check]
|
||||
# logger.debug(f"Already have these reagent types: {output_reagents}")
|
||||
logger.debug(f"Already have these reagent types: {output_reagents}")
|
||||
missing_check = [item.role for item in output_reagents]
|
||||
missing_reagents = [rt for rt in ext_kit_rtypes if rt.role not in missing_check]
|
||||
missing_reagents += [rt for rt in output_reagents if rt.missing]
|
||||
|
||||
Reference in New Issue
Block a user