Compare commits

..

10 Commits

Author SHA1 Message Date
ccee4b3afe Addition of new reagent lots updated. 2025-10-03 10:19:17 -05:00
1445d2b93b Qubit sample results now written to export. 2025-10-01 15:04:02 -05:00
8fee07b0c3 Qubit sample results now written to export. 2025-09-29 12:22:50 -05:00
lwark
7f40e091fa Qubit sample results now written to export. 2025-09-23 15:06:18 -05:00
lwark
e9ff0a2774 Fixed Tips not being written during export. 2025-09-23 10:52:46 -05:00
lwark
4522f5909e Qubit results parsing complete. 2025-09-23 09:00:25 -05:00
lwark
4d70d751ca Qubit results parsing complete. 2025-09-23 08:59:04 -05:00
lwark
39d20bbc22 Qubit results parsing complete. 2025-09-23 08:57:40 -05:00
lwark
6f1202d3ba Sortable headers in treeview 2025-09-17 13:12:23 -05:00
lwark
656164d124 Sortable headers in treeview 2025-09-17 13:11:38 -05:00
32 changed files with 734 additions and 241 deletions

View File

@@ -1,6 +1,19 @@
# 202510.01
- Update for Python 3.13
# 202509.04
- Qubit results parsing complete.
# 202509.03
- Sortable headers in treeview.
- Added gitea remote.
# 202509.02 # 202509.02
- First Useable updated version. - First usable updated version.
# 202504.04 # 202504.04
@@ -8,7 +21,7 @@
# 202504.03 # 202504.03
- Split Concentration controls on the chart so they are individually selectable. - Split Concentration controls on the chart, so they are individually selectable.
# 202504.02 # 202504.02
@@ -311,7 +324,7 @@
## 202307.03 ## 202307.03
- Auto-filling of some empty cells in Excel file. - Autofilling of some empty cells in Excel file.
- Better pydantic validations of missing data. - Better pydantic validations of missing data.
## 202307.02 ## 202307.02

View File

@@ -1,5 +1,7 @@
- [ ] Add in database objects for rsl_run (submission -> run), procedure (run -> procedure), many more things will likely be associated with procedure. - [ ] Do results writing.
- [ ] Add in database object for client submission. - [ ] Allow use of multiple tips per process.
- [x] Add in database objects for rsl_run (submission -> run), procedure (run -> procedure), many more things will likely be associated with procedure.
- [x] Add in database object for client submission.
- [ ] Add arbitrary pipette addition to equipment UI. - [ ] Add arbitrary pipette addition to equipment UI.
- [ ] transfer details template rendering fully into sql objects - [ ] transfer details template rendering fully into sql objects
- [x] Add in connecting links for tips. - [x] Add in connecting links for tips.

View File

@@ -18,7 +18,7 @@ from sqlalchemy.exc import ArgumentError
from typing import Any, List, ClassVar from typing import Any, List, ClassVar
from pathlib import Path from pathlib import Path
from sqlalchemy.orm.relationships import _RelationshipDeclared from sqlalchemy.orm.relationships import _RelationshipDeclared
from tools import report_result, list_sort_dict, jinja_template_loading, Report, Result, ctx from tools import report_result, list_sort_dict, jinja_template_loading, Report, Alert, ctx
# NOTE: Load testing environment # NOTE: Load testing environment
if 'pytest' in sys.modules: if 'pytest' in sys.modules:
@@ -48,10 +48,10 @@ class BaseClass(Base):
except AttributeError: except AttributeError:
return f"<{self.__class__.__name__}(Name Unavailable)>" return f"<{self.__class__.__name__}(Name Unavailable)>"
# @classproperty
@classmethod
@declared_attr @declared_attr
def aliases(cls) -> List[str]: @classmethod
def aliases(cls):
""" """
List of other names this class might be known by. List of other names this class might be known by.
@@ -60,9 +60,9 @@ class BaseClass(Base):
""" """
return [cls.query_alias] return [cls.query_alias]
@classmethod
@declared_attr @declared_attr
def query_alias(cls) -> str: @classmethod
def query_alias(cls):
""" """
What to query this class as. What to query this class as.
@@ -71,8 +71,8 @@ class BaseClass(Base):
""" """
return cls.__name__.lower() return cls.__name__.lower()
@classmethod
@declared_attr @declared_attr
@classmethod
def __tablename__(cls) -> str: def __tablename__(cls) -> str:
""" """
Sets table name to lower case class name. Sets table name to lower case class name.
@@ -82,8 +82,8 @@ class BaseClass(Base):
""" """
return f"_{cls.__name__.lower()}" return f"_{cls.__name__.lower()}"
@classmethod
@declared_attr @declared_attr
@classmethod
def __database_session__(cls) -> Session: def __database_session__(cls) -> Session:
""" """
Pull db session from ctx to be used in operations Pull db session from ctx to be used in operations
@@ -93,8 +93,8 @@ class BaseClass(Base):
""" """
return ctx.database_session return ctx.database_session
@classmethod
@declared_attr @declared_attr
@classmethod
def __directory_path__(cls) -> Path: def __directory_path__(cls) -> Path:
""" """
Pull directory path from ctx to be used in operations. Pull directory path from ctx to be used in operations.
@@ -104,8 +104,8 @@ class BaseClass(Base):
""" """
return ctx.directory_path return ctx.directory_path
@classmethod
@declared_attr @declared_attr
@classmethod
def __backup_path__(cls) -> Path: def __backup_path__(cls) -> Path:
""" """
Pull backup directory path from ctx to be used in operations. Pull backup directory path from ctx to be used in operations.
@@ -119,10 +119,9 @@ class BaseClass(Base):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._misc_info = dict() self._misc_info = dict()
# @classproperty
@classmethod
@declared_attr @declared_attr
def jsons(cls) -> List[str]: @classmethod
def jsons(cls):
""" """
Get list of JSON db columns Get list of JSON db columns
@@ -134,10 +133,9 @@ class BaseClass(Base):
except AttributeError: except AttributeError:
return [] return []
# @classproperty
@classmethod
@declared_attr @declared_attr
def timestamps(cls) -> List[str]: @classmethod
def timestamps(cls):
""" """
Get list of TIMESTAMP columns Get list of TIMESTAMP columns
@@ -364,7 +362,7 @@ class BaseClass(Base):
logger.error(f"Error message: {type(e)}") logger.error(f"Error message: {type(e)}")
logger.error(pformat(self.__dict__)) logger.error(pformat(self.__dict__))
self.__database_session__.rollback() self.__database_session__.rollback()
report.add_result(Result(msg=e, status="Critical")) report.add_result(Alert(msg=e, status="Critical"))
return report return report
@property @property
@@ -392,10 +390,9 @@ class BaseClass(Base):
pass pass
return dicto return dicto
# @classproperty
@classmethod
@declared_attr @declared_attr
def pydantic_model(cls) -> BaseModel: @classmethod
def pydantic_model(cls):
""" """
Gets the pydantic model corresponding to this object. Gets the pydantic model corresponding to this object.
@@ -414,9 +411,9 @@ class BaseClass(Base):
return model return model
# @classproperty # @classproperty
@classmethod
@declared_attr @declared_attr
def add_edit_tooltips(cls) -> dict: @classmethod
def add_edit_tooltips(cls):
""" """
Gets tooltips for Omni-add-edit Gets tooltips for Omni-add-edit
@@ -425,10 +422,9 @@ class BaseClass(Base):
""" """
return dict() return dict()
# @classproperty
@classmethod
@declared_attr @declared_attr
def details_template(cls) -> Template: @classmethod
def details_template(cls):
""" """
Get the details jinja template for the correct class Get the details jinja template for the correct class
@@ -524,6 +520,7 @@ class BaseClass(Base):
if isinstance(field_type, InstrumentedAttribute): if isinstance(field_type, InstrumentedAttribute):
match field_type.property: match field_type.property:
case ColumnProperty(): case ColumnProperty():
return super().__setattr__(key, value) return super().__setattr__(key, value)
case _RelationshipDeclared(): case _RelationshipDeclared():
if field_type.property.uselist: if field_type.property.uselist:
@@ -655,11 +652,12 @@ class BaseClass(Base):
from backend.validators import pydant from backend.validators import pydant
if not pyd_model_name: if not pyd_model_name:
pyd_model_name = f"Pyd{self.__class__.__name__}" pyd_model_name = f"Pyd{self.__class__.__name__}"
logger.info(f"Looking for pydant model {pyd_model_name}") # logger.info(f"Looking for pydant model {pyd_model_name}")
try: try:
pyd = getattr(pydant, pyd_model_name) pyd = getattr(pydant, pyd_model_name)
except AttributeError: except AttributeError:
raise AttributeError(f"Could not get pydantic class {pyd_model_name}") raise AttributeError(f"Could not get pydantic class {pyd_model_name}")
pyd.model_rebuild()
return pyd(**self.details_dict(**kwargs)) return pyd(**self.details_dict(**kwargs))
def show_details(self, obj): def show_details(self, obj):
@@ -699,6 +697,7 @@ class ConfigItem(BaseClass):
""" """
Key:JSON objects to store config settings in database. Key:JSON objects to store config settings in database.
""" """
id = Column(INTEGER, primary_key=True) id = Column(INTEGER, primary_key=True)
key = Column(String(32)) #: Name of the configuration item. key = Column(String(32)) #: Name of the configuration item.
value = Column(JSON) #: Value associated with the config item. value = Column(JSON) #: Value associated with the config item.

View File

@@ -2,15 +2,18 @@
All kittype and reagent related models All kittype and reagent related models
""" """
from __future__ import annotations from __future__ import annotations
import sys
import zipfile, logging, re, numpy as np import zipfile, logging, re, numpy as np
from operator import itemgetter from operator import itemgetter
from pathlib import Path
from pprint import pformat from pprint import pformat
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, func from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, func
from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, validates, Query, declared_attr from sqlalchemy.orm import relationship, validates, Query, declared_attr
from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.associationproxy import association_proxy
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from tools import check_authorization, setup_lookup, Report, Result, check_regex_match, timezone, \ from tools import check_authorization, setup_lookup, Report, Alert, check_regex_match, timezone, \
jinja_template_loading, flatten_list jinja_template_loading, flatten_list
from typing import List, Literal, Generator, Any, Tuple, TYPE_CHECKING from typing import List, Literal, Generator, Any, Tuple, TYPE_CHECKING
from . import BaseClass, ClientLab, LogMixin from . import BaseClass, ClientLab, LogMixin
@@ -343,6 +346,7 @@ class Reagent(BaseClass, LogMixin):
return [dict(name=self.name, lot=lot.lot, expiry=lot.expiry + self.eol_ext) for lot in self.reagentlot] return [dict(name=self.name, lot=lot.lot, expiry=lot.expiry + self.eol_ext) for lot in self.reagentlot]
class ReagentLot(BaseClass): class ReagentLot(BaseClass):
pyd_model_name = "Reagent" pyd_model_name = "Reagent"
@@ -442,6 +446,7 @@ class ReagentLot(BaseClass):
output['reagent'] = output['reagent'].name output['reagent'] = output['reagent'].name
return output return output
class Discount(BaseClass): class Discount(BaseClass):
""" """
Relationship table for client labs for certain kits. Relationship table for client labs for certain kits.
@@ -592,7 +597,7 @@ class SubmissionType(BaseClass):
query: Query = cls.__database_session__.query(cls) query: Query = cls.__database_session__.query(cls)
match name: match name:
case str(): case str():
logger.debug(f"querying with {name}") # logger.debug(f"querying with {name}")
query = query.filter(cls.name == name) query = query.filter(cls.name == name)
limit = 1 limit = 1
case _: case _:
@@ -926,10 +931,13 @@ class Procedure(BaseClass):
logger.info(f"Add Results! {resultstype_name}") logger.info(f"Add Results! {resultstype_name}")
from backend.managers import results from backend.managers import results
results_manager = getattr(results, f"{resultstype_name}Manager") results_manager = getattr(results, f"{resultstype_name}Manager")
rs = results_manager(procedure=self, parent=obj) rs = results_manager(procedure=self, parent=obj)#, fname=Path("C:\\Users\lwark\Documents\Submission_Forms\QubitData_18-09-2025_13-43-53.csv"))
procedure = rs.procedure_to_pydantic() procedure = rs.procedure_to_pydantic()
samples = rs.samples_to_pydantic() samples = rs.samples_to_pydantic()
if procedure:
procedure_sql = procedure.to_sql() procedure_sql = procedure.to_sql()
else:
return
procedure_sql.save() procedure_sql.save()
for sample in samples: for sample in samples:
sample_sql = sample.to_sql() sample_sql = sample.to_sql()
@@ -978,7 +986,6 @@ class Procedure(BaseClass):
output['sample'] = active_samples + inactive_samples output['sample'] = active_samples + inactive_samples
output['reagent'] = [reagent.details_dict() for reagent in output['procedurereagentlotassociation']] output['reagent'] = [reagent.details_dict() for reagent in output['procedurereagentlotassociation']]
output['equipment'] = [equipment.details_dict() for equipment in output['procedureequipmentassociation']] output['equipment'] = [equipment.details_dict() for equipment in output['procedureequipmentassociation']]
logger.debug(f"equipment: {pformat([item for item in output['equipment']])}")
output['repeat'] = self.repeat output['repeat'] = self.repeat
output['run'] = self.run.name output['run'] = self.run.name
output['excluded'] += self.get_default_info("details_ignore") output['excluded'] += self.get_default_info("details_ignore")
@@ -1005,7 +1012,6 @@ class Procedure(BaseClass):
output.result = [item.to_pydantic() for item in self.results] output.result = [item.to_pydantic() for item in self.results]
output.sample_results = flatten_list( output.sample_results = flatten_list(
[[result.to_pydantic() for result in item.results] for item in self.proceduresampleassociation]) [[result.to_pydantic() for result in item.results] for item in self.proceduresampleassociation])
return output return output
def create_proceduresampleassociations(self, sample): def create_proceduresampleassociations(self, sample):
@@ -1045,7 +1051,6 @@ class Procedure(BaseClass):
return html return html
class ProcedureTypeReagentRoleAssociation(BaseClass): class ProcedureTypeReagentRoleAssociation(BaseClass):
""" """
table containing reagenttype/kittype associations table containing reagenttype/kittype associations
@@ -2145,7 +2150,7 @@ class ProcedureEquipmentAssociation(BaseClass):
@property @property
def tips(self): def tips(self):
try: try:
return Tips.query(id=self.tips_id, limit=1) return TipsLot.query(id=self.tipslot_id, limit=1)
except AttributeError: except AttributeError:
return None return None
@@ -2172,7 +2177,9 @@ class ProcedureEquipmentAssociation(BaseClass):
PydEquipment: pydantic equipment model PydEquipment: pydantic equipment model
""" """
from backend.validators import PydEquipment from backend.validators import PydEquipment
return PydEquipment(**self.details_dict()) output = PydEquipment(**self.details_dict())
output.tips = self.tips.to_pydantic(pyd_model_name="PydTips")
return output
@classmethod @classmethod
@setup_lookup @setup_lookup
@@ -2229,7 +2236,7 @@ class ProcedureEquipmentAssociation(BaseClass):
output['processversion'] = None output['processversion'] = None
try: try:
output['tips'] = self.tipslot.details_dict() output['tips'] = self.tipslot.details_dict()
except AttributeError: except AttributeError as e:
output['tips'] = None output['tips'] = None
return output return output

View File

@@ -18,7 +18,8 @@ from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError, StatementError from sqlalchemy.exc import OperationalError as AlcOperationalError, IntegrityError as AlcIntegrityError, StatementError
from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError from sqlite3 import OperationalError as SQLOperationalError, IntegrityError as SQLIntegrityError
from tools import setup_lookup, jinja_template_loading, create_holidays_for_year, check_dictionary_inclusion_equality, is_power_user from tools import (setup_lookup, jinja_template_loading, create_holidays_for_year,
check_dictionary_inclusion_equality, is_power_user, row_map)
from datetime import datetime, date from datetime import datetime, date
from typing import List, Literal, Generator, TYPE_CHECKING from typing import List, Literal, Generator, TYPE_CHECKING
from pathlib import Path from pathlib import Path
@@ -42,7 +43,7 @@ class ClientSubmission(BaseClass, LogMixin):
submission_category = Column(String(64)) #: i.e. Surveillance submission_category = Column(String(64)) #: i.e. Surveillance
sample_count = Column(INTEGER) #: Number of sample in the procedure sample_count = Column(INTEGER) #: Number of sample in the procedure
full_batch_size = Column(INTEGER) #: Number of wells in provided plate. 0 if no plate. full_batch_size = Column(INTEGER) #: Number of wells in provided plate. 0 if no plate.
comment = Column(JSON) #: comment objects from users. comments = Column(JSON) #: comment objects from users.
run = relationship("Run", back_populates="clientsubmission") #: many-to-one relationship run = relationship("Run", back_populates="clientsubmission") #: many-to-one relationship
contact = relationship("Contact", back_populates="clientsubmission") #: contact representing submitting lab. contact = relationship("Contact", back_populates="clientsubmission") #: contact representing submitting lab.
contact_id = Column(INTEGER, ForeignKey("_contact.id", ondelete="SET NULL", contact_id = Column(INTEGER, ForeignKey("_contact.id", ondelete="SET NULL",
@@ -239,9 +240,9 @@ class ClientSubmission(BaseClass, LogMixin):
custom = None custom = None
runs = None runs = None
try: try:
comments = self.comment comments = self.comments
except Exception as e: except Exception as e:
logger.error(f"Error setting comment: {self.comment}, {e}") logger.error(f"Error setting comment: {self.comments}, {e}")
comments = None comments = None
try: try:
contact = self.contact.name contact = self.contact.name
@@ -645,7 +646,6 @@ class Run(BaseClass, LogMixin):
'permission', "clientsubmission"] 'permission', "clientsubmission"]
output['sample_count'] = self.sample_count output['sample_count'] = self.sample_count
output['clientsubmission'] = self.clientsubmission.name output['clientsubmission'] = self.clientsubmission.name
# output['clientlab'] = self.clientsubmission.clientlab
output['started_date'] = self.started_date output['started_date'] = self.started_date
output['completed_date'] = self.completed_date output['completed_date'] = self.completed_date
return output return output
@@ -1851,6 +1851,9 @@ class RunSampleAssociation(BaseClass):
class ProcedureSampleAssociation(BaseClass): class ProcedureSampleAssociation(BaseClass):
pyd_model_name = "PydSample"
id = Column(INTEGER, unique=True, nullable=False) id = Column(INTEGER, unique=True, nullable=False)
procedure_id = Column(INTEGER, ForeignKey("_procedure.id"), primary_key=True) #: id of associated procedure procedure_id = Column(INTEGER, ForeignKey("_procedure.id"), primary_key=True) #: id of associated procedure
sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated equipment sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated equipment
@@ -1865,6 +1868,16 @@ class ProcedureSampleAssociation(BaseClass):
results = relationship("Results", back_populates="sampleprocedureassociation") #: associated results results = relationship("Results", back_populates="sampleprocedureassociation") #: associated results
@property
def well(self):
if self.row > 0:
if self.column > 0:
return f"{row_map[self.row]}{self.column}"
else:
return self.row
else:
return None
@classmethod @classmethod
def query(cls, sample: Sample | str | None = None, procedure: Procedure | str | None = None, limit: int = 0, def query(cls, sample: Sample | str | None = None, procedure: Procedure | str | None = None, limit: int = 0,
**kwargs): **kwargs):
@@ -1913,16 +1926,19 @@ class ProcedureSampleAssociation(BaseClass):
# NOTE: Figure out how to merge the misc_info if doing .update instead. # NOTE: Figure out how to merge the misc_info if doing .update instead.
relevant = {k: v for k, v in output.items() if k not in ['sample']} relevant = {k: v for k, v in output.items() if k not in ['sample']}
output = output['sample'].details_dict() output = output['sample'].details_dict()
# logger.debug(output)
misc = output['misc_info'] misc = output['misc_info']
output.update(relevant) output.update(relevant)
output['misc_info'] = misc output['misc_info'] = misc
output['row'] = self.row output['row'] = self.row
output['column'] = self.column output['column'] = self.column
output['results'] = [result.details_dict() for result in output['results']] output['results'] = [item.details_dict() for item in self.results]
return output return output
def to_pydantic(self, **kwargs): def to_pydantic(self, **kwargs):
output = super().to_pydantic(pyd_model_name="PydSample") output = super().to_pydantic(pyd_model_name="PydSample")
# from backend.validators.pydant import PydSample
# output = PydSample(**self.details_dict(**kwargs))
try: try:
output.submission_rank = output.misc_info['submission_rank'] output.submission_rank = output.misc_info['submission_rank']
except KeyError: except KeyError:

View File

@@ -2,11 +2,13 @@
Default Parser archetypes. Default Parser archetypes.
""" """
from __future__ import annotations from __future__ import annotations
import logging, re import logging, re, csv
from pathlib import Path from pathlib import Path
from pprint import pformat
from typing import Generator, TYPE_CHECKING from typing import Generator, TYPE_CHECKING
from openpyxl.cell import MergedCell from openpyxl.cell import MergedCell
from openpyxl.reader.excel import load_workbook from openpyxl.reader.excel import load_workbook
from openpyxl.workbook import Workbook
from pandas import DataFrame from pandas import DataFrame
from backend.validators import pydant from backend.validators import pydant
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -44,6 +46,8 @@ class DefaultParser(object):
**kwargs (): **kwargs ():
""" """
logger.info(f"\n\nHello from {self.__class__.__name__}\n\n") logger.info(f"\n\nHello from {self.__class__.__name__}\n\n")
if isinstance(filepath, str):
filepath = Path(filepath)
self.filepath = filepath self.filepath = filepath
self.proceduretype = proceduretype self.proceduretype = proceduretype
try: try:
@@ -58,13 +62,27 @@ class DefaultParser(object):
self.sheet = sheet self.sheet = sheet
if not start_row: if not start_row:
start_row = self.__class__.start_row start_row = self.__class__.start_row
if self.filepath.suffix == ".xlsx":
self.workbook = load_workbook(self.filepath, data_only=True) self.workbook = load_workbook(self.filepath, data_only=True)
self.worksheet = self.workbook[self.sheet] self.worksheet = self.workbook[self.sheet]
elif self.filepath.suffix == ".csv":
self.workbook, self.worksheet = self.csv2xlsx(self.filepath)
self.start_row = self.delineate_start_row(start_row=start_row) self.start_row = self.delineate_start_row(start_row=start_row)
self.end_row = self.delineate_end_row(start_row=self.start_row) self.end_row = self.delineate_end_row(start_row=self.start_row)
@classmethod
def csv2xlsx(cls, filepath):
wb = Workbook()
ws = wb.active
with open(filepath, "r") as f:
reader = csv.reader(f, delimiter=",")
for row in reader:
ws.append(row)
return wb, ws
def to_pydantic(self): def to_pydantic(self):
data = self.parsed_info data = self.parsed_info
logger.debug(f"Data for {self.__class__.__name__}: {pformat(data)}")
data['filepath'] = self.filepath data['filepath'] = self.filepath
return self._pyd_object(**data) return self._pyd_object(**data)
@@ -85,7 +103,7 @@ class DefaultParser(object):
for iii, row in enumerate(self.worksheet.iter_rows(min_row=start_row), start=start_row): for iii, row in enumerate(self.worksheet.iter_rows(min_row=start_row), start=start_row):
if all([item.value is None for item in row]): if all([item.value is None for item in row]):
return iii return iii
return self.worksheet.max_row return self.worksheet.max_row + 1
class DefaultKEYVALUEParser(DefaultParser): class DefaultKEYVALUEParser(DefaultParser):

View File

@@ -3,6 +3,8 @@ Module for clientsubmission parsing
""" """
from __future__ import annotations from __future__ import annotations
import logging import logging
import sys
from datetime import datetime
from pathlib import Path from pathlib import Path
from string import ascii_lowercase from string import ascii_lowercase
from typing import Generator, TYPE_CHECKING from typing import Generator, TYPE_CHECKING
@@ -135,6 +137,9 @@ class ClientSubmissionInfoParser(DefaultKEYVALUEParser, SubmissionTyperMixin):
output['submissiontype']['value'] = self.submissiontype.name.title() output['submissiontype']['value'] = self.submissiontype.name.title()
except KeyError: except KeyError:
pass pass
if isinstance(output['submitted_date']['value'], datetime):
output['submitted_date']['value'] = output['submitted_date']['value'].date()
return output return output

View File

@@ -12,12 +12,22 @@ logger = logging.getLogger(f"submissions.{__name__}")
class DefaultResultsInfoParser(DefaultKEYVALUEParser): class DefaultResultsInfoParser(DefaultKEYVALUEParser):
pyd_name = "PydResults" pyd_name = "PydResults"
def __init__(self, filepath: Path | str, proceduretype: "ProcedureType" | None = None, def __init__(self, filepath: Path | str, results_type: str, proceduretype: "ProcedureType" | None = None,
results_type: str | None = "PCR", *args, **kwargs): *args, **kwargs):
if results_type: if results_type:
self.results_type = results_type self.results_type = results_type
try:
sheet = proceduretype.allowed_result_methods[results_type]['info']['sheet'] sheet = proceduretype.allowed_result_methods[results_type]['info']['sheet']
except KeyError:
sheet = 1
if "start_row" not in kwargs:
try:
start_row = proceduretype.allowed_result_methods[results_type]['info']['start_row'] start_row = proceduretype.allowed_result_methods[results_type]['info']['start_row']
except KeyError:
start_row = 1
else:
start_row = kwargs.pop('start_row')
# start_row = proceduretype.allowed_result_methods[results_type]['info']['start_row']
super().__init__(filepath=filepath, proceduretype=proceduretype, sheet=sheet, start_row=start_row, *args, super().__init__(filepath=filepath, proceduretype=proceduretype, sheet=sheet, start_row=start_row, *args,
**kwargs) **kwargs)
@@ -25,14 +35,24 @@ class DefaultResultsInfoParser(DefaultKEYVALUEParser):
class DefaultResultsSampleParser(DefaultTABLEParser): class DefaultResultsSampleParser(DefaultTABLEParser):
pyd_name = "PydResults" pyd_name = "PydResults"
def __init__(self, filepath: Path | str, proceduretype: "ProcedureType" | None = None, def __init__(self, filepath: Path | str, results_type: str, proceduretype: "ProcedureType" | None = None,
results_type: str | None = "PCR", *args, **kwargs): *args, **kwargs):
if results_type: if results_type:
self.results_type = results_type self.results_type = results_type
try:
sheet = proceduretype.allowed_result_methods[results_type]['sample']['sheet'] sheet = proceduretype.allowed_result_methods[results_type]['sample']['sheet']
except KeyError:
sheet = 1
if "start_row" not in kwargs:
try:
start_row = proceduretype.allowed_result_methods[results_type]['sample']['start_row'] start_row = proceduretype.allowed_result_methods[results_type]['sample']['start_row']
except KeyError:
start_row = 1
else:
start_row = kwargs.pop('start_row')
super().__init__(filepath=filepath, proceduretype=proceduretype, sheet=sheet, start_row=start_row, *args, super().__init__(filepath=filepath, proceduretype=proceduretype, sheet=sheet, start_row=start_row, *args,
**kwargs) **kwargs)
from .pcr_results_parser import PCRInfoParser, PCRSampleParser from .pcr_results_parser import PCRInfoParser, PCRSampleParser
from .qubit_results_parser import QubitInfoParser, QubitSampleParser

View File

@@ -0,0 +1,58 @@
"""
"""
from __future__ import annotations
import logging
from csv import reader
from typing import Generator, TYPE_CHECKING
from frontend.widgets.results_sample_matcher import ResultsSampleMatcher
from backend import Procedure
from backend.db.models import ProcedureSampleAssociation
from backend.excel.parsers.results_parsers import DefaultResultsInfoParser, DefaultResultsSampleParser
from pathlib import Path
if TYPE_CHECKING:
from backend.validators.pydant import PydSample
logger = logging.getLogger(f"submissions.{__name__}")
class QubitInfoParser(DefaultResultsInfoParser):
def __init__(self, filepath: Path | str, procedure=None, **kwargs):
self.results_type = "Qubit"
self.procedure = procedure
super().__init__(filepath=filepath, proceduretype=self.procedure.proceduretype, results_type="Qubit")
def to_pydantic(self):
"""
Since there is no overview generated, return blank PydResults object.
Returns:
PydResults
"""
from backend.validators.pydant import PydResults
return None
class QubitSampleParser(DefaultResultsSampleParser):
"""Object to pull data from Design and Analysis PCR export file."""
def __init__(self, filepath: Path | str, sheet: str | None = None, start_row: int = 1, procedure=None, **kwargs):
self.results_type = "Qubit"
self.procedure = procedure
super().__init__(filepath=filepath, proceduretype=self.procedure.proceduretype, results_type="Qubit")
self.sample_matcher()
def sample_matcher(self):
# samples = [item for item in self.procedure.proceduresampleassociation]
dlg = ResultsSampleMatcher(
parent=None,
results_var_name="original_sample_conc.",
results=self.parsed_info,
samples=self.procedure.proceduresampleassociation,
procedure=self.procedure,
results_type="Qubit"
)
if dlg.exec():
for result in dlg.output:
result.save()

View File

@@ -40,7 +40,6 @@ class DefaultWriter(object):
case x if issubclass(value.__class__, BaseClass): case x if issubclass(value.__class__, BaseClass):
value = value.name value = value.name
case x if issubclass(value.__class__, PydBaseClass): case x if issubclass(value.__class__, PydBaseClass):
logger.warning(f"PydBaseClass: {value}")
value = value.name value = value.name
case bytes() | list(): case bytes() | list():
value = None value = None
@@ -241,6 +240,7 @@ class DefaultTABLEWriter(DefaultWriter):
from .procedure_writers import ProcedureInfoWriter, ProcedureSampleWriter, ProcedureReagentWriter, ProcedureEquipmentWriter from .procedure_writers import ProcedureInfoWriter, ProcedureSampleWriter, ProcedureReagentWriter, ProcedureEquipmentWriter
from .results_writers import ( from .results_writers import (
PCRInfoWriter, PCRSampleWriter PCRInfoWriter, PCRSampleWriter,
QubitInfoWriter, QubitSampleWriter
) )
from .clientsubmission_writer import ClientSubmissionInfoWriter, ClientSubmissionSampleWriter from .clientsubmission_writer import ClientSubmissionInfoWriter, ClientSubmissionSampleWriter

View File

@@ -15,7 +15,7 @@ class ProcedureInfoWriter(DefaultKEYVALUEWriter):
header_order = [] header_order = []
exclude = ['control', 'equipment', 'excluded', 'id', 'misc_info', 'plate_map', 'possible_kits', exclude = ['control', 'equipment', 'excluded', 'id', 'misc_info', 'plate_map', 'possible_kits',
'procedureequipmentassociation', 'procedurereagentassociation', 'proceduresampleassociation', 'proceduretipsassociation', 'reagent', 'procedureequipmentassociation', 'procedurereagentassociation', 'proceduresampleassociation', 'proceduretipsassociation', 'reagent',
'reagentrole', 'results', 'sample', 'tips', 'reagentlot'] 'reagentrole', 'results', 'sample', 'tips', 'reagentlot', 'platemap']
def __init__(self, pydant_obj, *args, **kwargs): def __init__(self, pydant_obj, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, *args, **kwargs) super().__init__(pydant_obj=pydant_obj, *args, **kwargs)
@@ -45,7 +45,7 @@ class ProcedureReagentWriter(DefaultTABLEWriter):
class ProcedureEquipmentWriter(DefaultTABLEWriter): class ProcedureEquipmentWriter(DefaultTABLEWriter):
exclude = ['id'] exclude = ['id', "equipment_role"]
header_order = ['equipmentrole', 'name', 'asset_number', 'process', 'tips'] header_order = ['equipmentrole', 'name', 'asset_number', 'process', 'tips']
def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs): def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs):

View File

@@ -1 +1,32 @@
"""
"""
from openpyxl import Workbook
from backend.excel.writers import DefaultKEYVALUEWriter, DefaultTABLEWriter
from backend.db.models import ProcedureType
from tools import flatten_list
class DefaultResultsInfoWriter(DefaultKEYVALUEWriter):
pass
class DefaultResultsSampleWriter(DefaultTABLEWriter):
def __init__(self, pydant_obj, proceduretype: ProcedureType | None = None, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, proceduretype=proceduretype, *args, **kwargs)
self.pydant_obj = flatten_list([sample.results for sample in pydant_obj.sample])
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int | None = None, *args, **kwargs) -> Workbook:
try:
self.worksheet = workbook[f"{self.proceduretype.name[:15]} Results"]
except KeyError:
self.worksheet = workbook.create_sheet(f"{self.proceduretype.name[:15]} Results")
# worksheet = workbook[f"{self.proceduretype.name[:15]} Results"]
return workbook
from .qubit_results_writer import QubitInfoWriter, QubitSampleWriter
from .pcr_results_writer import PCRInfoWriter, PCRSampleWriter from .pcr_results_writer import PCRInfoWriter, PCRSampleWriter

View File

@@ -7,14 +7,15 @@ from pprint import pformat
from typing import Generator, TYPE_CHECKING from typing import Generator, TYPE_CHECKING
from openpyxl import Workbook from openpyxl import Workbook
from openpyxl.styles import Alignment from openpyxl.styles import Alignment
from backend.excel.writers import DefaultKEYVALUEWriter, DefaultTABLEWriter # from backend.excel.writers import DefaultKEYVALUEWriter, DefaultTABLEWriter
from . import DefaultResultsInfoWriter, DefaultResultsSampleWriter
from tools import flatten_list from tools import flatten_list
if TYPE_CHECKING: if TYPE_CHECKING:
from backend.db.models import ProcedureType from backend.db.models import ProcedureType
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
class PCRInfoWriter(DefaultKEYVALUEWriter): class PCRInfoWriter(DefaultResultsInfoWriter):
start_row = 1 start_row = 1
@@ -28,7 +29,7 @@ class PCRInfoWriter(DefaultKEYVALUEWriter):
return workbook return workbook
class PCRSampleWriter(DefaultTABLEWriter): class PCRSampleWriter(DefaultResultsSampleWriter):
def write_to_workbook(self, workbook: Workbook) -> Workbook: def write_to_workbook(self, workbook: Workbook) -> Workbook:
worksheet = workbook[f"{self.proceduretype.name} Results"] worksheet = workbook[f"{self.proceduretype.name} Results"]

View File

@@ -0,0 +1,51 @@
"""
Writers for PCR results from Qubit device
"""
from __future__ import annotations
import logging
from pprint import pformat
from openpyxl import Workbook
from openpyxl.styles import Alignment
from . import DefaultResultsInfoWriter, DefaultResultsSampleWriter
logger = logging.getLogger(f"submissions.{__name__}")
class QubitInfoWriter(DefaultResultsInfoWriter):
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int = 1, *args, **kwargs) -> Workbook:
return workbook
class QubitSampleWriter(DefaultResultsSampleWriter):
def write_to_workbook(self, workbook: Workbook, *args, **kwargs) -> Workbook:
workbook = super().write_to_workbook(workbook=workbook, *args, **kwargs)
header_row = self.proceduretype.allowed_result_methods['Qubit']['sample']['start_row']
for iii, header in enumerate(self.column_headers, start=1):
# logger.debug(f"Row: {header_row}, column: {iii}")
self.worksheet.cell(row=header_row, column=iii, value=header.replace("_", " ").title())
# logger.debug(f"Column headers: {self.column_headers}")
for iii, result in enumerate(self.pydant_obj, start = 1):
row = header_row + iii
for k, v in result.result.items():
try:
column = next((col[0].column for col in self.worksheet.iter_cols() if col[0].value == k.replace("_", " ").title()))
except StopIteration:
print(f"fail for {k.replace('_', ' ').title()}")
continue
# logger.debug(f"Writing to row: {row}, column {column}")
cell = self.worksheet.cell(row=row, column=column)
cell.value = v
cell.alignment = Alignment(horizontal='left')
self.worksheet = self.postwrite(self.worksheet)
return workbook
@property
def column_headers(self):
output = []
for result in self.pydant_obj:
for k, value in result.result.items():
output.append(k)
return sorted(list(set(output)))

View File

@@ -2,6 +2,7 @@
Module for manager defaults. Module for manager defaults.
""" """
import logging import logging
from pprint import pformat
from pathlib import Path from pathlib import Path
from frontend.widgets.functions import select_open_file from frontend.widgets.functions import select_open_file
from tools import get_application_from_parent from tools import get_application_from_parent
@@ -14,6 +15,7 @@ class DefaultManager(object):
def __init__(self, parent, input_object: Path | str | None = None): def __init__(self, parent, input_object: Path | str | None = None):
self.parent = parent self.parent = parent
match input_object: match input_object:
case str(): case str():
self.input_object = Path(input_object) self.input_object = Path(input_object)

View File

@@ -22,6 +22,7 @@ class DefaultProcedureManager(DefaultManager):
if isinstance(proceduretype, str): if isinstance(proceduretype, str):
proceduretype = ProcedureType.query(name=proceduretype) proceduretype = ProcedureType.query(name=proceduretype)
self.proceduretype = proceduretype self.proceduretype = proceduretype
self.procedure = input_object
super().__init__(parent=parent, input_object=input_object) super().__init__(parent=parent, input_object=input_object)
@@ -84,4 +85,8 @@ class DefaultProcedureManager(DefaultManager):
Writer = getattr(results_writers, f"{result.result_type}InfoWriter") Writer = getattr(results_writers, f"{result.result_type}InfoWriter")
res_info_writer = Writer(pydant_obj=result, proceduretype=self.proceduretype) res_info_writer = Writer(pydant_obj=result, proceduretype=self.proceduretype)
workbook = res_info_writer.write_to_workbook(workbook=workbook) workbook = res_info_writer.write_to_workbook(workbook=workbook)
for result in self.pyd.sample_results:
Writer = getattr(results_writers, f"{result.result_type}SampleWriter")
res_sample_writer = Writer(pydant_obj=self.procedure, proceduretype=self.proceduretype)
workbook = res_sample_writer.write_to_workbook(workbook=workbook)
return workbook return workbook

View File

@@ -17,15 +17,19 @@ logger = logging.getLogger(f"submission.{__name__}")
class DefaultResultsManager(DefaultManager): class DefaultResultsManager(DefaultManager):
def __init__(self, procedure: Procedure, parent, fname: Path | str | None = None): def __init__(self, procedure: Procedure, parent, fname: Path | str | None = None, extension: str|None="xlsx"):
self.procedure = procedure self.procedure = procedure
if not fname: if not fname:
self.fname = select_open_file(file_extension="xlsx", obj=get_application_from_parent(parent)) fname = select_open_file(file_extension=extension, obj=get_application_from_parent(parent))
elif isinstance(fname, str): elif isinstance(fname, str):
self.fname = Path(fname) fname = Path(fname)
self.fname = fname
def procedure_to_pydantic(self) -> PydResults: def procedure_to_pydantic(self) -> PydResults:
logger.debug(f"Info parser: {self.info_parser}")
info = self.info_parser.to_pydantic() info = self.info_parser.to_pydantic()
if info:
info.parent = self.procedure info.parent = self.procedure
return info return info
@@ -34,3 +38,4 @@ class DefaultResultsManager(DefaultManager):
return sample return sample
from .pcr_results_manager import PCRManager from .pcr_results_manager import PCRManager
from .qubit_results_manager import QubitManager

View File

@@ -0,0 +1,33 @@
"""
"""
from __future__ import annotations
import logging
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING
from openpyxl.reader.excel import load_workbook
from backend.db.models import Procedure
from backend.excel.parsers.results_parsers.qubit_results_parser import QubitSampleParser, QubitInfoParser
from backend.excel.writers.results_writers.qubit_results_writer import QubitInfoWriter, QubitSampleWriter
from . import DefaultResultsManager
if TYPE_CHECKING:
from backend.validators.pydant import PydResults
logger = logging.getLogger(f"submissions.{__name__}")
class QubitManager(DefaultResultsManager):
def __init__(self, procedure: Procedure, parent, fname: Path | str | None = None):
super().__init__(procedure=procedure, parent=parent, fname=fname, extension="csv")
self.parse()
def parse(self):
self.info_parser = QubitInfoParser(filepath=self.fname, procedure=self.procedure)
self.sample_parser = QubitSampleParser(filepath=self.fname, procedure=self.procedure, start_row=self.info_parser.end_row)
def write(self):
workbook = load_workbook(BytesIO(self.procedure.proceduretype.template_file))
self.sample_writer = QubitSampleWriter(pydant_obj=self.procedure.to_pydantic(), proceduretype=self.procedure.proceduretype)
workbook = self.sample_writer.write_to_workbook(workbook)
return workbook

View File

@@ -14,10 +14,12 @@ class DefaultRunManager(DefaultManager):
def write(self) -> Workbook: def write(self) -> Workbook:
from backend.managers import DefaultClientSubmissionManager, DefaultProcedureManager from backend.managers import DefaultClientSubmissionManager, DefaultProcedureManager
logger.info(f"Initializing write") logger.info(f"Initializing write")
clientsubmission = DefaultClientSubmissionManager(parent=self.parent, input_object=self.pyd.clientsubmission, submissiontype=self.pyd.clientsubmission.submissiontype) self.clientsubmission = DefaultClientSubmissionManager(parent=self.parent, input_object=self.pyd.clientsubmission, submissiontype=self.pyd.clientsubmission.submissiontype)
workbook = Workbook() workbook = Workbook()
workbook = clientsubmission.write(workbook=workbook) workbook = self.clientsubmission.write(workbook=workbook)
self.procedures = []
for procedure in self.pyd.procedure: for procedure in self.pyd.procedure:
procedure = DefaultProcedureManager(proceduretype=procedure.proceduretype, parent=self.parent, input_object=procedure) procedure = DefaultProcedureManager(proceduretype=procedure.proceduretype, parent=self.parent, input_object=procedure)
workbook: Workbook = procedure.write(workbook=workbook) workbook: Workbook = procedure.write(workbook=workbook)
self.procedures.append(procedure)
return workbook return workbook

View File

@@ -1,15 +1,18 @@
""" """
Contains all validators Contains all validators
""" """
from __future__ import annotations
import logging, re import logging, re
import sys import sys
from pathlib import Path from pathlib import Path
from openpyxl import load_workbook from openpyxl import load_workbook
from backend.db.models import Run, SubmissionType
from tools import jinja_template_loading from tools import jinja_template_loading
from jinja2 import Template from jinja2 import Template
from dateutil.parser import parse from dateutil.parser import parse
from datetime import datetime from datetime import datetime
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from backend.db.models import SubmissionType
logger = logging.getLogger(f"submissions.{__name__}") logger = logging.getLogger(f"submissions.{__name__}")
@@ -27,15 +30,17 @@ class DefaultNamer(object):
class ClientSubmissionNamer(DefaultNamer): class ClientSubmissionNamer(DefaultNamer):
def __init__(self, filepath: str | Path, submissiontype: str|SubmissionType|None=None, def __init__(self, filepath: str | Path, submissiontype: str|"SubmissionType"|None=None,
data: dict | None = None, **kwargs): data: dict | None = None, **kwargs):
from backend.db.models import SubmissionType
super().__init__(filepath=filepath) super().__init__(filepath=filepath)
if not submissiontype: if not submissiontype:
submissiontype = self.retrieve_submissiontype(filepath=self.filepath) self.submissiontype = self.retrieve_submissiontype(filepath=self.filepath)
if isinstance(submissiontype, str): if isinstance(submissiontype, str):
submissiontype = SubmissionType.query(name=submissiontype) self.submissiontype = SubmissionType.query(name=submissiontype)
def retrieve_submissiontype(self, filepath: str | Path): def retrieve_submissiontype(self):
from backend.db.models import SubmissionType
# NOTE: Attempt 1, get from form properties: # NOTE: Attempt 1, get from form properties:
sub_type = self.get_subtype_from_properties() sub_type = self.get_subtype_from_properties()
if not sub_type: if not sub_type:
@@ -51,6 +56,7 @@ class ClientSubmissionNamer(DefaultNamer):
return sub_type return sub_type
def get_subtype_from_regex(self) -> SubmissionType: def get_subtype_from_regex(self) -> SubmissionType:
from backend.db.models import SubmissionType
regex = SubmissionType.regex regex = SubmissionType.regex
m = regex.search(self.filepath.__str__()) m = regex.search(self.filepath.__str__())
try: try:
@@ -64,6 +70,7 @@ class ClientSubmissionNamer(DefaultNamer):
def get_subtype_from_preparse(self) -> SubmissionType: def get_subtype_from_preparse(self) -> SubmissionType:
from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser
from backend.db.models import SubmissionType
parser = ClientSubmissionInfoParser(self.filepath) parser = ClientSubmissionInfoParser(self.filepath)
sub_type = next((value for k, value in parser.parsed_info.items() if k == "submissiontype"), None) sub_type = next((value for k, value in parser.parsed_info.items() if k == "submissiontype"), None)
sub_type = SubmissionType.query(name=sub_type) sub_type = SubmissionType.query(name=sub_type)
@@ -72,6 +79,7 @@ class ClientSubmissionNamer(DefaultNamer):
return sub_type return sub_type
def get_subtype_from_properties(self) -> SubmissionType: def get_subtype_from_properties(self) -> SubmissionType:
from backend.db.models import SubmissionType
wb = load_workbook(self.filepath) wb = load_workbook(self.filepath)
# NOTE: Gets first category in the metadata. # NOTE: Gets first category in the metadata.
categories = wb.properties.category.split(";") categories = wb.properties.category.split(";")
@@ -88,6 +96,7 @@ class RSLNamer(object):
""" """
def __init__(self, filename: str, submission_type: str | None = None, data: dict | None = None): def __init__(self, filename: str, submission_type: str | None = None, data: dict | None = None):
from backend.db.models import SubmissionType
# NOTE: Preferred method is path retrieval, but might also need validation for just string. # NOTE: Preferred method is path retrieval, but might also need validation for just string.
filename = Path(filename) if Path(filename).exists() else filename filename = Path(filename) if Path(filename).exists() else filename
self.submission_type = submission_type self.submission_type = submission_type
@@ -113,7 +122,7 @@ class RSLNamer(object):
Returns: Returns:
str: parsed procedure type str: parsed procedure type
""" """
from backend.db.models import SubmissionType
def st_from_path(filepath: Path) -> str: def st_from_path(filepath: Path) -> str:
""" """
Sub def to get proceduretype from a file path Sub def to get proceduretype from a file path
@@ -186,8 +195,9 @@ class RSLNamer(object):
regex (str): string to construct pattern regex (str): string to construct pattern
filename (str): string to be parsed filename (str): string to be parsed
""" """
from backend.db.models import Run
if regex is None: if regex is None:
regex = BasicRun.regex regex = Run.regex
match filename: match filename:
case Path(): case Path():
m = regex.search(filename.stem) m = regex.search(filename.stem)
@@ -215,6 +225,7 @@ class RSLNamer(object):
Returns: Returns:
str: Output filename str: Output filename
""" """
from backend.db.models import Run
if "submitted_date" in data.keys(): if "submitted_date" in data.keys():
if isinstance(data['submitted_date'], dict): if isinstance(data['submitted_date'], dict):
if data['submitted_date']['value'] is not None: if data['submitted_date']['value'] is not None:

View File

@@ -12,7 +12,7 @@ from typing import List, Tuple, Literal, Generator
from types import GeneratorType from types import GeneratorType
from . import RSLNamer from . import RSLNamer
from pathlib import Path from pathlib import Path
from tools import check_not_nan, convert_nans_to_nones, Report, Result, timezone, sort_dict_by_list, row_keys, flatten_list from tools import check_not_nan, convert_nans_to_nones, Report, Alert, timezone, sort_dict_by_list, row_keys, flatten_list
from backend.db import models from backend.db import models
from backend.db.models import * from backend.db.models import *
from sqlalchemy.orm.properties import ColumnProperty from sqlalchemy.orm.properties import ColumnProperty
@@ -37,6 +37,8 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
def prevalidate(cls, data): def prevalidate(cls, data):
sql_fields = [k for k, v in cls._sql_object.__dict__.items() if isinstance(v, InstrumentedAttribute)] sql_fields = [k for k, v in cls._sql_object.__dict__.items() if isinstance(v, InstrumentedAttribute)]
output = {} output = {}
match data:
case dict():
try: try:
items = data.items() items = data.items()
except AttributeError as e: except AttributeError as e:
@@ -48,6 +50,8 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
output[new_key] = value output[new_key] = value
else: else:
output[key] = value output[key] = value
case _:
output = data
return output return output
@model_validator(mode='after') @model_validator(mode='after')
@@ -136,6 +140,48 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
return list(set(output)) return list(set(output))
class PydResults(PydBaseClass, arbitrary_types_allowed=True):
result: dict = Field(default={})
result_type: str = Field(default="NA")
img: None | bytes = Field(default=None)
# parent: Procedure | ProcedureSampleAssociation | None = Field(default=None)
parent: Any | None = Field(default=None)
date_analyzed: datetime | None = Field(default=None)
@field_validator("date_analyzed")
@classmethod
def set_today(cls, value):
match value:
case str():
value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
case datetime():
pass
case date():
value = datetime.combine(value, datetime.max.time())
case _:
value = datetime.now()
return value
def to_sql(self):
sql, _ = Results.query_or_create(result_type=self.result_type, result=self.results)
try:
check = sql.image
except FileNotFoundError:
check = False
if not check:
sql.image = self.img
if not sql.date_analyzed:
sql.date_analyzed = self.date_analyzed
match self.parent:
case ProcedureSampleAssociation():
sql.sampleprocedureassociation = self.parent
case Procedure():
sql.procedure = self.parent
case _:
logger.error("Improper association found.")
return sql
class PydReagentLot(PydBaseClass): class PydReagentLot(PydBaseClass):
lot: str | None lot: str | None
name: str | None = Field(default=None) name: str | None = Field(default=None)
@@ -143,10 +189,11 @@ class PydReagentLot(PydBaseClass):
missing: bool = Field(default=True) missing: bool = Field(default=True)
comment: str | None = Field(default="", validate_default=True) comment: str | None = Field(default="", validate_default=True)
class PydReagent(PydBaseClass): class PydReagent(PydBaseClass):
lot: str | None # lot: str | None
reagentrole: str | None reagentrole: str | None
expiry: date | datetime | Literal['NA'] | None = Field(default=None, validate_default=True) # expiry: date | datetime | Literal['NA'] | None = Field(default=None, validate_default=True)
name: str | None = Field(default=None, validate_default=True) name: str | None = Field(default=None, validate_default=True)
missing: bool = Field(default=True) missing: bool = Field(default=True)
comment: str | None = Field(default="", validate_default=True) comment: str | None = Field(default="", validate_default=True)
@@ -177,47 +224,47 @@ class PydReagent(PydBaseClass):
return value return value
return value return value
@field_validator("lot", mode='before') # @field_validator("lot", mode='before')
@classmethod # @classmethod
def rescue_lot_string(cls, value): # def rescue_lot_string(cls, value):
if value is not None: # if value is not None:
return convert_nans_to_nones(str(value).strip()) # return convert_nans_to_nones(str(value).strip())
return value # return value
#
@field_validator("lot") # @field_validator("lot")
@classmethod # @classmethod
def enforce_lot_string(cls, value): # def enforce_lot_string(cls, value):
if value is not None: # if value is not None:
return value.upper().strip() # return value.upper().strip()
return value # return value
#
@field_validator("expiry", mode="before") # @field_validator("expiry", mode="before")
@classmethod # @classmethod
def enforce_date(cls, value): # def enforce_date(cls, value):
if value is not None: # if value is not None:
match value: # match value:
case int(): # case int():
return datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value - 2) # return datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value - 2)
case 'NA': # case 'NA':
return value # return value
case str(): # case str():
return parse(value) # return parse(value)
case date(): # case date():
return datetime.combine(value, datetime.max.time()) # return datetime.combine(value, datetime.max.time())
case datetime(): # case datetime():
return value # return value
case _: # case _:
return convert_nans_to_nones(str(value)) # return convert_nans_to_nones(str(value))
if value is None: # if value is None:
value = datetime.combine(date.today(), datetime.max.time()) # value = datetime.combine(date.today(), datetime.max.time())
return value # return value
#
@field_validator("expiry") # @field_validator("expiry")
@classmethod # @classmethod
def date_na(cls, value): # def date_na(cls, value):
if isinstance(value, date) and value.year == 1970: # if isinstance(value, date) and value.year == 1970:
value = "NA" # value = "NA"
return value # return value
@field_validator("name", mode="before") @field_validator("name", mode="before")
@classmethod @classmethod
@@ -227,7 +274,6 @@ class PydReagent(PydBaseClass):
else: else:
return values.data['reagentrole'].strip() return values.data['reagentrole'].strip()
def improved_dict(self) -> dict: def improved_dict(self) -> dict:
""" """
Constructs a dictionary consisting of model.fields and model.extras Constructs a dictionary consisting of model.fields and model.extras
@@ -250,15 +296,18 @@ class PydReagent(PydBaseClass):
Returns: Returns:
Tuple[Reagent, Report]: Reagent instance and result of function Tuple[Reagent, Report]: Reagent instance and result of function
""" """
from backend.db.models import ReagentLot, Reagent
report = Report() report = Report()
if self.model_extra is not None: if self.model_extra is not None:
self.__dict__.update(self.model_extra) self.__dict__.update(self.model_extra)
reagent, new = ReagentLot.query_or_create(lot=self.lot, name=self.name) reagentlot, new = ReagentLot.query_or_create(lot=self.lot, name=self.name)
if new: if new:
reagentrole = ReagentRole.query(name=self.reagentrole) reagent = Reagent.query(name=self.name, limit=1)
reagent.reagentrole = reagentrole reagentlot.reagent = reagent
reagent.expiry = self.expiry reagentlot.expiry = self.expiry
return reagent, report if isinstance(reagentlot.expiry, str):
reagentlot.expiry = datetime.combine(datetime.strptime(reagentlot.expiry, "%Y-%m-%d"), datetime.max.time())
return reagentlot, report
class PydSample(PydBaseClass): class PydSample(PydBaseClass):
@@ -267,6 +316,7 @@ class PydSample(PydBaseClass):
enabled: bool = Field(default=True) enabled: bool = Field(default=True)
row: int = Field(default=0) row: int = Field(default=0)
column: int = Field(default=0) column: int = Field(default=0)
results: List[PydResults] | PydResults = Field(default=[])
@field_validator("sample_id", mode="before") @field_validator("sample_id", mode="before")
@classmethod @classmethod
@@ -328,6 +378,7 @@ class PydTips(PydBaseClass):
Returns: Returns:
SubmissionTipsAssociation: Association between queried tips and procedure SubmissionTipsAssociation: Association between queried tips and procedure
""" """
from backend.db.models import TipsLot
report = Report() report = Report()
tips = TipsLot.query(lot=self.lot, limit=1) tips = TipsLot.query(lot=self.lot, limit=1)
return tips, report return tips, report
@@ -345,6 +396,7 @@ class PydEquipment(PydBaseClass):
@field_validator('equipmentrole', mode='before') @field_validator('equipmentrole', mode='before')
@classmethod @classmethod
def get_role_name(cls, value): def get_role_name(cls, value):
from backend.db.models import EquipmentRole
match value: match value:
case list(): case list():
value = value[0] value = value[0]
@@ -359,6 +411,7 @@ class PydEquipment(PydBaseClass):
@field_validator('processes', mode='before') @field_validator('processes', mode='before')
@classmethod @classmethod
def process_to_pydantic(cls, value, values): def process_to_pydantic(cls, value, values):
from backend.db.models import ProcessVersion, Process
if isinstance(value, GeneratorType): if isinstance(value, GeneratorType):
value = [item for item in value] value = [item for item in value]
value = convert_nans_to_nones(value) value = convert_nans_to_nones(value)
@@ -388,23 +441,29 @@ class PydEquipment(PydBaseClass):
@field_validator('tips', mode='before') @field_validator('tips', mode='before')
@classmethod @classmethod
def tips_to_pydantic(cls, value, values): def tips_to_pydantic(cls, value, values):
from backend.db.models import TipsLot
if isinstance(value, GeneratorType): if isinstance(value, GeneratorType):
value = [item for item in value] value = [item for item in value]
value = convert_nans_to_nones(value) value = convert_nans_to_nones(value)
if not value: if not value:
value = [] value = []
if isinstance(value, TipsLot): match value:
case TipsLot():
value = value.to_pydantic(pyd_model_name="PydTips") value = value.to_pydantic(pyd_model_name="PydTips")
else: case dict():
try: value = PydTips(**value)
d: Tips = next( case _:
(tips for tips in value if values.data['name'] in [item.name for item in tips.equipment]), pass
None) # else:
if d: # try:
value = d.to_pydantic() # d: Tips = next(
except AttributeError as e: # (tips for tips in value if values.data['name'] in [item.name for item in tips.equipment]),
logger.error(f"Process Validation error due to {e}") # None)
value = [] # if d:
# value = d.to_pydantic()
# except AttributeError as e:
# logger.error(f"Process Validation error due to {e}")
# value = []
return value return value
@report_result @report_result
@@ -419,6 +478,7 @@ class PydEquipment(PydBaseClass):
Returns: Returns:
Tuple[Equipment, RunEquipmentAssociation]: SQL objects Tuple[Equipment, RunEquipmentAssociation]: SQL objects
""" """
from backend.db.models import Equipment, ProcedureEquipmentAssociation, Process
report = Report() report = Report()
if isinstance(procedure, str): if isinstance(procedure, str):
procedure = Procedure.query(name=procedure) procedure = Procedure.query(name=procedure)
@@ -471,7 +531,6 @@ class PydEquipment(PydBaseClass):
return {k: getattr(self, k) for k in fields} return {k: getattr(self, k) for k in fields}
class PydContact(BaseModel): class PydContact(BaseModel):
name: str name: str
phone: str | None phone: str | None
@@ -633,6 +692,7 @@ class PydProcess(PydBaseClass, extra="allow"):
@report_result @report_result
def to_sql(self): def to_sql(self):
from backend.db.models import ProcessVersion
report = Report() report = Report()
name = self.name.split("-")[0] name = self.name.split("-")[0]
# NOTE: can't use query_or_create due to name not being part of ProcessVersion # NOTE: can't use query_or_create due to name not being part of ProcessVersion
@@ -678,12 +738,12 @@ class PydElastic(BaseModel, extra="allow", arbitrary_types_allowed=True):
# NOTE: Generified objects below: # NOTE: Generified objects below:
class PydProcedure(PydBaseClass, arbitrary_types_allowed=True): class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
proceduretype: ProcedureType | None = Field(default=None) proceduretype: Any | None = Field(default=None)
run: Run | str | None = Field(default=None) run: Any | str | None = Field(default=None)
name: dict = Field(default=dict(value="NA", missing=True), validate_default=True) name: dict = Field(default=dict(value="NA", missing=True), validate_default=True)
technician: dict = Field(default=dict(value="NA", missing=True)) technician: dict = Field(default=dict(value="NA", missing=True))
repeat: bool = Field(default=False) repeat: bool = Field(default=False)
repeat_of: Procedure | None = Field(default=None) repeat_of: Any | None = Field(default=None)
plate_map: str | None = Field(default=None) plate_map: str | None = Field(default=None)
reagent: list | None = Field(default=[]) reagent: list | None = Field(default=[])
reagentrole: dict | None = Field(default={}, validate_default=True) reagentrole: dict | None = Field(default={}, validate_default=True)
@@ -872,7 +932,10 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
reg.save() reg.save()
def to_sql(self, new: bool = False): def to_sql(self, new: bool = False):
from backend.db.models import RunSampleAssociation, ProcedureSampleAssociation from backend.db.models import (
RunSampleAssociation, ProcedureSampleAssociation, Procedure, ProcedureReagentLotAssociation,
ProcedureEquipmentAssociation
)
logger.debug(f"incoming pyd: {pformat([item.__dict__ for item in self.equipment])}") logger.debug(f"incoming pyd: {pformat([item.__dict__ for item in self.equipment])}")
if new: if new:
sql = Procedure() sql = Procedure()
@@ -995,9 +1058,11 @@ class PydClientSubmission(PydBaseClass):
def enforce_submitted_date(cls, value): def enforce_submitted_date(cls, value):
match value: match value:
case str(): case str():
value = dict(value=datetime.strptime(value, "%Y-%m-%d %H:%M:%S"), missing=False) value = dict(value=datetime.strptime(value, "%Y-%m-%d %H:%M:%S").date(), missing=False)
case date() | datetime(): case date():
value = dict(value=value, missing=False) value = dict(value=value, missing=False)
case datetime():
value = dict(value=value.date(), missing=False)
case _: case _:
pass pass
return value return value
@@ -1115,6 +1180,7 @@ class PydClientSubmission(PydBaseClass):
def to_sql(self): def to_sql(self):
sql = super().to_sql() sql = super().to_sql()
from backend.db.models import SubmissionType
assert not any([isinstance(item, PydSample) for item in sql.sample]) assert not any([isinstance(item, PydSample) for item in sql.sample])
sql.sample = [] sql.sample = []
if not sql.submissiontype: if not sql.submissiontype:
@@ -1397,14 +1463,14 @@ class PydRun(PydBaseClass): #, extra='allow'):
Converts this instance into a backend.db.models.procedure.BasicRun instance Converts this instance into a backend.db.models.procedure.BasicRun instance
Returns: Returns:
Tuple[BasicRun, Result]: BasicRun instance, result object Tuple[BasicRun, Alert]: BasicRun instance, result object
""" """
report = Report() report = Report()
dicto = self.improved_dict() dicto = self.improved_dict()
instance, result = Run.query_or_create(submissiontype=self.submission_type['value'], instance, result = Run.query_or_create(submissiontype=self.submission_type['value'],
rsl_plate_number=self.rsl_plate_number['value']) rsl_plate_number=self.rsl_plate_number['value'])
if instance is None: if instance is None:
report.add_result(Result(msg="Overwrite Cancelled.")) report.add_result(Alert(msg="Overwrite Cancelled."))
return None, report return None, report
report.add_result(result) report.add_result(result)
self.handle_duplicate_samples() self.handle_duplicate_samples()
@@ -1585,7 +1651,7 @@ class PydRun(PydBaseClass): #, extra='allow'):
expired.append(f"{reagent.role}, {reagent.lot}: {reagent.expiry.date()} + {role_eol.days}") expired.append(f"{reagent.role}, {reagent.lot}: {reagent.expiry.date()} + {role_eol.days}")
if expired: if expired:
output = '\n'.join(expired) output = '\n'.join(expired)
result = Result(status="Warning", result = Alert(status="Warning",
msg=f"The following reagents are expired:\n\n{output}" msg=f"The following reagents are expired:\n\n{output}"
) )
report.add_result(result) report.add_result(result)
@@ -1615,44 +1681,3 @@ class PydRun(PydBaseClass): #, extra='allow'):
samples.append(sample) samples.append(sample)
samples = sorted(samples, key=itemgetter("submission_rank")) samples = sorted(samples, key=itemgetter("submission_rank"))
return samples return samples
class PydResults(PydBaseClass, arbitrary_types_allowed=True):
result: dict = Field(default={})
result_type: str = Field(default="NA")
img: None | bytes = Field(default=None)
parent: Procedure | ProcedureSampleAssociation | None = Field(default=None)
date_analyzed: datetime | None = Field(default=None)
@field_validator("date_analyzed")
@classmethod
def set_today(cls, value):
match value:
case str():
value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
case datetime():
pass
case date():
value = datetime.combine(value, datetime.max.time())
case _:
value = datetime.now()
return value
def to_sql(self):
sql, _ = Results.query_or_create(result_type=self.result_type, result=self.results)
try:
check = sql.image
except FileNotFoundError:
check = False
if not check:
sql.image = self.img
if not sql.date_analyzed:
sql.date_analyzed = self.date_analyzed
match self.parent:
case ProcedureSampleAssociation():
sql.sampleprocedureassociation = self.parent
case Procedure():
sql.procedure = self.parent
case _:
logger.error("Improper association found.")
return sql

View File

@@ -5,7 +5,7 @@ from datetime import date
from PyQt6.QtCore import QSignalBlocker from PyQt6.QtCore import QSignalBlocker
from PyQt6.QtWebEngineWidgets import QWebEngineView from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWidgets import QWidget, QGridLayout from PyQt6.QtWidgets import QWidget, QGridLayout
from tools import Report, report_result, Result from tools import Report, report_result, Alert
from .misc import StartEndDatePicker from .misc import StartEndDatePicker
from .functions import select_save_file, save_pdf from .functions import select_save_file, save_pdf
import logging import logging
@@ -42,7 +42,7 @@ class InfoPane(QWidget):
with QSignalBlocker(self.datepicker.start_date) as blocker: with QSignalBlocker(self.datepicker.start_date) as blocker:
self.datepicker.start_date.setDate(lastmonth) self.datepicker.start_date.setDate(lastmonth)
self.update_data() self.update_data()
report.add_result(Result(owner=self.__str__(), msg=msg, status="Warning")) report.add_result(Alert(owner=self.__str__(), msg=msg, status="Warning"))
return report return report
@classmethod @classmethod

View File

