Mid code cleanup

This commit is contained in:
lwark
2024-09-26 13:54:28 -05:00
parent f4e930c64e
commit 3c98d2a4ad
6 changed files with 49 additions and 62 deletions

View File

@@ -11,7 +11,7 @@ from typing import Any, List
from pathlib import Path
from tools import report_result
# Load testing environment
# NOTE: Load testing environment
if 'pytest' in sys.modules:
sys.path.append(Path(__file__).parents[4].absolute().joinpath("tests").__str__())
@@ -24,7 +24,7 @@ class BaseClass(Base):
"""
Abstract class to pass ctx values to all SQLAlchemy objects.
"""
__abstract__ = True #: Will not be added to DB
__abstract__ = True #: NOTE: Will not be added to DB
__table_args__ = {'extend_existing': True} #: Will only add new columns
@@ -168,6 +168,7 @@ class BaseClass(Base):
logger.error(f"Error message: {type(e)}")
self.__database_session__.rollback()
report.add_result(Result(msg=e, status="Critical"))
return report
class ConfigItem(BaseClass):
@@ -202,5 +203,6 @@ from .organizations import *
from .kits import *
from .submissions import *
# NOTE: Add a creator to the submission for reagent association.
# NOTE: Add a creator to the submission for reagent association. Assigned here due to circular import constraints.
# https://docs.sqlalchemy.org/en/20/orm/extensions/associationproxy.html#sqlalchemy.ext.associationproxy.association_proxy.params.creator
BasicSubmission.reagents.creator = lambda reg: SubmissionReagentAssociation(reagent=reg)

View File

@@ -161,7 +161,7 @@ class Control(BaseClass):
}
return output
def convert_by_mode(self, mode: Literal['kraken', 'matches', 'contains']) -> list[dict]:
def convert_by_mode(self, mode: Literal['kraken', 'matches', 'contains']) -> List[dict]:
"""
split this instance into analysis types for controls graphs
@@ -169,7 +169,7 @@ class Control(BaseClass):
mode (str): analysis type, 'contains', etc
Returns:
list[dict]: list of records
List[dict]: list of records
"""
output = []
# logger.debug("load json string for mode (i.e. contains, matches, kraken2)")

View File

