Mid-code cleanup

This commit is contained in:
lwark
2024-12-05 11:19:47 -06:00
parent 51cb5c41a4
commit 5fc02ffeec
8 changed files with 176 additions and 83 deletions

View File

@@ -24,8 +24,8 @@ def set_sqlite_pragma(dbapi_connection, connection_record):
if ctx.database_schema == "sqlite":
execution_phrase = "PRAGMA foreign_keys=ON"
# cursor.execute(execution_phrase)
elif ctx.database_schema == "mssql+pyodbc":
execution_phrase = "SET IDENTITY_INSERT dbo._wastewater ON;"
# elif ctx.database_schema == "mssql+pyodbc":
# execution_phrase = "SET IDENTITY_INSERT dbo._wastewater ON;"
else:
print("Nothing to execute, returning")
cursor.close()

View File

@@ -261,9 +261,9 @@ class KitType(BaseClass):
Returns:
dict: Dictionary containing relevant info for SubmissionType construction
"""
base_dict = dict(name=self.name)
base_dict['reagent roles'] = []
base_dict['equipment roles'] = []
base_dict = dict(name=self.name, reagent_roles=[], equipment_roles=[])
# base_dict['reagent roles'] = []
# base_dict['equipment roles'] = []
for k, v in self.construct_xl_map_for_use(submission_type=submission_type):
# logger.debug(f"Value: {v}")
try:
@@ -272,17 +272,17 @@ class KitType(BaseClass):
continue
for kk, vv in assoc.to_export_dict().items():
v[kk] = vv
base_dict['reagent roles'].append(v)
base_dict['reagent_roles'].append(v)
for k, v in submission_type.construct_field_map("equipment"):
try:
assoc = next(item for item in submission_type.submissiontype_equipmentrole_associations if
item.equipment_role.name == k)
except StopIteration:
continue
for kk, vv in assoc.to_export_dict(kit_type=self).items():
for kk, vv in assoc.to_export_dict(extraction_kit=self).items():
# logger.debug(f"{kk}:{vv}")
v[kk] = vv
base_dict['equipment roles'].append(v)
base_dict['equipment_roles'].append(v)
# logger.debug(f"KT returning {base_dict}")
return base_dict
@@ -360,12 +360,12 @@ class ReagentRole(BaseClass):
assert reagent.role
# logger.debug(f"Looking up reagent type for {type(kit_type)} {kit_type} and {type(reagent)} {reagent}")
# logger.debug(f"Kit reagent types: {kit_type.reagent_types}")
result = list(set(kit_type.reagent_roles).intersection(reagent.role))
result = set(kit_type.reagent_roles).intersection(reagent.role)
# logger.debug(f"Result: {result}")
try:
return result[0]
except IndexError:
return None
# try:
return next((item for item in result), None)
# except IndexError:
# return None
match name:
case str():
# logger.debug(f"Looking up reagent type by name str: {name}")
@@ -445,12 +445,8 @@ class Reagent(BaseClass, LogMixin):
if extraction_kit is not None:
# NOTE: Get the intersection of this reagent's ReagentType and all ReagentTypes in KitType
reagent_role = next((item for item in set(self.role).intersection(extraction_kit.reagent_roles)), self.role[0])
# try:
# reagent_role = list(set(self.role).intersection(extraction_kit.reagent_roles))[0]
# # NOTE: Most will be able to fall back to first ReagentType in itself because most will only have 1.
# except:
# reagent_role = self.role[0]
reagent_role = next((item for item in set(self.role).intersection(extraction_kit.reagent_roles)),
self.role[0])
else:
try:
reagent_role = self.role[0]
@@ -580,11 +576,14 @@ class Reagent(BaseClass, LogMixin):
for key, value in vars.items():
match key:
case "expiry":
if not isinstance(value, date):
field_value = datetime.strptime(value, "%Y-%m-%d").date
field_value.replace(tzinfo=timezone)
if isinstance(value, str):
field_value = datetime.strptime(value, "%Y-%m-%d")
# field_value.replace(tzinfo=timezone)
elif isinstance(value, date):
field_value = datetime.combine(value, datetime.min.time())
else:
field_value = value
field_value.replace(tzinfo=timezone)
case "role":
continue
case _:
@@ -792,6 +791,15 @@ class SubmissionType(BaseClass):
return self.sample_map
def construct_field_map(self, field: Literal['equipment', 'tip']) -> Generator[(str, dict), None, None]:
"""
Make a map of all locations for tips or equipment.
Args:
field (Literal['equipment', 'tip']): the field to construct a map for
Returns:
Generator[(str, dict), None, None]: Generator composing key, locations for each item in the map
"""
for item in self.__getattribute__(f"submissiontype_{field}role_associations"):
fmap = item.uses
if fmap is None:
@@ -799,6 +807,12 @@ class SubmissionType(BaseClass):
yield getattr(item, f"{field}_role").name, fmap
def get_default_kit(self) -> KitType | None:
"""
If only one kits exists for this Submission Type, return it.
Returns:
KitType | None:
"""
if len(self.kit_types) == 1:
return self.kit_types[0]
else:
@@ -1379,7 +1393,7 @@ class Equipment(BaseClass):
else:
return {k: v for k, v in self.__dict__.items()}
def get_processes(self, submission_type: SubmissionType, extraction_kit: str | KitType | None = None) -> List[str]:
def get_processes(self, submission_type: str | SubmissionType | None = None, extraction_kit: str | KitType | None = None) -> List[str]:
"""
Get all processes associated with this Equipment for a given SubmissionType
@@ -1390,23 +1404,35 @@ class Equipment(BaseClass):
Returns:
List[Process]: List of process names
"""
processes = [process for process in self.processes if submission_type in process.submission_types]
match extraction_kit:
case str():
# logger.debug(f"Filtering processes by extraction_kit str {extraction_kit}")
processes = [process for process in processes if
extraction_kit in [kit.name for kit in process.kit_types]]
case KitType():
# logger.debug(f"Filtering processes by extraction_kit KitType {extraction_kit}")
processes = [process for process in processes if extraction_kit in process.kit_types]
case _:
pass
# NOTE: Convert to strings
processes = [process.name for process in processes]
assert all([isinstance(process, str) for process in processes])
if len(processes) == 0:
processes = ['']
return processes
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
if isinstance(extraction_kit, str):
extraction_kit = KitType.query(name=extraction_kit)
for process in self.processes:
if submission_type not in process.submission_types:
continue
if extraction_kit and extraction_kit not in process.kit_types:
continue
yield process
# processes = (process for process in self.processes if submission_type in process.submission_types)
# match extraction_kit:
# case str():
# # logger.debug(f"Filtering processes by extraction_kit str {extraction_kit}")
# processes = (process for process in processes if
# extraction_kit in [kit.name for kit in process.kit_types])
# case KitType():
# # logger.debug(f"Filtering processes by extraction_kit KitType {extraction_kit}")
# processes = (process for process in processes if extraction_kit in process.kit_types)
# case _:
# pass
# # NOTE: Convert to strings
# # processes = [process.name for process in processes]
# # assert all([isinstance(process, str) for process in processes])
# # if len(processes) == 0:
# # processes = ['']
# # return processes
# for process in processes:
# yield process.name
@classmethod
@setup_lookup
@@ -1452,8 +1478,7 @@ class Equipment(BaseClass):
pass
return cls.execute_query(query=query, limit=limit)
def to_pydantic(self, submission_type: SubmissionType,
extraction_kit: str | KitType | None = None,
def to_pydantic(self, submission_type: SubmissionType, extraction_kit: str | KitType | None = None,
role: str = None) -> "PydEquipment":
"""
Creates PydEquipment of this Equipment
@@ -1466,8 +1491,8 @@ class Equipment(BaseClass):
PydEquipment: pydantic equipment object
"""
from backend.validators.pydant import PydEquipment
return PydEquipment(
processes=self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit), role=role,
processes = self.get_processes(submission_type=submission_type, extraction_kit=extraction_kit)
return PydEquipment(processes=processes, role=role,
**self.to_dict(processes=False))
@classmethod
@@ -1603,7 +1628,7 @@ class EquipmentRole(BaseClass):
return cls.execute_query(query=query, limit=limit)
def get_processes(self, submission_type: str | SubmissionType | None,
extraction_kit: str | KitType | None = None) -> List[Process]:
extraction_kit: str | KitType | None = None) -> Generator[Process, None, None]:
"""
Get processes used by this EquipmentRole
@@ -1612,30 +1637,38 @@ class EquipmentRole(BaseClass):
extraction_kit (str | KitType | None, optional): KitType of interest. Defaults to None.
Returns:
List[Process]: _description_
List[Process]: List of processes
"""
if isinstance(submission_type, str):
# logger.debug(f"Checking if str {submission_type} exists")
submission_type = SubmissionType.query(name=submission_type)
if submission_type is not None:
# logger.debug("Getting all processes for this EquipmentRole")
processes = [process for process in self.processes if submission_type in process.submission_types]
else:
processes = self.processes
match extraction_kit:
case str():
# logger.debug(f"Filtering processes by extraction_kit str {extraction_kit}")
processes = [item for item in processes if extraction_kit in [kit.name for kit in item.kit_types]]
case KitType():
# logger.debug(f"Filtering processes by extraction_kit KitType {extraction_kit}")
processes = [item for item in processes if extraction_kit in [kit for kit in item.kit_types]]
case _:
pass
output = [item.name for item in processes]
if len(output) == 0:
return ['']
else:
return output
if isinstance(extraction_kit, str):
extraction_kit = KitType.query(name=extraction_kit)
for process in self.processes:
if submission_type and submission_type not in process.submission_types:
continue
if extraction_kit and extraction_kit not in process.kit_types:
continue
yield process.name
# if submission_type is not None:
# # logger.debug("Getting all processes for this EquipmentRole")
# processes = [process for process in self.processes if submission_type in process.submission_types]
# else:
# processes = self.processes
# match extraction_kit:
# case str():
# # logger.debug(f"Filtering processes by extraction_kit str {extraction_kit}")
# processes = [item for item in processes if extraction_kit in [kit.name for kit in item.kit_types]]
# case KitType():
# # logger.debug(f"Filtering processes by extraction_kit KitType {extraction_kit}")
# processes = [item for item in processes if extraction_kit in [kit for kit in item.kit_types]]
# case _:
# pass
# output = [item.name for item in processes]
# if len(output) == 0:
# return ['']
# else:
# return output
def to_export_dict(self, submission_type: SubmissionType, kit_type: KitType):
"""
@@ -1644,8 +1677,8 @@ class EquipmentRole(BaseClass):
Returns:
dict: dictionary of Association and related reagent role
"""
return dict(role=self.name,
processes=self.get_processes(submission_type=submission_type, extraction_kit=kit_type))
processes = self.get_processes(submission_type=submission_type, extraction_kit=kit_type)
return dict(role=self.name, processes=[item for item in processes])
class SubmissionEquipmentAssociation(BaseClass):

View File

@@ -1074,6 +1074,7 @@ class BasicSubmission(BaseClass, LogMixin):
@setup_lookup
def query(cls,
submission_type: str | SubmissionType | None = None,
submission_type_name: str|None = None,
id: int | str | None = None,
rsl_plate_num: str | None = None,
start_date: date | str | int | None = None,
@@ -1173,6 +1174,11 @@ class BasicSubmission(BaseClass, LogMixin):
limit = 1
case _:
pass
match submission_type_name:
case str():
query = query.filter(model.submission_type_name == submission_type_name)
case _:
pass
# NOTE: by id (returns only a single value)
match id:
case int():
@@ -1389,13 +1395,23 @@ class BasicSubmission(BaseClass, LogMixin):
@classmethod
def calculate_turnaround(cls, start_date:date|None=None, end_date:date|None=None) -> Tuple[int|None, bool|None]:
if 'pytest' not in sys.modules:
from tools import ctx
else:
from test_settings import ctx
if not end_date:
return None, None
try:
delta = np.busday_count(start_date, end_date, holidays=create_holidays_for_year(start_date.year)) + 1
except ValueError:
return None, None
return delta, delta <= ctx.TaT_threshold
try:
tat = cls.get_default_info("turnaround_time")
except (AttributeError, KeyError):
tat = None
if not tat:
tat = ctx.TaT_threshold
return delta, delta <= tat
# Below are the custom submission types