@@ -56,6 +56,14 @@ class ProcedureCreation(QDialog):
proceduretype_dict = self.proceduretype.details_dict() proceduretype_dict = self.proceduretype.details_dict()
# NOTE: Add --New-- as an option for reagents. # NOTE: Add --New-- as an option for reagents.
for key, value in self.procedure.reagentrole.items(): for key, value in self.procedure.reagentrole.items():
try:
check = "--New--" in [v['name'] for v in value]
except TypeError:
try:
check = "--New--" in [v.name for v in value]
except (TypeError, AttributeError):
check = True
if not check:
value.append(dict(name="--New--")) value.append(dict(name="--New--"))
if self.procedure.equipment: if self.procedure.equipment:
for equipmentrole in proceduretype_dict['equipment']: for equipmentrole in proceduretype_dict['equipment']:
@@ -72,7 +80,6 @@ class ProcedureCreation(QDialog):
proceduretype_dict['equipment'] = [sanitize_object_for_json(object) for object in proceduretype_dict['equipment']] proceduretype_dict['equipment'] = [sanitize_object_for_json(object) for object in proceduretype_dict['equipment']]
regex = re.compile(r".*R\d$") regex = re.compile(r".*R\d$")
proceduretype_dict['previous'] = [""] + [item.name for item in self.run.procedure if item.proceduretype == self.proceduretype and not bool(regex.match(item.name))] proceduretype_dict['previous'] = [""] + [item.name for item in self.run.procedure if item.proceduretype == self.proceduretype and not bool(regex.match(item.name))]
# sys.exit(f"ProcedureDict:\n{pformat(proceduretype_dict)}")
html = render_details_template( html = render_details_template(
template_name="procedure_creation", template_name="procedure_creation",
js_in=["procedure_form", "grid_drag", "context_menu"], js_in=["procedure_form", "grid_drag", "context_menu"],
@@ -82,12 +89,13 @@ class ProcedureCreation(QDialog):
plate_map=self.plate_map, plate_map=self.plate_map,
edit=self.edit edit=self.edit
) )
# with open("procedure_creation.html", "w") as f:
# f.write(html)
self.webview.setHtml(html) self.webview.setHtml(html)
@pyqtSlot(str, str, str, str) @pyqtSlot(str, str, str, str)
def update_equipment(self, equipmentrole: str, equipment: str, processversion: str, tips: str): def update_equipment(self, equipmentrole: str, equipment: str, processversion: str, tips: str):
from backend.db.models import Equipment, ProcessVersion, TipsLot from backend.db.models import Equipment, ProcessVersion, TipsLot
logger.debug(f"\n\nEquipmentRole: {equipmentrole}, Equipment: {equipment}, Process: {processversion}, Tips: {tips}\n\n")
try: try:
equipment_of_interest = next( equipment_of_interest = next(
(item for item in self.procedure.equipment if item.equipmentrole == equipmentrole)) (item for item in self.procedure.equipment if item.equipmentrole == equipmentrole))
@@ -148,9 +156,10 @@ class ProcedureCreation(QDialog):
@pyqtSlot(str, str, str, str) @pyqtSlot(str, str, str, str)
def add_new_reagent(self, reagentrole: str, name: str, lot: str, expiry: str): def add_new_reagent(self, reagentrole: str, name: str, lot: str, expiry: str):
from backend.validators.pydant import PydReagent from backend.validators.pydant import PydReagentLot
expiry = datetime.datetime.strptime(expiry, "%Y-%m-%d") expiry = datetime.datetime.strptime(expiry, "%Y-%m-%d")
pyd = PydReagent(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry) logger.debug(f"{reagentrole}, {name}, {lot}, {expiry}")
pyd = PydReagentLot(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry)
self.procedure.reagentrole[reagentrole].insert(0, pyd) self.procedure.reagentrole[reagentrole].insert(0, pyd)
self.set_html() self.set_html()
@@ -162,6 +171,12 @@ class ProcedureCreation(QDialog):
return return
self.procedure.update_reagents(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry) self.procedure.update_reagents(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry)
@pyqtSlot(str, result=list)
def get_reagent_names(self, reagentrole_name: str):
from backend.db.models import ReagentRole
reagentrole = ReagentRole.query(name=reagentrole_name)
return [item.name for item in reagentrole.get_reagents(proceduretype=self.procedure.proceduretype)]
def return_sql(self, new: bool = False): def return_sql(self, new: bool = False):
output = self.procedure.to_sql(new=new) output = self.procedure.to_sql(new=new)
return output return output

View File

@@ -0,0 +1,96 @@
"""
"""
from __future__ import annotations
import json
import logging, sys
from pprint import pformat
from typing import List, Generator
from PyQt6.QtWidgets import (QDialog, QGridLayout, QDialogButtonBox)
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWebChannel import QWebChannel
from PyQt6.QtCore import pyqtSlot
from tools import render_details_template, row_keys
from backend.db.models import Procedure, ProcedureSampleAssociation, Results
logger = logging.getLogger(f"submissions.{__name__}")
class ResultsSampleMatcher(QDialog):
def __init__(self, parent, results_var_name: str, results: Generator[dict, None, None], samples:List[str],
procedure:Procedure, results_type: str):
super().__init__(parent=parent)
self.procedure = procedure
self.results_type = results_type
self.results_var_name = results_var_name
results = [item for item in results]
html = render_details_template("results_sample_match", results=results, results_var_name=self.results_var_name, samples=samples)
self.webview = QWebEngineView()
self.layout = QGridLayout()
self.setLayout(self.layout)
self.channel = QWebChannel()
self.channel.registerObject('backend', self)
self.webview.setHtml(html)
self.webview.page().setWebChannel(self.channel)
self.layout.addWidget(self.webview)
QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
self.buttonBox = QDialogButtonBox(QBtn)
self.buttonBox.accepted.connect(self.accept)
self.buttonBox.rejected.connect(self.reject)
self.layout.addWidget(self.buttonBox)
self.output = []
@pyqtSlot(bool, str, str, str)
def set_match(self, enabled: bool, sample: str, result_text:str, result: str):
logger.debug(f"Sample: {sample}")
if ":" in sample:
sample_id = sample.split(":")[0]
well = sample.split(":")[1]
row = row_keys[well[0]]
column = int(well[1:])
else:
row = None
column = None
result = "".join([r for r in result]).replace("\'", "\"")
try:
result = json.loads(result)
except json.decoder.JSONDecoder:
logger.error("Could not decode json.")
logger.debug(f"Search: {self.procedure}, {sample_id}, {row}, {column}")
association = ProcedureSampleAssociation.query(procedure=self.procedure, sample=sample_id, row=row, column=column)
if enabled:
result = Results(sampleprocedureassociation=association, result=result, result_type=self.results_type)
self.output.append(result)
else:
try:
result = next(
(item for item in self.output if str(item.result[self.results_var_name]) == result_text)
)
except StopIteration:
logger.error(f"Couldn't find association for {result_text}")
return
self.output.remove(result)
@pyqtSlot(str, str)
def update_match(self, sample: str, result_text: str):
if ":" in sample:
sample_id = sample.split(":")[0]
well = sample.split(":")[1]
row = row_keys[well[0]]
column = int(well[1:])
else:
row = None
column = None
logger.debug(f"Search: {self.procedure}, {sample_id}, {row}, {column}")
association = ProcedureSampleAssociation.query(procedure=self.procedure, sample=sample_id, row=row, column=column)
logger.debug(association)
try:
result = next(
(item for item in self.output if str(item.result[self.results_var_name]) == result_text)
)
except StopIteration:
logger.error(f"Couldn't find association for {result_text}")
return
result.sampleprocedureassociation = association
logger.debug(f"Output: {pformat(self.output)}")

View File

@@ -23,7 +23,9 @@ class SubmissionsTree(QTreeView):
self.app = get_application_from_parent(parent) self.app = get_application_from_parent(parent)
self.total_count = ClientSubmission.__database_session__.query(ClientSubmission).count() self.total_count = ClientSubmission.__database_session__.query(ClientSubmission).count()
self.setExpandsOnDoubleClick(False) self.setExpandsOnDoubleClick(False)
self.model = model self.model: ClientSubmissionRunModel = model
header_labels = ["Name", "Submission Type", "Client Lab", "Submitted Date"]
self.model.setHorizontalHeaderLabels(header_labels)
self.setModel(self.model) self.setModel(self.model)
self.setSelectionBehavior(QAbstractItemView.selectionBehavior(self).SelectRows) self.setSelectionBehavior(QAbstractItemView.selectionBehavior(self).SelectRows)
self.set_data() self.set_data()
@@ -48,7 +50,8 @@ class SubmissionsTree(QTreeView):
self.setAlternatingRowColors(True) self.setAlternatingRowColors(True)
self.setIndentation(20) self.setIndentation(20)
self.setItemsExpandable(True) self.setItemsExpandable(True)
for ii in range(2): self.setSortingEnabled(True)
for ii, _ in enumerate(header_labels):
self.resizeColumnToContents(ii) self.resizeColumnToContents(ii)
def expand_item(self, event: QModelIndex): def expand_item(self, event: QModelIndex):
@@ -106,16 +109,18 @@ class SubmissionsTree(QTreeView):
sets data in model sets data in model
""" """
self.clear() self.clear()
self.data = [item.to_dict(full_data=True) for item in self.data = [item.to_dict(full_data=True) for item in ClientSubmission.query(chronologic=True, page=page, page_size=page_size)]
ClientSubmission.query(chronologic=True, page=page, page_size=page_size)]
root = self.model.invisibleRootItem() root = self.model.invisibleRootItem()
for submission in self.data: for submission in self.data:
group_str = f"{submission['submissiontype']}-{submission['submitter_plate_id']}-{submission['submitted_date']}" group_str = f"{submission['submissiontype']}-{submission['submitter_plate_id']}-{submission['submitted_date']}"
submission_item = self.model.add_child(parent=root, child=dict( submission_item: QStandardItem = self.model.add_child(parent=root, child=dict(
name=group_str, name=group_str,
client=submission['clientlab'],
date=submission['submitted_date'],
type=submission['submissiontype'],
query_str=submission['submitter_plate_id'], query_str=submission['submitter_plate_id'],
item_type=ClientSubmission item_type=ClientSubmission
)) ), additions=True)
for run in submission['run']: for run in submission['run']:
run_item = self.model.add_child(parent=submission_item, child=dict( run_item = self.model.add_child(parent=submission_item, child=dict(
name=run['plate_number'], name=run['plate_number'],
@@ -156,10 +161,19 @@ class SubmissionsTree(QTreeView):
class ClientSubmissionRunModel(QStandardItemModel): class ClientSubmissionRunModel(QStandardItemModel):
def add_child(self, parent: QStandardItem, child:dict): def __init__(self, parent):
super().__init__(parent)
def add_child(self, parent: QStandardItem, child:dict, additions:bool=False) -> QStandardItem:
item = QStandardItem(child['name']) item = QStandardItem(child['name'])
item.setData(dict(item_type=child['item_type'], query_str=child['query_str']), 1) item.setData(dict(item_type=child['item_type'], query_str=child['query_str']), 1)
parent.appendRow(item) if additions:
item_client = QStandardItem(child['client'])
item_date = QStandardItem(child['date'])
item_type = QStandardItem(child['type'])
parent.appendRow([item, item_type, item_client, item_date])
else:
parent.appendRow([item])
item.setEditable(False) item.setEditable(False)
return item return item

View File

@@ -9,7 +9,7 @@ from PyQt6.QtWidgets import (
from PyQt6.QtCore import pyqtSignal, Qt, QSignalBlocker from PyQt6.QtCore import pyqtSignal, Qt, QSignalBlocker
from .functions import select_open_file, select_save_file from .functions import select_open_file, select_save_file
from pathlib import Path from pathlib import Path
from tools import Report, Result, check_not_nan, main_form_style, report_result, get_application_from_parent from tools import Report, Alert, check_not_nan, main_form_style, report_result, get_application_from_parent
from backend.validators import PydReagent, PydClientSubmission, PydSample from backend.validators import PydReagent, PydClientSubmission, PydSample
from backend.db.models import ( from backend.db.models import (
ClientLab, SubmissionType, Reagent, ReagentLot, ClientLab, SubmissionType, Reagent, ReagentLot,
@@ -116,7 +116,7 @@ class SubmissionFormContainer(QWidget):
if isinstance(fname, bool) or fname is None: if isinstance(fname, bool) or fname is None:
fname = select_open_file(self, file_extension="xlsx") fname = select_open_file(self, file_extension="xlsx")
if not fname: if not fname:
report.add_result(Result(msg=f"File {fname.__str__()} not found.", status="critical")) report.add_result(Alert(msg=f"File {fname.__str__()} not found.", status="critical"))
return report return report
# NOTE: create sheetparser using excel sheet and context from gui # NOTE: create sheetparser using excel sheet and context from gui
self.clientsubmission_manager = DefaultClientSubmissionManager(parent=self, input_object=fname) self.clientsubmission_manager = DefaultClientSubmissionManager(parent=self, input_object=fname)
@@ -133,7 +133,7 @@ class SubmissionFormContainer(QWidget):
else: else:
message = "Submission cancelled." message = "Submission cancelled."
logger.warning(message) logger.warning(message)
report.add_result(Result(msg=message, owner=self.__class__.__name__, status="Warning")) report.add_result(Alert(msg=message, owner=self.__class__.__name__, status="Warning"))
return report return report
@report_result @report_result
@@ -157,7 +157,7 @@ class SubmissionFormContainer(QWidget):
# NOTE: send reagent to db # NOTE: send reagent to db
sqlobj = reagent.to_sql() sqlobj = reagent.to_sql()
sqlobj.save() sqlobj.save()
report.add_result(Result(owner=__name__, code=0, msg="New reagent created.", status="Information")) report.add_result(Alert(owner=__name__, code=0, msg="New reagent created.", status="Information"))
return reagent, report return reagent, report
@@ -386,7 +386,7 @@ class SubmissionFormWidget(QWidget):
if reagent is not None: if reagent is not None:
reagents.append(reagent) reagents.append(reagent)
else: else:
report.add_result(Result(msg="Failed integrity check", status="Critical")) report.add_result(Alert(msg="Failed integrity check", status="Critical"))
return report return report
case self.InfoItem(): case self.InfoItem():
field, value = widget.parse_form() field, value = widget.parse_form()
@@ -779,7 +779,7 @@ class ClientSubmissionFormWidget(SubmissionFormWidget):
if reagent is not None: if reagent is not None:
reagents.append(reagent) reagents.append(reagent)
else: else:
report.add_result(Result(msg="Failed integrity check", status="Critical")) report.add_result(Alert(msg="Failed integrity check", status="Critical"))
return report return report
case self.InfoItem(): case self.InfoItem():
field, value = widget.parse_form() field, value = widget.parse_form()

View File

@@ -104,8 +104,6 @@ div.gallery {
padding: 5px; padding: 5px;
} }
.plate { .plate {
display: inline-grid; display: inline-grid;
grid-auto-flow: column; grid-auto-flow: column;
@@ -189,3 +187,9 @@ ul.no-bullets {
display: grid; display: grid;
grid-auto-flow: column; grid-auto-flow: column;
} }
.disable_section {
pointer-events: none;
opacity: 0.4;
}

View File

@@ -25,7 +25,7 @@
{% block script %} {% block script %}
{% if not child %} {% if not child %}
{% for j in js%} {% for j in js %}
<script> <script>
{{ j }} {{ j }}

View File

@@ -194,7 +194,7 @@ function contextListener() {
function clickListener() { function clickListener() {
document.addEventListener( "click", function(e) { document.addEventListener( "click", function(e) {
var clickeElIsLink = clickInsideElement( e, contextMenuLinkClassName ); var clickeElIsLink = clickInsideElement( e, contextMenuLinkClassName );
backend.log(e.target.id)
if ( clickeElIsLink ) { if ( clickeElIsLink ) {
e.preventDefault(); e.preventDefault();
menuItemListener( clickeElIsLink ); menuItemListener( clickeElIsLink );

View File

@@ -42,7 +42,7 @@ var changed_it = new Event('change');
var reagentRoles = document.getElementsByClassName("reagentrole"); var reagentRoles = document.getElementsByClassName("reagentrole");
for(let i = 0; i < reagentRoles.length; i++) { for(let i = 0; i < reagentRoles.length; i++) {
reagentRoles[i].addEventListener("change", function() { reagentRoles[i].addEventListener("change", async function() {
if (reagentRoles[i].value.includes("--New--")) { if (reagentRoles[i].value.includes("--New--")) {
// alert("Create new reagent.") // alert("Create new reagent.")
var br = document.createElement("br"); var br = document.createElement("br");
@@ -50,9 +50,15 @@ for(let i = 0; i < reagentRoles.length; i++) {
var new_form = document.createElement("form"); var new_form = document.createElement("form");
new_form.setAttribute("class", "new_reagent_form") new_form.setAttribute("class", "new_reagent_form")
new_form.setAttribute("id", reagentRoles[i].id + "_addition") new_form.setAttribute("id", reagentRoles[i].id + "_addition")
var rr_name = document.createElement("input"); var rr_name = document.createElement("select");
rr_name.setAttribute("type", "text");
rr_name.setAttribute("id", "new_" + reagentRoles[i].id + "_name"); rr_name.setAttribute("id", "new_" + reagentRoles[i].id + "_name");
var rr_options = await backend.get_reagent_names(reagentRoles[i].id).then(
function(result) {
result.forEach( function(item) {
rr_name.options.add( new Option(item));
});
}
);
var rr_name_label = document.createElement("label"); var rr_name_label = document.createElement("label");
rr_name_label.setAttribute("for", "new_" + reagentRoles[i].id + "_name"); rr_name_label.setAttribute("for", "new_" + reagentRoles[i].id + "_name");
rr_name_label.innerHTML = "Name:"; rr_name_label.innerHTML = "Name:";

View File

@@ -0,0 +1,54 @@
{% extends "details.html" %}
{% block head %}
{{ super() }}
<title>Matching results</title>
{% endblock %}
{% block body %}
{% for result in results %}
<div class="resultholder" style="border-style: solid; border-width: 2px" data="{{ result }}">
<input type="checkbox" id="{{ loop.index }}_check" class="checker">&nbsp;&nbsp;
<span id="{{ loop.index }}_var", class="variable" data-value="{{ result }}">{{ result[results_var_name] }}</span>&nbsp;&nbsp;
<select id="{{ loop.index }}_select" class="selecter" disabled>
{% for sample in samples %}
{% if sample.well %}
<option value="{{ sample.sample.sample_id }}:{{ sample.well }}">{{ sample.sample.sample_id }}:{{ sample.well }}</option>
{% else %}
<option value="{{ sample.sample.sample_id }}">{{ sample.sample.sample_id }}</option>
{% endif %}
{% endfor %}
</select>
</div>
{% endfor %}
{% endblock %}
{% block script %}
<script>
var holders = document.getElementsByClassName("resultholder");
for(let i = 0; i < holders.length; i++) {
console.log(i);
holders[i].getElementsByClassName("checker")[0].addEventListener("change", function(){
if ( this.checked ) {
holders[i].getElementsByClassName("selecter")[0].disabled = false;
} else {
holders[i].getElementsByClassName("selecter")[0].disabled = true;
}
var enabled = this.checked;
var sample = holders[i].getElementsByClassName("selecter")[0].value;
var result = holders[i].getElementsByClassName("variable")[0].dataset.value;
var result_text = holders[i].getElementsByClassName("variable")[0].textContent
backend.set_match(enabled, sample, result_text, result);
});
holders[i].getElementsByClassName("selecter")[0].addEventListener("change", function(){
var sample = this.value;
var result_text = holders[i].getElementsByClassName("variable")[0].textContent
backend.update_match(sample, result_text);
});
}
</script>
{{ super() }}
{% endblock %}

View File

@@ -458,7 +458,6 @@ def render_details_template(template_name: str, css_in: List[str] | str = [], js
js_in = ["details"] + js_in js_in = ["details"] + js_in
js_in = [html_folder.joinpath("js", f"{j}.js") for j in js_in] js_in = [html_folder.joinpath("js", f"{j}.js") for j in js_in]
template = env.get_template(f"{template_name}.html") template = env.get_template(f"{template_name}.html")
# template_path = Path(template.environment.loader.__getattribute__("searchpath")[0])
css_out = [] css_out = []
for css in css_in: for css in css_in:
with open(css, "r") as f: with open(css, "r") as f:
@@ -645,7 +644,7 @@ def get_application_from_parent(widget):
return widget return widget
class Result(BaseModel, arbitrary_types_allowed=True): class Alert(BaseModel, arbitrary_types_allowed=True):
owner: str = Field(default="", validate_default=True) owner: str = Field(default="", validate_default=True)
code: int = Field(default=0) code: int = Field(default=0)
msg: str | Exception msg: str | Exception
@@ -704,7 +703,7 @@ class Result(BaseModel, arbitrary_types_allowed=True):
class Report(BaseModel): class Report(BaseModel):
results: List[Result] = Field(default=[]) results: List[Alert] = Field(default=[])
def __repr__(self): def __repr__(self):
return f"<Report(result_count:{len(self.results)})>" return f"<Report(result_count:{len(self.results)})>"
@@ -717,10 +716,10 @@ class Report(BaseModel):
Takes a result object or all results in another report and adds them to this one. Takes a result object or all results in another report and adds them to this one.
Args: Args:
result (Result | Report | None): Results to be added. result (Alert | Report | None): Results to be added.
""" """
match result: match result:
case Result(): case Alert():
logger.info(f"Adding {result} to results.") logger.info(f"Adding {result} to results.")
try: try:
self.results.append(result) self.results.append(result)
@@ -783,7 +782,8 @@ def yaml_regex_creator(loader, node):
nodes = loader.construct_sequence(node) nodes = loader.construct_sequence(node)
name = nodes[0].replace(" ", "_") name = nodes[0].replace(" ", "_")
abbr = nodes[1] abbr = nodes[1]
return f"(?P<{name}>RSL(?:-|_)?{abbr}(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)?\d?([^_0123456789\sA-QS-Z]|$)?R?\d?)?)" # return f"(?P<{name}>RSL(?:-|_)?{abbr}(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)?\d?([^_0123456789\sA-QS-Z]|$)?R?\d?)?)"
return f"(?P<{name}>RSL(?:-|_)?{abbr}(?:-|_)?20\\d{2}-?\\d{2}-?\\d{2}(?:(_|-)?\\d?([^_0123456789\\sA-QS-Z]|$)?R?\\d?)?)"
def super_splitter(ins_str: str, substring: str, idx: int) -> str: def super_splitter(ins_str: str, substring: str, idx: int) -> str:
@@ -853,7 +853,7 @@ def check_authorization(func):
logger.error(error_msg) logger.error(error_msg)
report = Report() report = Report()
report.add_result( report.add_result(
Result(owner=func.__str__(), code=1, msg=error_msg, status="warning")) Alert(owner=func.__str__(), code=1, msg=error_msg, status="warning"))
return report, kwargs return report, kwargs
return wrapper return wrapper
@@ -877,7 +877,7 @@ def under_development(func):
logger.error(error_msg) logger.error(error_msg)
report = Report() report = Report()
report.add_result( report.add_result(
Result(owner=func.__str__(), code=1, msg=error_msg, Alert(owner=func.__str__(), code=1, msg=error_msg,
status="warning")) status="warning"))
return report return report
return wrapper return wrapper