@@ -263,7 +263,6 @@ class KitType(BaseClass):
for k, v in self.construct_xl_map_for_use(submission_type=submission_type):
# logger.debug(f"Value: {v}")
try:
# assoc = [item for item in self.kit_reagentrole_associations if item.reagent_role.name == k][0]
assoc = next(item for item in self.kit_reagentrole_associations if item.reagent_role.name == k)
except StopIteration as e:
continue
@@ -272,10 +271,8 @@ class KitType(BaseClass):
base_dict['reagent roles'].append(v)
for k, v in submission_type.construct_equipment_map():
try:
# assoc = [item for item in submission_type.submissiontype_equipmentrole_associations if
# item.equipment_role.name == k][0]
assoc = next(item for item in submission_type.submissiontype_equipmentrole_associations if
item.equipment_role.name == k)
item.equipment_role.name == k)
except StopIteration:
continue
for kk, vv in assoc.to_export_dict(kit_type=self).items():
@@ -292,7 +289,7 @@ class ReagentRole(BaseClass):
"""
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: name of reagent type
name = Column(String(64)) #: name of role reagent plays
instances = relationship("Reagent", back_populates="role",
secondary=reagentroles_reagents) #: concrete instances of this reagent type
eol_ext = Column(Interval()) #: extension of life interval
@@ -747,15 +744,12 @@ class SubmissionType(BaseClass):
Returns:
dict: Map equipment locations in excel sheet
"""
# output = {}
# logger.debug("Iterating through equipment roles")
for item in self.submissiontype_equipmentrole_associations:
emap = item.uses
if emap is None:
emap = {}
# output[item.equipment_role.name] = emap
yield item.equipment_role.name, emap
# return output
def construct_tips_map(self) -> Generator[str, dict]:
"""
@@ -764,14 +758,11 @@ class SubmissionType(BaseClass):
Returns:
dict: Tip locations in the excel sheet.
"""
# output = {}
for item in self.submissiontype_tiprole_associations:
tmap = item.uses
if tmap is None:
tmap = {}
# output[item.tip_role.name] = tmap
yield item.tip_role.name, tmap
# return output
def get_equipment(self, extraction_kit: str | KitType | None = None) -> List['PydEquipmentRole']:
"""
@@ -1035,7 +1026,7 @@ class SubmissionTypeKitTypeAssociation(BaseClass):
exclude = ['_sa_instance_state', 'submission_types_id', 'kits_id', 'submission_type', 'kit_type']
base_dict = {k: v for k, v in self.__dict__.items() if k not in exclude}
base_dict['kit_type'] = self.kit_type.to_export_dict(submission_type=self.submission_type)
logger.debug(f"STKTA returning: {base_dict}")
# logger.debug(f"STKTA returning: {base_dict}")
return base_dict
@@ -1073,7 +1064,7 @@ class KitTypeReagentRoleAssociation(BaseClass):
return f"<KitTypeReagentRoleAssociation({self.kit_type} & {self.reagent_role})>"
@validates('required')
def validate_age(self, key, value):
def validate_required(self, key, value):
"""
Ensures only 1 & 0 used in 'required'
@@ -1243,7 +1234,6 @@ class SubmissionReagentAssociation(BaseClass):
match submission:
case BasicSubmission() | str():
if isinstance(submission, str):
# submission = BasicSubmission.query(rsl_number=submission)
submission = BasicSubmission.query(rsl_plate_num=submission)
# logger.debug(f"Lookup SubmissionReagentAssociation by submission BasicSubmission {submission}")
query = query.filter(cls.submission == submission)
@@ -1424,7 +1414,7 @@ class Equipment(BaseClass):
re.VERBOSE)
@classmethod
def assign_equipment(cls, equipment_role: EquipmentRole|str) -> List[Equipment]:
def assign_equipment(cls, equipment_role: EquipmentRole | str) -> List[Equipment]:
"""
Creates a list of equipment from user input to be used in Submission Type creation
@@ -1581,10 +1571,11 @@ class EquipmentRole(BaseClass):
Returns:
dict: dictionary of Association and related reagent role
"""
base_dict = {}
base_dict['role'] = self.name
base_dict['processes'] = self.get_processes(submission_type=submission_type, extraction_kit=kit_type)
return base_dict
return dict(role=self.name,
processes=self.get_processes(submission_type=submission_type, extraction_kit=kit_type))
# base_dict['role'] = self.name
# base_dict['processes'] = self.get_processes(submission_type=submission_type, extraction_kit=kit_type)
# return base_dict
class SubmissionEquipmentAssociation(BaseClass):
@@ -1621,6 +1612,7 @@ class SubmissionEquipmentAssociation(BaseClass):
Returns:
dict: This SubmissionEquipmentAssociation as a dictionary
"""
# TODO: Currently this will only fetch a single process, even if multiple are selectable.
try:
process = self.process.name
except AttributeError:
@@ -1707,9 +1699,9 @@ class SubmissionTypeEquipmentRoleAssociation(BaseClass):
super().save()
def to_export_dict(self, kit_type: KitType):
base_dict = dict(static=self.static)
for k, v in self.equipment_role.to_export_dict(submission_type=self.submission_type, kit_type=kit_type).items():
base_dict[k] = v
base_dict = {k: v for k, v in self.equipment_role.to_export_dict(submission_type=self.submission_type,
kit_type=kit_type).items()}
base_dict['static'] = self.static
return base_dict

View File

@@ -132,7 +132,7 @@ class BasicSubmission(BaseClass):
str: Representation of this BasicSubmission
"""
submission_type = self.submission_type or "Basic"
return f"{submission_type}Submission({self.rsl_plate_num})"
return f"<{submission_type}Submission({self.rsl_plate_num})>"
@classmethod
def jsons(cls) -> List[str]:
@@ -231,6 +231,9 @@ class BasicSubmission(BaseClass):
"""
Gets the SubmissionType associated with this class
Args:
sub_type (str | SubmissionType, Optional): Identity of the submission type to retrieve. Defaults to None.
Returns:
SubmissionType: SubmissionType with name equal to this polymorphic identity
"""
@@ -421,8 +424,6 @@ class BasicSubmission(BaseClass):
except Exception as e:
logger.error(f"Column count error: {e}")
# NOTE: Get kit associated with this submission
# assoc = [item for item in self.extraction_kit.kit_submissiontype_associations if
# item.submission_type == self.submission_type][0]
assoc = next((item for item in self.extraction_kit.kit_submissiontype_associations if item.submission_type == self.submission_type),
None)
# logger.debug(f"Came up with association: {assoc}")
@@ -445,7 +446,7 @@ class BasicSubmission(BaseClass):
Returns positve sample locations for plate
Returns:
list: list of htipick dictionaries for each sample
list: list of hitpick dictionaries for each sample
"""
output_list = [assoc.to_hitpick() for assoc in self.submission_sample_associations]
return output_list
@@ -468,8 +469,7 @@ class BasicSubmission(BaseClass):
for column in range(1, plate_columns + 1):
for row in range(1, plate_rows + 1):
try:
# well = [item for item in sample_list if item['row'] == row and item['column'] == column][0]
well = next(item for item in sample_list if item['row'] == row and item['column'] == column)
well = next((item for item in sample_list if item['row'] == row and item['column'] == column), dict(name="", row=row, column=column, background_color="#ffffff"))
except StopIteration:
well = dict(name="", row=row, column=column, background_color="#ffffff")
output_samples.append(well)
@@ -2788,16 +2788,17 @@ class SubmissionSampleAssociation(BaseClass):
if isinstance(polymorphic_identity, dict):
polymorphic_identity = polymorphic_identity['value']
if polymorphic_identity is None:
output = cls
model = cls
else:
try:
output = [item for item in cls.__subclasses__() if
item.__mapper_args__['polymorphic_identity'] == polymorphic_identity][0]
# output = [item for item in cls.__subclasses__() if
# item.__mapper_args__['polymorphic_identity'] == polymorphic_identity][0]
model = cls.__mapper__.polymorphic_map[polymorphic_identity].class_
except Exception as e:
logger.error(f"Could not get polymorph {polymorphic_identity} of {cls} due to {e}")
output = cls
model = cls
# logger.debug(f"Using SubmissionSampleAssociation subclass: {output}")
return output
return model
@classmethod
@setup_lookup
@@ -2986,7 +2987,7 @@ class WastewaterAssociation(SubmissionSampleAssociation):
return sample
@classmethod
def autoincrement_id(cls) -> int:
def autoincrement_id_local(cls) -> int:
"""
Increments the association id automatically. Overrides parent
@@ -2994,11 +2995,16 @@ class WastewaterAssociation(SubmissionSampleAssociation):
int: incremented id
"""
try:
parent = [base for base in cls.__bases__ if base.__name__ == "SubmissionSampleAssociation"][0]
parent = next((base for base in cls.__bases__ if base.__name__=="SubmissionSampleAssociation"),
SubmissionSampleAssociation)
return max([item.id for item in parent.query()]) + 1
except ValueError as e:
except StopIteration as e:
logger.error(f"Problem incrementing id: {e}")
return 1
@classmethod
def autoincrement_id(cls) -> int:
return super().autoincrement_id()
class WastewaterArticAssociation(SubmissionSampleAssociation):
@@ -3040,8 +3046,9 @@ class WastewaterArticAssociation(SubmissionSampleAssociation):
int: incremented id
"""
try:
parent = [base for base in cls.__bases__ if base.__name__ == "SubmissionSampleAssociation"][0]
parent = next((base for base in cls.__bases__ if base.__name__ == "SubmissionSampleAssociation"),
SubmissionSampleAssociation)
return max([item.id for item in parent.query()]) + 1
except ValueError as e:
except StopIteration as e:
logger.error(f"Problem incrementing id: {e}")
return 1

View File

@@ -547,23 +547,9 @@ class DocxWriter(object):
if columns == 0:
columns = max([sample['column'] for sample in sample_list])
for row in range(0, rows):
# NOTE: Create a list with length equal to columns length
# contents = [''] * columns
# NOTE: Create a list with length equal to columns length, padding with '' where necessary
contents = [next((item['submitter_id'] for item in sample_list if item['row'] == row + 1 and
item['column'] == column + 1), '') for column in range(0, columns)]
# for column in range(0, columns):
# contents[column] = next((item['submitter_id'] for item in sample_list if item['row'] == row + 1 and item['column'] == column), '')
# try:
# # ooi = [item for item in sample_list if item['row'] == row + 1 and item['column'] == column + 1][0]
# ooi = next(item for item in sample_list if item['row'] == row + 1 and item['column'] == column)
# except StopIteration:
# continue
# contents[column] = ooi['submitter_id']
# NOTE: Pad length of contents to reflect columns
# if len(contents) < columns:
# contents += [''] * (columns - len(contents))
# if not contents:
# contents = [''] * columns
yield contents
def create_merged_template(self, *args) -> BytesIO: