Compare commits

..

6 Commits

Author SHA1 Message Date
ccee4b3afe Addition of new reagent lots updated. 2025-10-03 10:19:17 -05:00
1445d2b93b Qubit sample results now written to export. 2025-10-01 15:04:02 -05:00
8fee07b0c3 Qubit sample results now written to export. 2025-09-29 12:22:50 -05:00
lwark
7f40e091fa Qubit sample results now written to export. 2025-09-23 15:06:18 -05:00
lwark
e9ff0a2774 Fixed Tips not being written during export. 2025-09-23 10:52:46 -05:00
lwark
4522f5909e Qubit results parsing complete. 2025-09-23 09:00:25 -05:00
25 changed files with 593 additions and 183 deletions

View File

@@ -1,3 +1,7 @@
# 202510.01
- Update for Python 3.13
# 202509.04
- Qubit results parsing complete.

View File

@@ -1,3 +1,5 @@
- [ ] Do results writing.
- [ ] Allow use of multiple tips per process.
- [x] Add in database objects for rsl_run (submission -> run), procedure (run -> procedure), many more things will likely be associated with procedure.
- [x] Add in database object for client submission.
- [ ] Add arbitrary pipette addition to equipment UI.

View File

@@ -48,10 +48,10 @@ class BaseClass(Base):
except AttributeError:
return f"<{self.__class__.__name__}(Name Unavailable)>"
# @classproperty
@classmethod
@declared_attr
def aliases(cls) -> List[str]:
@classmethod
def aliases(cls):
"""
List of other names this class might be known by.
@@ -60,9 +60,9 @@ class BaseClass(Base):
"""
return [cls.query_alias]
@classmethod
@declared_attr
def query_alias(cls) -> str:
@classmethod
def query_alias(cls):
"""
What to query this class as.
@@ -71,8 +71,8 @@ class BaseClass(Base):
"""
return cls.__name__.lower()
@classmethod
@declared_attr
@classmethod
def __tablename__(cls) -> str:
"""
Sets table name to lower case class name.
@@ -82,8 +82,8 @@ class BaseClass(Base):
"""
return f"_{cls.__name__.lower()}"
@classmethod
@declared_attr
@classmethod
def __database_session__(cls) -> Session:
"""
Pull db session from ctx to be used in operations
@@ -93,8 +93,8 @@ class BaseClass(Base):
"""
return ctx.database_session
@classmethod
@declared_attr
@classmethod
def __directory_path__(cls) -> Path:
"""
Pull directory path from ctx to be used in operations.
@@ -104,8 +104,8 @@ class BaseClass(Base):
"""
return ctx.directory_path
@classmethod
@declared_attr
@classmethod
def __backup_path__(cls) -> Path:
"""
Pull backup directory path from ctx to be used in operations.
@@ -119,10 +119,9 @@ class BaseClass(Base):
super().__init__(*args, **kwargs)
self._misc_info = dict()
# @classproperty
@classmethod
@declared_attr
def jsons(cls) -> List[str]:
@classmethod
def jsons(cls):
"""
Get list of JSON db columns
@@ -134,10 +133,9 @@ class BaseClass(Base):
except AttributeError:
return []
# @classproperty
@classmethod
@declared_attr
def timestamps(cls) -> List[str]:
@classmethod
def timestamps(cls):
"""
Get list of TIMESTAMP columns
@@ -392,10 +390,9 @@ class BaseClass(Base):
pass
return dicto
# @classproperty
@classmethod
@declared_attr
def pydantic_model(cls) -> BaseModel:
@classmethod
def pydantic_model(cls):
"""
Gets the pydantic model corresponding to this object.
@@ -414,9 +411,9 @@ class BaseClass(Base):
return model
# @classproperty
@classmethod
@declared_attr
def add_edit_tooltips(cls) -> dict:
@classmethod
def add_edit_tooltips(cls):
"""
Gets tooltips for Omni-add-edit
@@ -425,10 +422,9 @@ class BaseClass(Base):
"""
return dict()
# @classproperty
@classmethod
@declared_attr
def details_template(cls) -> Template:
@classmethod
def details_template(cls):
"""
Get the details jinja template for the correct class
@@ -524,6 +520,7 @@ class BaseClass(Base):
if isinstance(field_type, InstrumentedAttribute):
match field_type.property:
case ColumnProperty():
return super().__setattr__(key, value)
case _RelationshipDeclared():
if field_type.property.uselist:
@@ -660,6 +657,7 @@ class BaseClass(Base):
pyd = getattr(pydant, pyd_model_name)
except AttributeError:
raise AttributeError(f"Could not get pydantic class {pyd_model_name}")
pyd.model_rebuild()
return pyd(**self.details_dict(**kwargs))
def show_details(self, obj):
@@ -699,6 +697,7 @@ class ConfigItem(BaseClass):
"""
Key:JSON objects to store config settings in database.
"""
id = Column(INTEGER, primary_key=True)
key = Column(String(32)) #: Name of the configuration item.
value = Column(JSON) #: Value associated with the config item.

View File

@@ -2,6 +2,8 @@
All kittype and reagent related models
"""
from __future__ import annotations
import sys
import zipfile, logging, re, numpy as np
from operator import itemgetter
from pathlib import Path
@@ -344,6 +346,7 @@ class Reagent(BaseClass, LogMixin):
return [dict(name=self.name, lot=lot.lot, expiry=lot.expiry + self.eol_ext) for lot in self.reagentlot]
class ReagentLot(BaseClass):
pyd_model_name = "Reagent"
@@ -443,6 +446,7 @@ class ReagentLot(BaseClass):
output['reagent'] = output['reagent'].name
return output
class Discount(BaseClass):
"""
Relationship table for client labs for certain kits.
@@ -593,7 +597,7 @@ class SubmissionType(BaseClass):
query: Query = cls.__database_session__.query(cls)
match name:
case str():
logger.debug(f"querying with {name}")
# logger.debug(f"querying with {name}")
query = query.filter(cls.name == name)
limit = 1
case _:
@@ -927,7 +931,7 @@ class Procedure(BaseClass):
logger.info(f"Add Results! {resultstype_name}")
from backend.managers import results
results_manager = getattr(results, f"{resultstype_name}Manager")
rs = results_manager(procedure=self, parent=obj, fname=Path("C:\\Users\lwark\Documents\Submission_Forms\QubitData_18-09-2025_13-43-53.csv"))
rs = results_manager(procedure=self, parent=obj)#, fname=Path("C:\\Users\lwark\Documents\Submission_Forms\QubitData_18-09-2025_13-43-53.csv"))
procedure = rs.procedure_to_pydantic()
samples = rs.samples_to_pydantic()
if procedure:
@@ -982,7 +986,6 @@ class Procedure(BaseClass):
output['sample'] = active_samples + inactive_samples
output['reagent'] = [reagent.details_dict() for reagent in output['procedurereagentlotassociation']]
output['equipment'] = [equipment.details_dict() for equipment in output['procedureequipmentassociation']]
# logger.debug(f"equipment: {pformat([item for item in output['equipment']])}")
output['repeat'] = self.repeat
output['run'] = self.run.name
output['excluded'] += self.get_default_info("details_ignore")
@@ -1009,7 +1012,6 @@ class Procedure(BaseClass):
output.result = [item.to_pydantic() for item in self.results]
output.sample_results = flatten_list(
[[result.to_pydantic() for result in item.results] for item in self.proceduresampleassociation])
return output
def create_proceduresampleassociations(self, sample):
@@ -2148,7 +2150,7 @@ class ProcedureEquipmentAssociation(BaseClass):
@property
def tips(self):
try:
return Tips.query(id=self.tips_id, limit=1)
return TipsLot.query(id=self.tipslot_id, limit=1)
except AttributeError:
return None
@@ -2175,7 +2177,9 @@ class ProcedureEquipmentAssociation(BaseClass):
PydEquipment: pydantic equipment model
"""
from backend.validators import PydEquipment
return PydEquipment(**self.details_dict())
output = PydEquipment(**self.details_dict())
output.tips = self.tips.to_pydantic(pyd_model_name="PydTips")
return output
@classmethod
@setup_lookup
@@ -2232,7 +2236,7 @@ class ProcedureEquipmentAssociation(BaseClass):
output['processversion'] = None
try:
output['tips'] = self.tipslot.details_dict()
except AttributeError:
except AttributeError as e:
output['tips'] = None
return output

View File

@@ -43,7 +43,7 @@ class ClientSubmission(BaseClass, LogMixin):
submission_category = Column(String(64)) #: i.e. Surveillance
sample_count = Column(INTEGER) #: Number of sample in the procedure
full_batch_size = Column(INTEGER) #: Number of wells in provided plate. 0 if no plate.
comment = Column(JSON) #: comment objects from users.
comments = Column(JSON) #: comment objects from users.
run = relationship("Run", back_populates="clientsubmission") #: many-to-one relationship
contact = relationship("Contact", back_populates="clientsubmission") #: contact representing submitting lab.
contact_id = Column(INTEGER, ForeignKey("_contact.id", ondelete="SET NULL",
@@ -240,9 +240,9 @@ class ClientSubmission(BaseClass, LogMixin):
custom = None
runs = None
try:
comments = self.comment
comments = self.comments
except Exception as e:
logger.error(f"Error setting comment: {self.comment}, {e}")
logger.error(f"Error setting comment: {self.comments}, {e}")
comments = None
try:
contact = self.contact.name
@@ -646,7 +646,6 @@ class Run(BaseClass, LogMixin):
'permission', "clientsubmission"]
output['sample_count'] = self.sample_count
output['clientsubmission'] = self.clientsubmission.name
# output['clientlab'] = self.clientsubmission.clientlab
output['started_date'] = self.started_date
output['completed_date'] = self.completed_date
return output
@@ -1852,6 +1851,9 @@ class RunSampleAssociation(BaseClass):
class ProcedureSampleAssociation(BaseClass):
pyd_model_name = "PydSample"
id = Column(INTEGER, unique=True, nullable=False)
procedure_id = Column(INTEGER, ForeignKey("_procedure.id"), primary_key=True) #: id of associated procedure
sample_id = Column(INTEGER, ForeignKey("_sample.id"), primary_key=True) #: id of associated equipment
@@ -1924,16 +1926,19 @@ class ProcedureSampleAssociation(BaseClass):
# NOTE: Figure out how to merge the misc_info if doing .update instead.
relevant = {k: v for k, v in output.items() if k not in ['sample']}
output = output['sample'].details_dict()
# logger.debug(output)
misc = output['misc_info']
output.update(relevant)
output['misc_info'] = misc
output['row'] = self.row
output['column'] = self.column
output['results'] = [result.details_dict() for result in output['results']]
output['results'] = [item.details_dict() for item in self.results]
return output
def to_pydantic(self, **kwargs):
output = super().to_pydantic(pyd_model_name="PydSample")
# from backend.validators.pydant import PydSample
# output = PydSample(**self.details_dict(**kwargs))
try:
output.submission_rank = output.misc_info['submission_rank']
except KeyError:

View File

@@ -62,7 +62,7 @@ class DefaultParser(object):
self.sheet = sheet
if not start_row:
start_row = self.__class__.start_row
if self.filepath.suffix == ".xslx":
if self.filepath.suffix == ".xlsx":
self.workbook = load_workbook(self.filepath, data_only=True)
self.worksheet = self.workbook[self.sheet]
elif self.filepath.suffix == ".csv":

View File

@@ -3,6 +3,8 @@ Module for clientsubmission parsing
"""
from __future__ import annotations
import logging
import sys
from datetime import datetime
from pathlib import Path
from string import ascii_lowercase
from typing import Generator, TYPE_CHECKING
@@ -135,6 +137,9 @@ class ClientSubmissionInfoParser(DefaultKEYVALUEParser, SubmissionTyperMixin):
output['submissiontype']['value'] = self.submissiontype.name.title()
except KeyError:
pass
if isinstance(output['submitted_date']['value'], datetime):
output['submitted_date']['value'] = output['submitted_date']['value'].date()
return output

View File

@@ -0,0 +1,58 @@
"""
"""
from __future__ import annotations
import logging
from csv import reader
from typing import Generator, TYPE_CHECKING
from frontend.widgets.results_sample_matcher import ResultsSampleMatcher
from backend import Procedure
from backend.db.models import ProcedureSampleAssociation
from backend.excel.parsers.results_parsers import DefaultResultsInfoParser, DefaultResultsSampleParser
from pathlib import Path
if TYPE_CHECKING:
from backend.validators.pydant import PydSample
logger = logging.getLogger(f"submissions.{__name__}")
class QubitInfoParser(DefaultResultsInfoParser):
def __init__(self, filepath: Path | str, procedure=None, **kwargs):
self.results_type = "Qubit"
self.procedure = procedure
super().__init__(filepath=filepath, proceduretype=self.procedure.proceduretype, results_type="Qubit")
def to_pydantic(self):
"""
Since there is no overview generated, return blank PydResults object.
Returns:
PydResults
"""
from backend.validators.pydant import PydResults
return None
class QubitSampleParser(DefaultResultsSampleParser):
"""Object to pull data from Design and Analysis PCR export file."""
def __init__(self, filepath: Path | str, sheet: str | None = None, start_row: int = 1, procedure=None, **kwargs):
self.results_type = "Qubit"
self.procedure = procedure
super().__init__(filepath=filepath, proceduretype=self.procedure.proceduretype, results_type="Qubit")
self.sample_matcher()
def sample_matcher(self):
# samples = [item for item in self.procedure.proceduresampleassociation]
dlg = ResultsSampleMatcher(
parent=None,
results_var_name="original_sample_conc.",
results=self.parsed_info,
samples=self.procedure.proceduresampleassociation,
procedure=self.procedure,
results_type="Qubit"
)
if dlg.exec():
for result in dlg.output:
result.save()

View File

@@ -40,7 +40,6 @@ class DefaultWriter(object):
case x if issubclass(value.__class__, BaseClass):
value = value.name
case x if issubclass(value.__class__, PydBaseClass):
logger.warning(f"PydBaseClass: {value}")
value = value.name
case bytes() | list():
value = None
@@ -241,6 +240,7 @@ class DefaultTABLEWriter(DefaultWriter):
from .procedure_writers import ProcedureInfoWriter, ProcedureSampleWriter, ProcedureReagentWriter, ProcedureEquipmentWriter
from .results_writers import (
PCRInfoWriter, PCRSampleWriter
PCRInfoWriter, PCRSampleWriter,
QubitInfoWriter, QubitSampleWriter
)
from .clientsubmission_writer import ClientSubmissionInfoWriter, ClientSubmissionSampleWriter

View File

@@ -15,7 +15,7 @@ class ProcedureInfoWriter(DefaultKEYVALUEWriter):
header_order = []
exclude = ['control', 'equipment', 'excluded', 'id', 'misc_info', 'plate_map', 'possible_kits',
'procedureequipmentassociation', 'procedurereagentassociation', 'proceduresampleassociation', 'proceduretipsassociation', 'reagent',
'reagentrole', 'results', 'sample', 'tips', 'reagentlot']
'reagentrole', 'results', 'sample', 'tips', 'reagentlot', 'platemap']
def __init__(self, pydant_obj, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, *args, **kwargs)
@@ -45,7 +45,7 @@ class ProcedureReagentWriter(DefaultTABLEWriter):
class ProcedureEquipmentWriter(DefaultTABLEWriter):
exclude = ['id']
exclude = ['id', "equipment_role"]
header_order = ['equipmentrole', 'name', 'asset_number', 'process', 'tips']
def __init__(self, pydant_obj, range_dict: dict | None = None, *args, **kwargs):

View File

@@ -1 +1,32 @@
"""
"""
from openpyxl import Workbook
from backend.excel.writers import DefaultKEYVALUEWriter, DefaultTABLEWriter
from backend.db.models import ProcedureType
from tools import flatten_list
class DefaultResultsInfoWriter(DefaultKEYVALUEWriter):
pass
class DefaultResultsSampleWriter(DefaultTABLEWriter):
def __init__(self, pydant_obj, proceduretype: ProcedureType | None = None, *args, **kwargs):
super().__init__(pydant_obj=pydant_obj, proceduretype=proceduretype, *args, **kwargs)
self.pydant_obj = flatten_list([sample.results for sample in pydant_obj.sample])
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int | None = None, *args, **kwargs) -> Workbook:
try:
self.worksheet = workbook[f"{self.proceduretype.name[:15]} Results"]
except KeyError:
self.worksheet = workbook.create_sheet(f"{self.proceduretype.name[:15]} Results")
# worksheet = workbook[f"{self.proceduretype.name[:15]} Results"]
return workbook
from .qubit_results_writer import QubitInfoWriter, QubitSampleWriter
from .pcr_results_writer import PCRInfoWriter, PCRSampleWriter

View File

@@ -7,14 +7,15 @@ from pprint import pformat
from typing import Generator, TYPE_CHECKING
from openpyxl import Workbook
from openpyxl.styles import Alignment
from backend.excel.writers import DefaultKEYVALUEWriter, DefaultTABLEWriter
# from backend.excel.writers import DefaultKEYVALUEWriter, DefaultTABLEWriter
from . import DefaultResultsInfoWriter, DefaultResultsSampleWriter
from tools import flatten_list
if TYPE_CHECKING:
from backend.db.models import ProcedureType
logger = logging.getLogger(f"submissions.{__name__}")
class PCRInfoWriter(DefaultKEYVALUEWriter):
class PCRInfoWriter(DefaultResultsInfoWriter):
start_row = 1
@@ -28,7 +29,7 @@ class PCRInfoWriter(DefaultKEYVALUEWriter):
return workbook
class PCRSampleWriter(DefaultTABLEWriter):
class PCRSampleWriter(DefaultResultsSampleWriter):
def write_to_workbook(self, workbook: Workbook) -> Workbook:
worksheet = workbook[f"{self.proceduretype.name} Results"]

View File

@@ -0,0 +1,51 @@
"""
Writers for PCR results from Qubit device
"""
from __future__ import annotations
import logging
from pprint import pformat
from openpyxl import Workbook
from openpyxl.styles import Alignment
from . import DefaultResultsInfoWriter, DefaultResultsSampleWriter
logger = logging.getLogger(f"submissions.{__name__}")
class QubitInfoWriter(DefaultResultsInfoWriter):
def write_to_workbook(self, workbook: Workbook, sheet: str | None = None,
start_row: int = 1, *args, **kwargs) -> Workbook:
return workbook
class QubitSampleWriter(DefaultResultsSampleWriter):
def write_to_workbook(self, workbook: Workbook, *args, **kwargs) -> Workbook:
workbook = super().write_to_workbook(workbook=workbook, *args, **kwargs)
header_row = self.proceduretype.allowed_result_methods['Qubit']['sample']['start_row']
for iii, header in enumerate(self.column_headers, start=1):
# logger.debug(f"Row: {header_row}, column: {iii}")
self.worksheet.cell(row=header_row, column=iii, value=header.replace("_", " ").title())
# logger.debug(f"Column headers: {self.column_headers}")
for iii, result in enumerate(self.pydant_obj, start = 1):
row = header_row + iii
for k, v in result.result.items():
try:
column = next((col[0].column for col in self.worksheet.iter_cols() if col[0].value == k.replace("_", " ").title()))
except StopIteration:
print(f"fail for {k.replace('_', ' ').title()}")
continue
# logger.debug(f"Writing to row: {row}, column {column}")
cell = self.worksheet.cell(row=row, column=column)
cell.value = v
cell.alignment = Alignment(horizontal='left')
self.worksheet = self.postwrite(self.worksheet)
return workbook
@property
def column_headers(self):
output = []
for result in self.pydant_obj:
for k, value in result.result.items():
output.append(k)
return sorted(list(set(output)))

View File

@@ -2,6 +2,7 @@
Module for manager defaults.
"""
import logging
from pprint import pformat
from pathlib import Path
from frontend.widgets.functions import select_open_file
from tools import get_application_from_parent
@@ -14,6 +15,7 @@ class DefaultManager(object):
def __init__(self, parent, input_object: Path | str | None = None):
self.parent = parent
match input_object:
case str():
self.input_object = Path(input_object)

View File

@@ -22,6 +22,7 @@ class DefaultProcedureManager(DefaultManager):
if isinstance(proceduretype, str):
proceduretype = ProcedureType.query(name=proceduretype)
self.proceduretype = proceduretype
self.procedure = input_object
super().__init__(parent=parent, input_object=input_object)
@@ -84,4 +85,8 @@ class DefaultProcedureManager(DefaultManager):
Writer = getattr(results_writers, f"{result.result_type}InfoWriter")
res_info_writer = Writer(pydant_obj=result, proceduretype=self.proceduretype)
workbook = res_info_writer.write_to_workbook(workbook=workbook)
for result in self.pyd.sample_results:
Writer = getattr(results_writers, f"{result.result_type}SampleWriter")
res_sample_writer = Writer(pydant_obj=self.procedure, proceduretype=self.proceduretype)
workbook = res_sample_writer.write_to_workbook(workbook=workbook)
return workbook

View File

@@ -0,0 +1,33 @@
"""
"""
from __future__ import annotations
import logging
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING
from openpyxl.reader.excel import load_workbook
from backend.db.models import Procedure
from backend.excel.parsers.results_parsers.qubit_results_parser import QubitSampleParser, QubitInfoParser
from backend.excel.writers.results_writers.qubit_results_writer import QubitInfoWriter, QubitSampleWriter
from . import DefaultResultsManager
if TYPE_CHECKING:
from backend.validators.pydant import PydResults
logger = logging.getLogger(f"submissions.{__name__}")
class QubitManager(DefaultResultsManager):
def __init__(self, procedure: Procedure, parent, fname: Path | str | None = None):
super().__init__(procedure=procedure, parent=parent, fname=fname, extension="csv")
self.parse()
def parse(self):
self.info_parser = QubitInfoParser(filepath=self.fname, procedure=self.procedure)
self.sample_parser = QubitSampleParser(filepath=self.fname, procedure=self.procedure, start_row=self.info_parser.end_row)
def write(self):
workbook = load_workbook(BytesIO(self.procedure.proceduretype.template_file))
self.sample_writer = QubitSampleWriter(pydant_obj=self.procedure.to_pydantic(), proceduretype=self.procedure.proceduretype)
workbook = self.sample_writer.write_to_workbook(workbook)
return workbook

View File

@@ -14,10 +14,12 @@ class DefaultRunManager(DefaultManager):
def write(self) -> Workbook:
from backend.managers import DefaultClientSubmissionManager, DefaultProcedureManager
logger.info(f"Initializing write")
clientsubmission = DefaultClientSubmissionManager(parent=self.parent, input_object=self.pyd.clientsubmission, submissiontype=self.pyd.clientsubmission.submissiontype)
self.clientsubmission = DefaultClientSubmissionManager(parent=self.parent, input_object=self.pyd.clientsubmission, submissiontype=self.pyd.clientsubmission.submissiontype)
workbook = Workbook()
workbook = clientsubmission.write(workbook=workbook)
workbook = self.clientsubmission.write(workbook=workbook)
self.procedures = []
for procedure in self.pyd.procedure:
procedure = DefaultProcedureManager(proceduretype=procedure.proceduretype, parent=self.parent, input_object=procedure)
workbook: Workbook = procedure.write(workbook=workbook)
self.procedures.append(procedure)
return workbook

View File

@@ -1,15 +1,18 @@
"""
Contains all validators
"""
from __future__ import annotations
import logging, re
import sys
from pathlib import Path
from openpyxl import load_workbook
from backend.db.models import Run, SubmissionType
from tools import jinja_template_loading
from jinja2 import Template
from dateutil.parser import parse
from datetime import datetime
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from backend.db.models import SubmissionType
logger = logging.getLogger(f"submissions.{__name__}")
@@ -27,15 +30,17 @@ class DefaultNamer(object):
class ClientSubmissionNamer(DefaultNamer):
def __init__(self, filepath: str | Path, submissiontype: str|SubmissionType|None=None,
def __init__(self, filepath: str | Path, submissiontype: str|"SubmissionType"|None=None,
data: dict | None = None, **kwargs):
from backend.db.models import SubmissionType
super().__init__(filepath=filepath)
if not submissiontype:
submissiontype = self.retrieve_submissiontype(filepath=self.filepath)
self.submissiontype = self.retrieve_submissiontype(filepath=self.filepath)
if isinstance(submissiontype, str):
submissiontype = SubmissionType.query(name=submissiontype)
self.submissiontype = SubmissionType.query(name=submissiontype)
def retrieve_submissiontype(self, filepath: str | Path):
def retrieve_submissiontype(self):
from backend.db.models import SubmissionType
# NOTE: Attempt 1, get from form properties:
sub_type = self.get_subtype_from_properties()
if not sub_type:
@@ -51,6 +56,7 @@ class ClientSubmissionNamer(DefaultNamer):
return sub_type
def get_subtype_from_regex(self) -> SubmissionType:
from backend.db.models import SubmissionType
regex = SubmissionType.regex
m = regex.search(self.filepath.__str__())
try:
@@ -64,6 +70,7 @@ class ClientSubmissionNamer(DefaultNamer):
def get_subtype_from_preparse(self) -> SubmissionType:
from backend.excel.parsers.clientsubmission_parser import ClientSubmissionInfoParser
from backend.db.models import SubmissionType
parser = ClientSubmissionInfoParser(self.filepath)
sub_type = next((value for k, value in parser.parsed_info.items() if k == "submissiontype"), None)
sub_type = SubmissionType.query(name=sub_type)
@@ -72,6 +79,7 @@ class ClientSubmissionNamer(DefaultNamer):
return sub_type
def get_subtype_from_properties(self) -> SubmissionType:
from backend.db.models import SubmissionType
wb = load_workbook(self.filepath)
# NOTE: Gets first category in the metadata.
categories = wb.properties.category.split(";")
@@ -88,6 +96,7 @@ class RSLNamer(object):
"""
def __init__(self, filename: str, submission_type: str | None = None, data: dict | None = None):
from backend.db.models import SubmissionType
# NOTE: Preferred method is path retrieval, but might also need validation for just string.
filename = Path(filename) if Path(filename).exists() else filename
self.submission_type = submission_type
@@ -113,7 +122,7 @@ class RSLNamer(object):
Returns:
str: parsed procedure type
"""
from backend.db.models import SubmissionType
def st_from_path(filepath: Path) -> str:
"""
Sub def to get proceduretype from a file path
@@ -186,8 +195,9 @@ class RSLNamer(object):
regex (str): string to construct pattern
filename (str): string to be parsed
"""
from backend.db.models import Run
if regex is None:
regex = BasicRun.regex
regex = Run.regex
match filename:
case Path():
m = regex.search(filename.stem)
@@ -215,6 +225,7 @@ class RSLNamer(object):
Returns:
str: Output filename
"""
from backend.db.models import Run
if "submitted_date" in data.keys():
if isinstance(data['submitted_date'], dict):
if data['submitted_date']['value'] is not None:

View File

@@ -37,17 +37,21 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
def prevalidate(cls, data):
sql_fields = [k for k, v in cls._sql_object.__dict__.items() if isinstance(v, InstrumentedAttribute)]
output = {}
try:
items = data.items()
except AttributeError as e:
logger.error(f"Could not prevalidate {cls.__name__} due to {e} for {pformat(data)}")
return data
for key, value in items:
new_key = key.replace("_", "")
if new_key in sql_fields:
output[new_key] = value
else:
output[key] = value
match data:
case dict():
try:
items = data.items()
except AttributeError as e:
logger.error(f"Could not prevalidate {cls.__name__} due to {e} for {pformat(data)}")
return data
for key, value in items:
new_key = key.replace("_", "")
if new_key in sql_fields:
output[new_key] = value
else:
output[key] = value
case _:
output = data
return output
@model_validator(mode='after')
@@ -136,6 +140,48 @@ class PydBaseClass(BaseModel, extra='allow', validate_assignment=True):
return list(set(output))
class PydResults(PydBaseClass, arbitrary_types_allowed=True):
result: dict = Field(default={})
result_type: str = Field(default="NA")
img: None | bytes = Field(default=None)
# parent: Procedure | ProcedureSampleAssociation | None = Field(default=None)
parent: Any | None = Field(default=None)
date_analyzed: datetime | None = Field(default=None)
@field_validator("date_analyzed")
@classmethod
def set_today(cls, value):
match value:
case str():
value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
case datetime():
pass
case date():
value = datetime.combine(value, datetime.max.time())
case _:
value = datetime.now()
return value
def to_sql(self):
sql, _ = Results.query_or_create(result_type=self.result_type, result=self.results)
try:
check = sql.image
except FileNotFoundError:
check = False
if not check:
sql.image = self.img
if not sql.date_analyzed:
sql.date_analyzed = self.date_analyzed
match self.parent:
case ProcedureSampleAssociation():
sql.sampleprocedureassociation = self.parent
case Procedure():
sql.procedure = self.parent
case _:
logger.error("Improper association found.")
return sql
class PydReagentLot(PydBaseClass):
lot: str | None
name: str | None = Field(default=None)
@@ -143,10 +189,11 @@ class PydReagentLot(PydBaseClass):
missing: bool = Field(default=True)
comment: str | None = Field(default="", validate_default=True)
class PydReagent(PydBaseClass):
lot: str | None
# lot: str | None
reagentrole: str | None
expiry: date | datetime | Literal['NA'] | None = Field(default=None, validate_default=True)
# expiry: date | datetime | Literal['NA'] | None = Field(default=None, validate_default=True)
name: str | None = Field(default=None, validate_default=True)
missing: bool = Field(default=True)
comment: str | None = Field(default="", validate_default=True)
@@ -177,47 +224,47 @@ class PydReagent(PydBaseClass):
return value
return value
@field_validator("lot", mode='before')
@classmethod
def rescue_lot_string(cls, value):
if value is not None:
return convert_nans_to_nones(str(value).strip())
return value
@field_validator("lot")
@classmethod
def enforce_lot_string(cls, value):
if value is not None:
return value.upper().strip()
return value
@field_validator("expiry", mode="before")
@classmethod
def enforce_date(cls, value):
if value is not None:
match value:
case int():
return datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value - 2)
case 'NA':
return value
case str():
return parse(value)
case date():
return datetime.combine(value, datetime.max.time())
case datetime():
return value
case _:
return convert_nans_to_nones(str(value))
if value is None:
value = datetime.combine(date.today(), datetime.max.time())
return value
@field_validator("expiry")
@classmethod
def date_na(cls, value):
if isinstance(value, date) and value.year == 1970:
value = "NA"
return value
# @field_validator("lot", mode='before')
# @classmethod
# def rescue_lot_string(cls, value):
# if value is not None:
# return convert_nans_to_nones(str(value).strip())
# return value
#
# @field_validator("lot")
# @classmethod
# def enforce_lot_string(cls, value):
# if value is not None:
# return value.upper().strip()
# return value
#
# @field_validator("expiry", mode="before")
# @classmethod
# def enforce_date(cls, value):
# if value is not None:
# match value:
# case int():
# return datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value - 2)
# case 'NA':
# return value
# case str():
# return parse(value)
# case date():
# return datetime.combine(value, datetime.max.time())
# case datetime():
# return value
# case _:
# return convert_nans_to_nones(str(value))
# if value is None:
# value = datetime.combine(date.today(), datetime.max.time())
# return value
#
# @field_validator("expiry")
# @classmethod
# def date_na(cls, value):
# if isinstance(value, date) and value.year == 1970:
# value = "NA"
# return value
@field_validator("name", mode="before")
@classmethod
@@ -227,7 +274,6 @@ class PydReagent(PydBaseClass):
else:
return values.data['reagentrole'].strip()
def improved_dict(self) -> dict:
"""
Constructs a dictionary consisting of model.fields and model.extras
@@ -250,15 +296,18 @@ class PydReagent(PydBaseClass):
Returns:
Tuple[Reagent, Report]: Reagent instance and result of function
"""
from backend.db.models import ReagentLot, Reagent
report = Report()
if self.model_extra is not None:
self.__dict__.update(self.model_extra)
reagent, new = ReagentLot.query_or_create(lot=self.lot, name=self.name)
reagentlot, new = ReagentLot.query_or_create(lot=self.lot, name=self.name)
if new:
reagentrole = ReagentRole.query(name=self.reagentrole)
reagent.reagentrole = reagentrole
reagent.expiry = self.expiry
return reagent, report
reagent = Reagent.query(name=self.name, limit=1)
reagentlot.reagent = reagent
reagentlot.expiry = self.expiry
if isinstance(reagentlot.expiry, str):
reagentlot.expiry = datetime.combine(datetime.strptime(reagentlot.expiry, "%Y-%m-%d"), datetime.max.time())
return reagentlot, report
class PydSample(PydBaseClass):
@@ -267,6 +316,7 @@ class PydSample(PydBaseClass):
enabled: bool = Field(default=True)
row: int = Field(default=0)
column: int = Field(default=0)
results: List[PydResults] | PydResults = Field(default=[])
@field_validator("sample_id", mode="before")
@classmethod
@@ -328,6 +378,7 @@ class PydTips(PydBaseClass):
Returns:
SubmissionTipsAssociation: Association between queried tips and procedure
"""
from backend.db.models import TipsLot
report = Report()
tips = TipsLot.query(lot=self.lot, limit=1)
return tips, report
@@ -345,6 +396,7 @@ class PydEquipment(PydBaseClass):
@field_validator('equipmentrole', mode='before')
@classmethod
def get_role_name(cls, value):
from backend.db.models import EquipmentRole
match value:
case list():
value = value[0]
@@ -359,6 +411,7 @@ class PydEquipment(PydBaseClass):
@field_validator('processes', mode='before')
@classmethod
def process_to_pydantic(cls, value, values):
from backend.db.models import ProcessVersion, Process
if isinstance(value, GeneratorType):
value = [item for item in value]
value = convert_nans_to_nones(value)
@@ -388,23 +441,29 @@ class PydEquipment(PydBaseClass):
@field_validator('tips', mode='before')
@classmethod
def tips_to_pydantic(cls, value, values):
from backend.db.models import TipsLot
if isinstance(value, GeneratorType):
value = [item for item in value]
value = convert_nans_to_nones(value)
if not value:
value = []
if isinstance(value, TipsLot):
value = value.to_pydantic(pyd_model_name="PydTips")
else:
try:
d: Tips = next(
(tips for tips in value if values.data['name'] in [item.name for item in tips.equipment]),
None)
if d:
value = d.to_pydantic()
except AttributeError as e:
logger.error(f"Process Validation error due to {e}")
value = []
match value:
case TipsLot():
value = value.to_pydantic(pyd_model_name="PydTips")
case dict():
value = PydTips(**value)
case _:
pass
# else:
# try:
# d: Tips = next(
# (tips for tips in value if values.data['name'] in [item.name for item in tips.equipment]),
# None)
# if d:
# value = d.to_pydantic()
# except AttributeError as e:
# logger.error(f"Process Validation error due to {e}")
# value = []
return value
@report_result
@@ -419,6 +478,7 @@ class PydEquipment(PydBaseClass):
Returns:
Tuple[Equipment, RunEquipmentAssociation]: SQL objects
"""
from backend.db.models import Equipment, ProcedureEquipmentAssociation, Process
report = Report()
if isinstance(procedure, str):
procedure = Procedure.query(name=procedure)
@@ -471,7 +531,6 @@ class PydEquipment(PydBaseClass):
return {k: getattr(self, k) for k in fields}
class PydContact(BaseModel):
name: str
phone: str | None
@@ -633,6 +692,7 @@ class PydProcess(PydBaseClass, extra="allow"):
@report_result
def to_sql(self):
from backend.db.models import ProcessVersion
report = Report()
name = self.name.split("-")[0]
# NOTE: can't use query_or_create due to name not being part of ProcessVersion
@@ -678,12 +738,12 @@ class PydElastic(BaseModel, extra="allow", arbitrary_types_allowed=True):
# NOTE: Generified objects below:
class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
proceduretype: ProcedureType | None = Field(default=None)
run: Run | str | None = Field(default=None)
proceduretype: Any | None = Field(default=None)
run: Any | str | None = Field(default=None)
name: dict = Field(default=dict(value="NA", missing=True), validate_default=True)
technician: dict = Field(default=dict(value="NA", missing=True))
repeat: bool = Field(default=False)
repeat_of: Procedure | None = Field(default=None)
repeat_of: Any | None = Field(default=None)
plate_map: str | None = Field(default=None)
reagent: list | None = Field(default=[])
reagentrole: dict | None = Field(default={}, validate_default=True)
@@ -872,7 +932,10 @@ class PydProcedure(PydBaseClass, arbitrary_types_allowed=True):
reg.save()
def to_sql(self, new: bool = False):
from backend.db.models import RunSampleAssociation, ProcedureSampleAssociation
from backend.db.models import (
RunSampleAssociation, ProcedureSampleAssociation, Procedure, ProcedureReagentLotAssociation,
ProcedureEquipmentAssociation
)
logger.debug(f"incoming pyd: {pformat([item.__dict__ for item in self.equipment])}")
if new:
sql = Procedure()
@@ -995,9 +1058,11 @@ class PydClientSubmission(PydBaseClass):
def enforce_submitted_date(cls, value):
match value:
case str():
value = dict(value=datetime.strptime(value, "%Y-%m-%d %H:%M:%S"), missing=False)
case date() | datetime():
value = dict(value=datetime.strptime(value, "%Y-%m-%d %H:%M:%S").date(), missing=False)
case date():
value = dict(value=value, missing=False)
case datetime():
value = dict(value=value.date(), missing=False)
case _:
pass
return value
@@ -1115,6 +1180,7 @@ class PydClientSubmission(PydBaseClass):
def to_sql(self):
sql = super().to_sql()
from backend.db.models import SubmissionType
assert not any([isinstance(item, PydSample) for item in sql.sample])
sql.sample = []
if not sql.submissiontype:
@@ -1615,44 +1681,3 @@ class PydRun(PydBaseClass): #, extra='allow'):
samples.append(sample)
samples = sorted(samples, key=itemgetter("submission_rank"))
return samples
class PydResults(PydBaseClass, arbitrary_types_allowed=True):
result: dict = Field(default={})
result_type: str = Field(default="NA")
img: None | bytes = Field(default=None)
parent: Procedure | ProcedureSampleAssociation | None = Field(default=None)
date_analyzed: datetime | None = Field(default=None)
@field_validator("date_analyzed")
@classmethod
def set_today(cls, value):
match value:
case str():
value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
case datetime():
pass
case date():
value = datetime.combine(value, datetime.max.time())
case _:
value = datetime.now()
return value
def to_sql(self):
sql, _ = Results.query_or_create(result_type=self.result_type, result=self.results)
try:
check = sql.image
except FileNotFoundError:
check = False
if not check:
sql.image = self.img
if not sql.date_analyzed:
sql.date_analyzed = self.date_analyzed
match self.parent:
case ProcedureSampleAssociation():
sql.sampleprocedureassociation = self.parent
case Procedure():
sql.procedure = self.parent
case _:
logger.error("Improper association found.")
return sql

View File

@@ -56,7 +56,15 @@ class ProcedureCreation(QDialog):
proceduretype_dict = self.proceduretype.details_dict()
# NOTE: Add --New-- as an option for reagents.
for key, value in self.procedure.reagentrole.items():
value.append(dict(name="--New--"))
try:
check = "--New--" in [v['name'] for v in value]
except TypeError:
try:
check = "--New--" in [v.name for v in value]
except (TypeError, AttributeError):
check = True
if not check:
value.append(dict(name="--New--"))
if self.procedure.equipment:
for equipmentrole in proceduretype_dict['equipment']:
# NOTE: Check if procedure equipment is present and move to head of the list if so.
@@ -72,7 +80,6 @@ class ProcedureCreation(QDialog):
proceduretype_dict['equipment'] = [sanitize_object_for_json(object) for object in proceduretype_dict['equipment']]
regex = re.compile(r".*R\d$")
proceduretype_dict['previous'] = [""] + [item.name for item in self.run.procedure if item.proceduretype == self.proceduretype and not bool(regex.match(item.name))]
# sys.exit(f"ProcedureDict:\n{pformat(proceduretype_dict)}")
html = render_details_template(
template_name="procedure_creation",
js_in=["procedure_form", "grid_drag", "context_menu"],
@@ -82,12 +89,13 @@ class ProcedureCreation(QDialog):
plate_map=self.plate_map,
edit=self.edit
)
# with open("procedure_creation.html", "w") as f:
# f.write(html)
self.webview.setHtml(html)
@pyqtSlot(str, str, str, str)
def update_equipment(self, equipmentrole: str, equipment: str, processversion: str, tips: str):
from backend.db.models import Equipment, ProcessVersion, TipsLot
logger.debug(f"\n\nEquipmentRole: {equipmentrole}, Equipment: {equipment}, Process: {processversion}, Tips: {tips}\n\n")
try:
equipment_of_interest = next(
(item for item in self.procedure.equipment if item.equipmentrole == equipmentrole))
@@ -148,9 +156,10 @@ class ProcedureCreation(QDialog):
@pyqtSlot(str, str, str, str)
def add_new_reagent(self, reagentrole: str, name: str, lot: str, expiry: str):
from backend.validators.pydant import PydReagent
from backend.validators.pydant import PydReagentLot
expiry = datetime.datetime.strptime(expiry, "%Y-%m-%d")
pyd = PydReagent(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry)
logger.debug(f"{reagentrole}, {name}, {lot}, {expiry}")
pyd = PydReagentLot(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry)
self.procedure.reagentrole[reagentrole].insert(0, pyd)
self.set_html()
@@ -162,6 +171,12 @@ class ProcedureCreation(QDialog):
return
self.procedure.update_reagents(reagentrole=reagentrole, name=name, lot=lot, expiry=expiry)
@pyqtSlot(str, result=list)
def get_reagent_names(self, reagentrole_name: str):
from backend.db.models import ReagentRole
reagentrole = ReagentRole.query(name=reagentrole_name)
return [item.name for item in reagentrole.get_reagents(proceduretype=self.procedure.proceduretype)]
def return_sql(self, new: bool = False):
output = self.procedure.to_sql(new=new)
return output

View File

@@ -0,0 +1,96 @@
"""
"""
from __future__ import annotations
import json
import logging, sys
from pprint import pformat
from typing import List, Generator
from PyQt6.QtWidgets import (QDialog, QGridLayout, QDialogButtonBox)
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWebChannel import QWebChannel
from PyQt6.QtCore import pyqtSlot
from tools import render_details_template, row_keys
from backend.db.models import Procedure, ProcedureSampleAssociation, Results
logger = logging.getLogger(f"submissions.{__name__}")
class ResultsSampleMatcher(QDialog):
def __init__(self, parent, results_var_name: str, results: Generator[dict, None, None], samples:List[str],
procedure:Procedure, results_type: str):
super().__init__(parent=parent)
self.procedure = procedure
self.results_type = results_type
self.results_var_name = results_var_name
results = [item for item in results]
html = render_details_template("results_sample_match", results=results, results_var_name=self.results_var_name, samples=samples)
self.webview = QWebEngineView()
self.layout = QGridLayout()
self.setLayout(self.layout)
self.channel = QWebChannel()
self.channel.registerObject('backend', self)
self.webview.setHtml(html)
self.webview.page().setWebChannel(self.channel)
self.layout.addWidget(self.webview)
QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
self.buttonBox = QDialogButtonBox(QBtn)
self.buttonBox.accepted.connect(self.accept)
self.buttonBox.rejected.connect(self.reject)
self.layout.addWidget(self.buttonBox)
self.output = []
@pyqtSlot(bool, str, str, str)
def set_match(self, enabled: bool, sample: str, result_text:str, result: str):
logger.debug(f"Sample: {sample}")
if ":" in sample:
sample_id = sample.split(":")[0]
well = sample.split(":")[1]
row = row_keys[well[0]]
column = int(well[1:])
else:
row = None
column = None
result = "".join([r for r in result]).replace("\'", "\"")
try:
result = json.loads(result)
except json.decoder.JSONDecoder:
logger.error("Could not decode json.")
logger.debug(f"Search: {self.procedure}, {sample_id}, {row}, {column}")
association = ProcedureSampleAssociation.query(procedure=self.procedure, sample=sample_id, row=row, column=column)
if enabled:
result = Results(sampleprocedureassociation=association, result=result, result_type=self.results_type)
self.output.append(result)
else:
try:
result = next(
(item for item in self.output if str(item.result[self.results_var_name]) == result_text)
)
except StopIteration:
logger.error(f"Couldn't find association for {result_text}")
return
self.output.remove(result)
@pyqtSlot(str, str)
def update_match(self, sample: str, result_text: str):
if ":" in sample:
sample_id = sample.split(":")[0]
well = sample.split(":")[1]
row = row_keys[well[0]]
column = int(well[1:])
else:
row = None
column = None
logger.debug(f"Search: {self.procedure}, {sample_id}, {row}, {column}")
association = ProcedureSampleAssociation.query(procedure=self.procedure, sample=sample_id, row=row, column=column)
logger.debug(association)
try:
result = next(
(item for item in self.output if str(item.result[self.results_var_name]) == result_text)
)
except StopIteration:
logger.error(f"Couldn't find association for {result_text}")
return
result.sampleprocedureassociation = association
logger.debug(f"Output: {pformat(self.output)}")

View File

@@ -194,7 +194,7 @@ function contextListener() {
function clickListener() {
document.addEventListener( "click", function(e) {
var clickeElIsLink = clickInsideElement( e, contextMenuLinkClassName );
backend.log(e.target.id)
if ( clickeElIsLink ) {
e.preventDefault();
menuItemListener( clickeElIsLink );

View File

@@ -42,7 +42,7 @@ var changed_it = new Event('change');
var reagentRoles = document.getElementsByClassName("reagentrole");
for(let i = 0; i < reagentRoles.length; i++) {
reagentRoles[i].addEventListener("change", function() {
reagentRoles[i].addEventListener("change", async function() {
if (reagentRoles[i].value.includes("--New--")) {
// alert("Create new reagent.")
var br = document.createElement("br");
@@ -50,9 +50,15 @@ for(let i = 0; i < reagentRoles.length; i++) {
var new_form = document.createElement("form");
new_form.setAttribute("class", "new_reagent_form")
new_form.setAttribute("id", reagentRoles[i].id + "_addition")
var rr_name = document.createElement("input");
rr_name.setAttribute("type", "text");
var rr_name = document.createElement("select");
rr_name.setAttribute("id", "new_" + reagentRoles[i].id + "_name");
var rr_options = await backend.get_reagent_names(reagentRoles[i].id).then(
function(result) {
result.forEach( function(item) {
rr_name.options.add( new Option(item));
});
}
);
var rr_name_label = document.createElement("label");
rr_name_label.setAttribute("for", "new_" + reagentRoles[i].id + "_name");
rr_name_label.innerHTML = "Name:";

View File

@@ -0,0 +1,54 @@
{% extends "details.html" %}
{% block head %}
{{ super() }}
<title>Matching results</title>
{% endblock %}
{% block body %}
{% for result in results %}
<div class="resultholder" style="border-style: solid; border-width: 2px" data="{{ result }}">
<input type="checkbox" id="{{ loop.index }}_check" class="checker">&nbsp;&nbsp;
<span id="{{ loop.index }}_var", class="variable" data-value="{{ result }}">{{ result[results_var_name] }}</span>&nbsp;&nbsp;
<select id="{{ loop.index }}_select" class="selecter" disabled>
{% for sample in samples %}
{% if sample.well %}
<option value="{{ sample.sample.sample_id }}:{{ sample.well }}">{{ sample.sample.sample_id }}:{{ sample.well }}</option>
{% else %}
<option value="{{ sample.sample.sample_id }}">{{ sample.sample.sample_id }}</option>
{% endif %}
{% endfor %}
</select>
</div>
{% endfor %}
{% endblock %}
{% block script %}
<script>
var holders = document.getElementsByClassName("resultholder");
for(let i = 0; i < holders.length; i++) {
console.log(i);
holders[i].getElementsByClassName("checker")[0].addEventListener("change", function(){
if ( this.checked ) {
holders[i].getElementsByClassName("selecter")[0].disabled = false;
} else {
holders[i].getElementsByClassName("selecter")[0].disabled = true;
}
var enabled = this.checked;
var sample = holders[i].getElementsByClassName("selecter")[0].value;
var result = holders[i].getElementsByClassName("variable")[0].dataset.value;
var result_text = holders[i].getElementsByClassName("variable")[0].textContent
backend.set_match(enabled, sample, result_text, result);
});
holders[i].getElementsByClassName("selecter")[0].addEventListener("change", function(){
var sample = this.value;
var result_text = holders[i].getElementsByClassName("variable")[0].textContent
backend.update_match(sample, result_text);
});
}
</script>
{{ super() }}
{% endblock %}

View File

@@ -782,7 +782,8 @@ def yaml_regex_creator(loader, node):
nodes = loader.construct_sequence(node)
name = nodes[0].replace(" ", "_")
abbr = nodes[1]
return f"(?P<{name}>RSL(?:-|_)?{abbr}(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)?\d?([^_0123456789\sA-QS-Z]|$)?R?\d?)?)"
# return f"(?P<{name}>RSL(?:-|_)?{abbr}(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)?\d?([^_0123456789\sA-QS-Z]|$)?R?\d?)?)"
return f"(?P<{name}>RSL(?:-|_)?{abbr}(?:-|_)?20\\d{2}-?\\d{2}-?\\d{2}(?:(_|-)?\\d?([^_0123456789\\sA-QS-Z]|$)?R?\\d?)?)"
def super_splitter(ins_str: str, substring: str, idx: int) -> str: