Code cleanup for excel.parsers complete.

This commit is contained in:
lwark
2025-09-04 14:35:44 -05:00
parent fcda0d873c
commit b6e1c0dee2
11 changed files with 161 additions and 138 deletions

View File

@@ -538,7 +538,10 @@ class BaseClass(Base):
case _: case _:
return super().__setattr__(key, value) return super().__setattr__(key, value)
else: else:
try:
return super().__setattr__(key, value) return super().__setattr__(key, value)
except AttributeError:
raise AttributeError(f"Can't set {key} to {value}")
def delete(self, **kwargs): def delete(self, **kwargs):
logger.error(f"Delete has not been implemented for {self.__class__.__name__}") logger.error(f"Delete has not been implemented for {self.__class__.__name__}")

View File

@@ -1027,7 +1027,8 @@ class ProcedureTypeReagentRoleAssociation(BaseClass):
reagentrole_id = Column(INTEGER, ForeignKey("_reagentrole.id"), reagentrole_id = Column(INTEGER, ForeignKey("_reagentrole.id"),
primary_key=True) #: id of associated reagentrole primary_key=True) #: id of associated reagentrole
proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"), primary_key=True) #: id of associated proceduretype proceduretype_id = Column(INTEGER, ForeignKey("_proceduretype.id"),
primary_key=True) #: id of associated proceduretype
uses = Column(JSON) #: map to location on excel sheets of different procedure types uses = Column(JSON) #: map to location on excel sheets of different procedure types
required = Column(INTEGER) #: whether the reagent type is required for the kittype (Boolean 1 or 0) required = Column(INTEGER) #: whether the reagent type is required for the kittype (Boolean 1 or 0)
last_used = Column(String(32)) #: last used lot number of this type of reagent last_used = Column(String(32)) #: last used lot number of this type of reagent
@@ -1324,7 +1325,7 @@ class ProcedureReagentLotAssociation(BaseClass):
output.update(relevant) output.update(relevant)
output['reagentrole'] = self.reagentrole output['reagentrole'] = self.reagentrole
output['misc_info'] = misc output['misc_info'] = misc
logger.debug(f"Output: {pformat(output)}") # logger.debug(f"Output: {pformat(output)}")
return output return output
def delete(self, **kwargs): def delete(self, **kwargs):
@@ -1544,7 +1545,6 @@ class Equipment(BaseClass, LogMixin):
else: else:
return {k: v for k, v in self.__dict__.items()} return {k: v for k, v in self.__dict__.items()}
@classmethod @classmethod
@setup_lookup @setup_lookup
def query(cls, def query(cls,
@@ -1842,7 +1842,7 @@ class ProcessVersion(BaseClass):
@classmethod @classmethod
def query(cls, def query(cls,
version: str | None = None, version: str | float | None = None,
name: str | None = None, name: str | None = None,
limit: int = 0, limit: int = 0,
**kwargs) -> ReagentLot | List[ReagentLot]: **kwargs) -> ReagentLot | List[ReagentLot]:
@@ -1853,8 +1853,8 @@ class ProcessVersion(BaseClass):
case _: case _:
pass pass
match version: match version:
case str(): case str() | float():
query = query.filter(cls.version == version) query = query.filter(cls.version == float(version))
case _: case _:
pass pass
return cls.execute_query(query=query, limit=limit) return cls.execute_query(query=query, limit=limit)
@@ -1879,6 +1879,9 @@ class Tips(BaseClass):
@setup_lookup @setup_lookup
def query(cls, def query(cls,
name: str | None = None, name: str | None = None,
manufacturer: str | None = None,
capacity: str | None = None,
ref: str | None = None,
limit: int = 0, limit: int = 0,
**kwargs) -> Tips | List[Tips]: **kwargs) -> Tips | List[Tips]:
query = cls.__database_session__.query(cls) query = cls.__database_session__.query(cls)
@@ -1888,6 +1891,22 @@ class Tips(BaseClass):
limit = 1 limit = 1
case _: case _:
pass pass
match manufacturer:
case str():
query = query.filter(cls.manufacturer == manufacturer)
case _:
pass
match capacity:
case int():
query = query.filter(cls.capacity == capacity)
case _:
pass
match ref:
case str():
query = query.filter(cls.ref == ref)
limit = 1
case _:
pass
return cls.execute_query(query=query, limit=limit) return cls.execute_query(query=query, limit=limit)
@check_authorization @check_authorization
@@ -1944,15 +1963,21 @@ class TipsLot(BaseClass, LogMixin):
@property @property
def name(self) -> str: def name(self) -> str:
return f"{self.tips.manufacturer}-{self.tips.capacity}-{self.lot}" return f"{self.tips.manufacturer}-{self.tips.ref}-{self.lot}"
@classmethod @classmethod
def query(cls, name: str | None = None, lot: str | None = None, limit: int = 0, **kwargs) -> Tips | List[Tips]: def query(cls,
manufacturer: str | None = None,
ref: str | None = None,
lot: str | None = None,
limit: int = 0,
**kwargs) -> Tips | List[Tips]:
""" """
Lookup tips Lookup tips
Args: Args:
name (str | None, optional): Informal name of tips. Defaults to None. manufacturer (str | None, optional): Name of parent tip manufacturer. Defaults to None.
ref (str | None, optional): Name of parent tip reference number. Defaults to None.
lot (str | None, optional): Lot number. Defaults to None. lot (str | None, optional): Lot number. Defaults to None.
limit (int, optional): Maximum number of results to return (0=all). Defaults to 0. limit (int, optional): Maximum number of results to return (0=all). Defaults to 0.
@@ -1960,9 +1985,17 @@ class TipsLot(BaseClass, LogMixin):
Tips | List[Tips]: Tips matching criteria Tips | List[Tips]: Tips matching criteria
""" """
query = cls.__database_session__.query(cls) query = cls.__database_session__.query(cls)
match name: if manufacturer is not None and ref is not None:
manufacturer = None
match manufacturer:
case str(): case str():
query = query.filter(cls.name == name) logger.debug(f"Looking for {manufacturer}")
query = query.join(Tips).filter(Tips.manufacturer == manufacturer)
case _:
pass
match ref:
case str():
query = query.join(Tips).filter(Tips.ref == ref)
case _: case _:
pass pass
match lot: match lot:
@@ -2032,7 +2065,8 @@ class ProcedureEquipmentAssociation(BaseClass):
equipment = relationship(Equipment, back_populates="equipmentprocedureassociation") #: associated equipment equipment = relationship(Equipment, back_populates="equipmentprocedureassociation") #: associated equipment
processversion = relationship(ProcessVersion, back_populates="procedureequipmentassociation") #: Associated process version processversion = relationship(ProcessVersion,
back_populates="procedureequipmentassociation") #: Associated process version
tipslot_id = Column(INTEGER, ForeignKey("_tipslot.id", ondelete="SET NULL", tipslot_id = Column(INTEGER, ForeignKey("_tipslot.id", ondelete="SET NULL",
name="SEA_Tipslot_id")) name="SEA_Tipslot_id"))
@@ -2153,7 +2187,10 @@ class ProcedureEquipmentAssociation(BaseClass):
output.update(relevant) output.update(relevant)
output['misc_info'] = misc output['misc_info'] = misc
output['equipment_role'] = self.equipmentrole output['equipment_role'] = self.equipmentrole
try:
output['processversion'] = self.processversion.details_dict() output['processversion'] = self.processversion.details_dict()
except AttributeError:
output['processversion'] = None
try: try:
output['tips'] = self.tipslot.details_dict() output['tips'] = self.tipslot.details_dict()
except AttributeError: except AttributeError:

View File

@@ -1,5 +1,5 @@
""" """
Default Parser archetypes.
""" """
from __future__ import annotations from __future__ import annotations
import logging, re import logging, re
@@ -43,7 +43,8 @@ class DefaultParser(object):
*args (): *args ():
**kwargs (): **kwargs ():
""" """
logger.debug(f"\n\nHello from {self.__class__.__name__}\n\n") logger.info(f"\n\nHello from {self.__class__.__name__}\n\n")
self.filepath = filepath
self.proceduretype = proceduretype self.proceduretype = proceduretype
try: try:
self._pyd_object = getattr(pydant, self._pyd_object = getattr(pydant,
@@ -61,10 +62,8 @@ class DefaultParser(object):
self.worksheet = self.workbook[self.sheet] self.worksheet = self.workbook[self.sheet]
self.start_row = self.delineate_start_row(start_row=start_row) self.start_row = self.delineate_start_row(start_row=start_row)
self.end_row = self.delineate_end_row(start_row=self.start_row) self.end_row = self.delineate_end_row(start_row=self.start_row)
logger.debug(f"Start row: {self.start_row}, End row: {self.end_row}")
def to_pydantic(self): def to_pydantic(self):
# data = {key: value['value'] for key, value in self.parsed_info.items()}
data = self.parsed_info data = self.parsed_info
data['filepath'] = self.filepath data['filepath'] = self.filepath
return self._pyd_object(**data) return self._pyd_object(**data)
@@ -100,7 +99,6 @@ class DefaultKEYVALUEParser(DefaultParser):
rows = range(self.start_row, self.end_row) rows = range(self.start_row, self.end_row)
for row in rows: for row in rows:
check_row = [item for item in self.worksheet.rows][row-1] check_row = [item for item in self.worksheet.rows][row-1]
logger.debug(f"Checking row {row-1}, {check_row} for merged cells.")
if any([isinstance(cell, MergedCell) for cell in check_row]): if any([isinstance(cell, MergedCell) for cell in check_row]):
continue continue
key = self.worksheet.cell(row, 1).value key = self.worksheet.cell(row, 1).value
@@ -110,9 +108,7 @@ class DefaultKEYVALUEParser(DefaultParser):
key = key.lower().replace(":", "").strip().replace(" ", "_") key = key.lower().replace(":", "").strip().replace(" ", "_")
value = self.worksheet.cell(row, 2).value value = self.worksheet.cell(row, 2).value
missing = False if value else True missing = False if value else True
# location_map = dict(row=row, key_column=1, value_column=2, sheet=self.worksheet.title)
value = dict(value=value, missing=missing)#, location=location_map) value = dict(value=value, missing=missing)#, location=location_map)
logger.debug(f"Yielding {value} for {key}")
yield key, value yield key, value
@@ -123,7 +119,6 @@ class DefaultTABLEParser(DefaultParser):
@property @property
def parsed_info(self) -> Generator[dict, None, None]: def parsed_info(self) -> Generator[dict, None, None]:
logger.debug(f"creating dataframe from {self.start_row} to {self.end_row}")
df = DataFrame( df = DataFrame(
[item for item in self.worksheet.values][self.start_row - 1:self.end_row - 1]) [item for item in self.worksheet.values][self.start_row - 1:self.end_row - 1])
df.columns = df.iloc[0] df.columns = df.iloc[0]
@@ -131,12 +126,10 @@ class DefaultTABLEParser(DefaultParser):
df = df.dropna(axis=1, how='all') df = df.dropna(axis=1, how='all')
for ii, row in enumerate(df.iterrows()): for ii, row in enumerate(df.iterrows()):
output = {} output = {}
# for key, value in row[1].to_dict().items():
for key, value in row[1].details_dict().items(): for key, value in row[1].details_dict().items():
if isinstance(key, str): if isinstance(key, str):
key = key.lower().replace(" ", "_") key = key.lower().replace(" ", "_")
key = re.sub(r"_(\(.*\)|#)", "", key) key = re.sub(r"_(\(.*\)|#)", "", key)
# logger.debug(f"Row {ii} values: {key}: {value}")
output[key] = value output[key] = value
yield output yield output

View File

@@ -1,11 +1,11 @@
""" """
Module for clientsubmission parsing
""" """
from __future__ import annotations from __future__ import annotations
import logging import logging
from pathlib import Path from pathlib import Path
from string import ascii_lowercase from string import ascii_lowercase
from typing import Generator, TYPE_CHECKING, Literal from typing import Generator, TYPE_CHECKING
from openpyxl.reader.excel import load_workbook from openpyxl.reader.excel import load_workbook
from tools import row_keys from tools import row_keys
from . import DefaultKEYVALUEParser, DefaultTABLEParser from . import DefaultKEYVALUEParser, DefaultTABLEParser
@@ -122,20 +122,6 @@ class ClientSubmissionInfoParser(DefaultKEYVALUEParser, SubmissionTyperMixin):
else: else:
self.submissiontype = submissiontype self.submissiontype = submissiontype
super().__init__(filepath=filepath, sheet="Client Info", start_row=1, **kwargs) super().__init__(filepath=filepath, sheet="Client Info", start_row=1, **kwargs)
# NOTE: move to the manager class.
# allowed_procedure_types = [item.name for item in self.submissiontype.proceduretype]
# for name in allowed_procedure_types:
# if name in self.workbook.sheetnames:
# # TODO: check if run with name already exists
# add_run = QuestionAsker(title="Add Run?", message="We've detected a sheet corresponding to an associated procedure type.\nWould you like to add a new run?")
# if add_run.accepted:
# # NOTE: recruit parser.
# try:
# manager = getattr(procedure_managers, name)
# except AttributeError:
# manager = procedure_managers.DefaultManager
# self.manager = manager(proceduretype=name)
# pass
@property @property
def parsed_info(self): def parsed_info(self):
@@ -144,13 +130,11 @@ class ClientSubmissionInfoParser(DefaultKEYVALUEParser, SubmissionTyperMixin):
output['clientlab'] = output['client_lab'] output['clientlab'] = output['client_lab']
except KeyError: except KeyError:
pass pass
# output['submissiontype'] = dict(value=self.submissiontype.name.title())
try: try:
output['submissiontype'] = output['submission_type'] output['submissiontype'] = output['submission_type']
output['submissiontype']['value'] = self.submissiontype.name.title() output['submissiontype']['value'] = self.submissiontype.name.title()
except KeyError: except KeyError:
pass pass
logger.debug(f"Data: {output}")
return output return output
@@ -173,8 +157,6 @@ class ClientSubmissionSampleParser(DefaultTABLEParser, SubmissionTyperMixin):
def parsed_info(self) -> Generator[dict, None, None]: def parsed_info(self) -> Generator[dict, None, None]:
output = super().parsed_info output = super().parsed_info
for ii, sample in enumerate(output, start=1): for ii, sample in enumerate(output, start=1):
# logger.debug(f"Parsed info sample: {sample}")
if isinstance(sample["row"], str) and sample["row"].lower() in ascii_lowercase[0:8]: if isinstance(sample["row"], str) and sample["row"].lower() in ascii_lowercase[0:8]:
try: try:
sample["row"] = row_keys[sample["row"]] sample["row"] = row_keys[sample["row"]]
@@ -184,5 +166,4 @@ class ClientSubmissionSampleParser(DefaultTABLEParser, SubmissionTyperMixin):
yield sample yield sample
def to_pydantic(self): def to_pydantic(self):
logger.debug(f"Attempting to pydantify: {self._pyd_object}")
return [self._pyd_object(**sample) for sample in self.parsed_info if sample['sample_id']] return [self._pyd_object(**sample) for sample in self.parsed_info if sample['sample_id']]

View File

@@ -8,6 +8,13 @@ if TYPE_CHECKING:
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
"""
TODO
- range dicts should hopefully not be necessary in this type of parser. Hopefully all procedure parsers are the same.
"""
class ProcedureInfoParser(DefaultKEYVALUEParser): class ProcedureInfoParser(DefaultKEYVALUEParser):
default_range_dict = [dict( default_range_dict = [dict(

View File

@@ -1,5 +1,5 @@
""" """
Parser for pcr results from Design and Analysis Studio
""" """
from __future__ import annotations from __future__ import annotations
import logging import logging
@@ -15,7 +15,7 @@ logger = logging.getLogger(f"submissions.{__name__}")
class PCRInfoParser(DefaultResultsInfoParser): class PCRInfoParser(DefaultResultsInfoParser):
def __init__(self, filepath: Path | str, sheet: str | None = None, start_row: int = 1, procedure=None, **kwargs): def __init__(self, filepath: Path | str, procedure=None, **kwargs):
self.results_type = "PCR" self.results_type = "PCR"
self.procedure = procedure self.procedure = procedure
super().__init__(filepath=filepath, proceduretype=self.procedure.proceduretype) super().__init__(filepath=filepath, proceduretype=self.procedure.proceduretype)

View File

@@ -49,7 +49,7 @@ class ClientSubmissionNamer(DefaultNamer):
logger.warning(f"Getting submissiontype from regex failed, using default submissiontype.") logger.warning(f"Getting submissiontype from regex failed, using default submissiontype.")
sub_type = SubmissionType.query(name="Default") sub_type = SubmissionType.query(name="Default")
logger.debug(f"Submission Type: {sub_type}") logger.debug(f"Submission Type: {sub_type}")
sys.exit() # sys.exit()
return sub_type return sub_type
def get_subtype_from_regex(self) -> SubmissionType: def get_subtype_from_regex(self) -> SubmissionType:

View File

@@ -357,7 +357,7 @@ class PydTips(PydBaseClass):
# return value # return value
@report_result @report_result
def to_sql(self, procedure: Run) -> Tuple[Tips, Report]: def to_sql(self) -> Tuple[Tips, Report]:
""" """
Convert this object to the SQL version for database storage. Convert this object to the SQL version for database storage.
@@ -382,7 +382,7 @@ class PydEquipment(PydBaseClass):
# process: List[dict] | None # process: List[dict] | None
process: List[PydProcess] | PydProcess | None process: List[PydProcess] | PydProcess | None
equipmentrole: str | PydEquipmentRole | None equipmentrole: str | PydEquipmentRole | None
# tips: List[PydTips] | PydTips | None = Field(default=[]) tips: List[PydTips] | PydTips | None = Field(default=[])
@field_validator('equipmentrole', mode='before') @field_validator('equipmentrole', mode='before')
@classmethod @classmethod
@@ -400,7 +400,7 @@ class PydEquipment(PydBaseClass):
@field_validator('process', mode='before') @field_validator('process', mode='before')
@classmethod @classmethod
def make_empty_list(cls, value, values): def process_to_pydantic(cls, value, values):
# if isinstance(value, dict): # if isinstance(value, dict):
# value = value['processes'] # value = value['processes']
if isinstance(value, GeneratorType): if isinstance(value, GeneratorType):
@@ -408,6 +408,9 @@ class PydEquipment(PydBaseClass):
value = convert_nans_to_nones(value) value = convert_nans_to_nones(value)
if not value: if not value:
value = [] value = []
if isinstance(value, ProcessVersion):
value = value.to_pydantic(pyd_model_name="PydProcess")
else:
try: try:
# value = [item.strip() for item in value] # value = [item.strip() for item in value]
d: Process = next((process for process in value if values.data['name'] in [item.name for item in process.equipment]), None) d: Process = next((process for process in value if values.data['name'] in [item.name for item in process.equipment]), None)
@@ -421,6 +424,34 @@ class PydEquipment(PydBaseClass):
pass pass
return value return value
@field_validator('tips', mode='before')
@classmethod
def tips_to_pydantic(cls, value, values):
# if isinstance(value, dict):
# value = value['processes']
if isinstance(value, GeneratorType):
value = [item for item in value]
value = convert_nans_to_nones(value)
if not value:
value = []
if isinstance(value, TipsLot):
value = value.to_pydantic(pyd_model_name="PydTips")
else:
try:
# value = [item.strip() for item in value]
d: Tips = next(
(tips for tips in value if values.data['name'] in [item.name for item in tips.equipment]),
None)
print(f"Next process: {d.details_dict()}")
if d:
# value = PydProcess(**d.details_dict())
value = d.to_pydantic()
# value = next((process.to_pydantic() for process in value))
except AttributeError as e:
logger.error(f"Process Validation error due to {e}")
pass
return value
# @field_validator('tips', mode='before') # @field_validator('tips', mode='before')
# @classmethod # @classmethod
# def tips_to_pydantic(cls, value): # def tips_to_pydantic(cls, value):
@@ -1172,7 +1203,7 @@ class PydEquipmentRole(BaseModel):
class PydProcess(PydBaseClass, extra="allow"): class PydProcess(PydBaseClass, extra="allow"):
name: str name: str
version: str = Field(default="1") version: str = Field(default="1.0")
# equipment: List[str] # equipment: List[str]
tips: List[PydTips] tips: List[PydTips]
@@ -1198,12 +1229,20 @@ class PydProcess(PydBaseClass, extra="allow"):
value = [item for item in value if item] value = [item for item in value if item]
return value return value
@field_validator("version", mode="before")
@classmethod
def enforce_float_string(cls, value):
if isinstance(value, float):
value = str(value)
return value
@report_result @report_result
def to_sql(self): def to_sql(self):
report = Report() report = Report()
name = self.name.split("-")[0]
logger.debug(f"Query process: {self.name}, version = {self.version}") logger.debug(f"Query process: {self.name}, version = {self.version}")
# NOTE: can't use query_or_create due to name not being part of ProcessVersion # NOTE: can't use query_or_create due to name not being part of ProcessVersion
instance = ProcessVersion.query(name=self.name, version=self.version, limit=1) instance = ProcessVersion.query(name=name, version=self.version, limit=1)
logger.debug(f"Got instance: {instance}") logger.debug(f"Got instance: {instance}")
if not instance: if not instance:
instance = ProcessVersion() instance = ProcessVersion()
@@ -1245,8 +1284,6 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
technician: dict = Field(default=dict(value="NA", missing=True)) technician: dict = Field(default=dict(value="NA", missing=True))
repeat: bool = Field(default=False) repeat: bool = Field(default=False)
repeat_of: Procedure | None = Field(default=None) repeat_of: Procedure | None = Field(default=None)
# kittype: dict = Field(default=dict(value="NA", missing=True))
# possible_kits: list | None = Field(default=[], validate_default=True)
plate_map: str | None = Field(default=None) plate_map: str | None = Field(default=None)
reagent: list | None = Field(default=[]) reagent: list | None = Field(default=[])
reagentrole: dict | None = Field(default={}, validate_default=True) reagentrole: dict | None = Field(default={}, validate_default=True)
@@ -1291,17 +1328,6 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
value['missing'] = True value['missing'] = True
return value return value
# @field_validator("possible_kits")
# @classmethod
# def rescue_possible_kits(cls, value, values):
# if not value:
# try:
# if values.data['proceduretype']:
# value = [kittype.__dict__['name'] for kittype in values.data['proceduretype'].kittype]
# except KeyError:
# pass
# return value
@field_validator("name", "technician")#, "kittype") @field_validator("name", "technician")#, "kittype")
@classmethod @classmethod
def set_colour(cls, value): def set_colour(cls, value):
@@ -1471,8 +1497,6 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
sql = Procedure() sql = Procedure()
else: else:
sql = super().to_sql() sql = super().to_sql()
logger.debug(f"Initial PYD: {pformat(self.__dict__)}")
# sql.results = [result.to_sql() for result in self.results]
if isinstance(self.name, dict): if isinstance(self.name, dict):
sql.name = self.name['value'] sql.name = self.name['value']
else: else:
@@ -1481,7 +1505,7 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
sql.technician = self.technician['value'] sql.technician = self.technician['value']
else: else:
sql.technician = self.technician sql.technician = self.technician
sql.repeat = self.repeat # sql.repeat = int(self.repeat)
if sql.repeat: if sql.repeat:
regex = re.compile(r".*\dR\d$") regex = re.compile(r".*\dR\d$")
repeats = [item for item in self.run.procedure if repeats = [item for item in self.run.procedure if
@@ -1493,21 +1517,12 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
sql.run = self.run sql.run = self.run
if self.proceduretype: if self.proceduretype:
sql.proceduretype = self.proceduretype sql.proceduretype = self.proceduretype
# Note: convert any new reagents to sql and save
# for reagentrole, reagents in self.reagentrole.items():
# for reagent in self.reagent:
# if not reagent.lot or reagent.name == "--New--":
# continue
# self.update_new_reagents(reagent)
# NOTE: reset reagent associations. # NOTE: reset reagent associations.
# sql.procedurereagentassociation = []
for reagent in self.reagent: for reagent in self.reagent:
if isinstance(reagent, dict): if isinstance(reagent, dict):
reagent = PydReagent(**reagent) reagent = PydReagent(**reagent)
logger.debug(reagent)
reagentrole = reagent.reagentrole reagentrole = reagent.reagentrole
reagent = reagent.to_sql() reagent = reagent.to_sql()
# logger.debug(reagentrole)
if reagent not in sql.reagentlot: if reagent not in sql.reagentlot:
# NOTE: Remove any previous association for this role. # NOTE: Remove any previous association for this role.
if sql.id: if sql.id:
@@ -1529,36 +1544,36 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
start_index = 1 start_index = 1
relevant_samples = [sample for sample in self.sample if relevant_samples = [sample for sample in self.sample if
not sample.sample_id.startswith("blank_") and not sample.sample_id == ""] not sample.sample_id.startswith("blank_") and not sample.sample_id == ""]
# logger.debug(f"start index: {start_index}")
assoc_id_range = range(start_index, start_index + len(relevant_samples) + 1) assoc_id_range = range(start_index, start_index + len(relevant_samples) + 1)
# logger.debug(f"Association id range: {assoc_id_range}")
for iii, sample in enumerate(relevant_samples): for iii, sample in enumerate(relevant_samples):
sample_sql = sample.to_sql() sample_sql = sample.to_sql()
if sql.run: if sql.run:
if sample_sql not in sql.run.sample: if sample_sql not in sql.run.sample:
logger.debug(f"sample {sample_sql} not found in {sql.run.sample}")
run_assoc = RunSampleAssociation(sample=sample_sql, run=self.run, row=sample.row, run_assoc = RunSampleAssociation(sample=sample_sql, run=self.run, row=sample.row,
column=sample.column) column=sample.column)
# else:
# logger.debug(f"sample {sample_sql} found in {sql.run.sample}")
if sample_sql not in sql.sample: if sample_sql not in sql.sample:
proc_assoc = ProcedureSampleAssociation(new_id=assoc_id_range[iii], procedure=sql, sample=sample_sql, proc_assoc = ProcedureSampleAssociation(new_id=assoc_id_range[iii], procedure=sql, sample=sample_sql,
row=sample.row, column=sample.column, row=sample.row, column=sample.column,
procedure_rank=sample.procedure_rank) procedure_rank=sample.procedure_rank)
# sys.exit(pformat(self.equipment))
for equipment in self.equipment: for equipment in self.equipment:
logger.debug(f"Equipment: {equipment}")
# equip = Equipment.query(name=equipment.name)
equip, _ = equipment.to_sql() equip, _ = equipment.to_sql()
logger.debug(f"Process: {equipment.process}")
if isinstance(equipment.process, list): if isinstance(equipment.process, list):
equipment.process = equipment.process[0] equipment.process = equipment.process[0]
if isinstance(equipment.tips, list):
try:
equipment.tips = equipment.tips[0]
except IndexError:
equipment.tips = None
if equip not in sql.equipment: if equip not in sql.equipment:
equip_assoc = ProcedureEquipmentAssociation(equipment=equip, procedure=sql, equip_assoc = ProcedureEquipmentAssociation(equipment=equip, procedure=sql,
equipmentrole=equip.equipmentrole[0]) equipmentrole=equip.equipmentrole[0])
process = equipment.process.to_sql() process = equipment.process.to_sql()
equip_assoc.processversion = process equip_assoc.processversion = process
# sys.exit(pformat([item.__dict__ for item in sql.procedureequipmentassociation])) try:
tipslot = equipment.tips.to_sql()
except AttributeError:
tipslot = None
equip_assoc.tipslot = tipslot
return sql, None return sql, None

View File

@@ -1,18 +1,15 @@
""" """
Main module to construct the procedure form
""" """
from __future__ import annotations from __future__ import annotations
import sys, logging, os, re, datetime import sys, logging, re, datetime
from pathlib import Path
from pprint import pformat from pprint import pformat
from PyQt6.QtCore import pyqtSlot, Qt from PyQt6.QtCore import pyqtSlot, Qt
from PyQt6.QtGui import QContextMenuEvent, QAction
from PyQt6.QtWebChannel import QWebChannel from PyQt6.QtWebChannel import QWebChannel
from PyQt6.QtWebEngineWidgets import QWebEngineView from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWidgets import QDialog, QGridLayout, QMenu, QDialogButtonBox from PyQt6.QtWidgets import QDialog, QGridLayout, QDialogButtonBox
from typing import TYPE_CHECKING, Any, List from typing import TYPE_CHECKING, Any, List
if TYPE_CHECKING: if TYPE_CHECKING:
from backend.db.models import Run, Procedure
from backend.validators import PydProcedure, PydEquipment from backend.validators import PydProcedure, PydEquipment
from tools import get_application_from_parent, render_details_template, sanitize_object_for_json from tools import get_application_from_parent, render_details_template, sanitize_object_for_json
@@ -26,7 +23,6 @@ class ProcedureCreation(QDialog):
self.edit = edit self.edit = edit
self.run = procedure.run self.run = procedure.run
self.procedure = procedure self.procedure = procedure
# logger.debug(f"procedure: {pformat(self.procedure.__dict__)}")
self.proceduretype = procedure.proceduretype self.proceduretype = procedure.proceduretype
self.setWindowTitle(f"New {self.proceduretype.name} for {self.run.rsl_plate_number}") self.setWindowTitle(f"New {self.proceduretype.name} for {self.run.rsl_plate_number}")
self.plate_map = self.proceduretype.construct_plate_map(sample_dicts=self.procedure.sample) self.plate_map = self.proceduretype.construct_plate_map(sample_dicts=self.procedure.sample)
@@ -75,17 +71,14 @@ class ProcedureCreation(QDialog):
equipmentrole['equipment'].index(item_in_er_list))) equipmentrole['equipment'].index(item_in_er_list)))
proceduretype_dict['equipment_section'] = EquipmentUsage.construct_html(procedure=self.procedure, child=True) proceduretype_dict['equipment_section'] = EquipmentUsage.construct_html(procedure=self.procedure, child=True)
proceduretype_dict['equipment'] = [sanitize_object_for_json(object) for object in proceduretype_dict['equipment']] proceduretype_dict['equipment'] = [sanitize_object_for_json(object) for object in proceduretype_dict['equipment']]
logger.debug(proceduretype_dict['equipment'])
self.update_equipment = EquipmentUsage.update_equipment self.update_equipment = EquipmentUsage.update_equipment
regex = re.compile(r".*R\d$") regex = re.compile(r".*R\d$")
proceduretype_dict['previous'] = [""] + [item.name for item in self.run.procedure if item.proceduretype == self.proceduretype and not bool(regex.match(item.name))] proceduretype_dict['previous'] = [""] + [item.name for item in self.run.procedure if item.proceduretype == self.proceduretype and not bool(regex.match(item.name))]
html = render_details_template( html = render_details_template(
template_name="procedure_creation", template_name="procedure_creation",
# css_in=['new_context_menu'],
js_in=["procedure_form", "grid_drag", "context_menu"], js_in=["procedure_form", "grid_drag", "context_menu"],
proceduretype=proceduretype_dict, proceduretype=proceduretype_dict,
run=self.run.details_dict(), run=self.run.details_dict(),
# procedure=self.procedure.__dict__,
procedure=self.procedure, procedure=self.procedure,
plate_map=self.plate_map, plate_map=self.plate_map,
edit=self.edit edit=self.edit
@@ -94,8 +87,12 @@ class ProcedureCreation(QDialog):
@pyqtSlot(str, str, str, str) @pyqtSlot(str, str, str, str)
def update_equipment(self, equipmentrole: str, equipment: str, process: str, tips: str): def update_equipment(self, equipmentrole: str, equipment: str, process: str, tips: str):
from backend.db.models import Equipment from backend.db.models import Equipment, ProcessVersion, TipsLot
# logger.debug("Updating equipment") logger.debug(f"Updating equipment with"
f"\n\tEquipment role: {equipmentrole}"
f"\n\tEquipment: {equipment}"
f"\n\tProcess: {process}"
f"\n\tTips: {tips}")
try: try:
equipment_of_interest = next( equipment_of_interest = next(
(item for item in self.procedure.equipment if item.equipmentrole == equipmentrole)) (item for item in self.procedure.equipment if item.equipmentrole == equipmentrole))
@@ -109,14 +106,25 @@ class ProcedureCreation(QDialog):
eoi.name = equipment.name eoi.name = equipment.name
eoi.asset_number = equipment.asset_number eoi.asset_number = equipment.asset_number
eoi.nickname = equipment.nickname eoi.nickname = equipment.nickname
# logger.warning("Setting processes.") process_name, version = process.split("-v")
eoi.process = [process for process in equipment.get_processes(equipmentrole=equipmentrole)] process = ProcessVersion.query(name=process_name, version=version, limit=1)
eoi.process = process
# sys.exit(f"Process:\n{pformat(eoi.process.__dict__)}")
try:
tips_manufacturer, tipsref, lot = [item if item != "" else None for item in tips.split("-")]
logger.debug(f"Querying with '{tips_manufacturer}', '{tipsref}', '{lot}'")
tips = TipsLot.query(manufacturer=tips_manufacturer, ref=tipsref, lot=lot)
logger.debug(f"Found tips: {tips}")
eoi.tips = tips
except ValueError:
logger.warning(f"No tips info to unpack")
# tips = TipsLot.query(manufacturer=tips_manufacturer, ref=tipsref, lot=lot)
# eoi.tips = tips
self.procedure.equipment.append(eoi) self.procedure.equipment.append(eoi)
# logger.debug(f"Updated equipment: {pformat(self.procedure.equipment)}") logger.debug(f"Updated equipment:\n{pformat([item.__dict__ for item in self.procedure.equipment])}")
@pyqtSlot(str, str) @pyqtSlot(str, str)
def text_changed(self, key: str, new_value: str): def text_changed(self, key: str, new_value: str):
logger.debug(f"New value for {key}: {new_value}")
match key: match key:
case "rsl_plate_num": case "rsl_plate_num":
setattr(self.procedure.run, key, new_value) setattr(self.procedure.run, key, new_value)
@@ -131,20 +139,14 @@ class ProcedureCreation(QDialog):
attribute['value'] = new_value.strip('\"') attribute['value'] = new_value.strip('\"')
case _: case _:
setattr(self.procedure, key, new_value.strip('\"')) setattr(self.procedure, key, new_value.strip('\"'))
logger.debug(f"Set value for {key}: {getattr(self.procedure, key)}")
# sys.exit()
@pyqtSlot(str, bool) @pyqtSlot(str, bool)
def check_toggle(self, key: str, ischecked: bool): def check_toggle(self, key: str, ischecked: bool):
logger.debug(f"{key} is checked: {ischecked}")
setattr(self.procedure, key, ischecked) setattr(self.procedure, key, ischecked)
@pyqtSlot(str) @pyqtSlot(str)
def update_kit(self, kittype): def update_kit(self, kittype):
self.procedure.update_kittype_reagentroles(kittype=kittype) self.procedure.update_kittype_reagentroles(kittype=kittype)
logger.debug({k: v for k, v in self.procedure.__dict__.items() if k != "plate_map"})
self.set_html() self.set_html()
@pyqtSlot(list) @pyqtSlot(list)
@@ -160,9 +162,7 @@ class ProcedureCreation(QDialog):
from backend.validators.pydant import PydReagent from backend.validators.pydant import PydReagent
expiry = datetime.datetime.strptime(expiry, "%Y-%m-%d") expiry = datetime.datetime.strptime(expiry, "%Y-%m-%d")
pyd = PydReagent(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry) pyd = PydReagent(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry)
logger.debug(pyd)
self.procedure.reagentrole[reagentrole].insert(0, pyd) self.procedure.reagentrole[reagentrole].insert(0, pyd)
logger.debug(pformat(self.procedure.__dict__))
self.set_html() self.set_html()
@pyqtSlot(str, str) @pyqtSlot(str, str)
@@ -177,16 +177,3 @@ class ProcedureCreation(QDialog):
def return_sql(self, new: bool = False): def return_sql(self, new: bool = False):
return self.procedure.to_sql(new=new) return self.procedure.to_sql(new=new)
# class ProcedureWebViewer(QWebEngineView):
#
# def __init__(self, *args, **kwargs):
# super().__init__(*args, **kwargs)
#
# def contextMenuEvent(self, event: QContextMenuEvent):
# self.menu = self.page().createStandardContextMenu()
# self.menu = self.createStandardContextMenu()
# add_sample = QAction("Add Sample")
# self.menu = QMenu()
# self.menu.addAction(add_sample)
# self.menu.popup(event.globalPos())

View File

@@ -62,7 +62,7 @@ class SubmissionDetails(QDialog):
css = f.read() css = f.read()
key = object.__class__.__name__.lower() key = object.__class__.__name__.lower()
d = {key: details} d = {key: details}
logger.debug(f"Using details: {pformat(d['procedure']['equipment'])}") # logger.debug(f"Using details: {pformat(d['procedure']['equipment'])}")
html = template.render(**d, css=[css]) html = template.render(**d, css=[css])
self.webview.setHtml(html) self.webview.setHtml(html)
self.setWindowTitle(f"{object.__class__.__name__} Details - {object.name}") self.setWindowTitle(f"{object.__class__.__name__} Details - {object.name}")

View File

@@ -372,7 +372,7 @@ class SubmissionsTree(QTreeView):
self.clear() self.clear()
self.data = [item.to_dict(full_data=True) for item in self.data = [item.to_dict(full_data=True) for item in
ClientSubmission.query(chronologic=True, page=page, page_size=page_size)] ClientSubmission.query(chronologic=True, page=page, page_size=page_size)]
logger.debug(f"setting data:\n {pformat(self.data)}") # logger.debug(f"setting data:\n {pformat(self.data)}")
# sys.exit() # sys.exit()
root = self.model.invisibleRootItem() root = self.model.invisibleRootItem()
for submission in self.data: for submission in self.data: