Qubit results parsing complete.

This commit is contained in:
lwark
2025-09-23 09:00:25 -05:00
parent 4d70d751ca
commit 4522f5909e
4 changed files with 241 additions and 0 deletions

View File

@@ -0,0 +1,58 @@
"""
"""
from __future__ import annotations
import logging
from csv import reader
from typing import Generator, TYPE_CHECKING
from frontend.widgets.results_sample_matcher import ResultsSampleMatcher
from backend import Procedure
from backend.db.models import ProcedureSampleAssociation
from backend.excel.parsers.results_parsers import DefaultResultsInfoParser, DefaultResultsSampleParser
from pathlib import Path
if TYPE_CHECKING:
from backend.validators.pydant import PydSample
logger = logging.getLogger(f"submissions.{__name__}")
class QubitInfoParser(DefaultResultsInfoParser):
def __init__(self, filepath: Path | str, procedure=None, **kwargs):
self.results_type = "Qubit"
self.procedure = procedure
super().__init__(filepath=filepath, proceduretype=self.procedure.proceduretype, results_type="Qubit")
def to_pydantic(self):
"""
Since there is no overview generated, return blank PydResults object.
Returns:
PydResults
"""
from backend.validators.pydant import PydResults
return None
class QubitSampleParser(DefaultResultsSampleParser):
"""Object to pull data from Design and Analysis PCR export file."""
def __init__(self, filepath: Path | str, sheet: str | None = None, start_row: int = 1, procedure=None, **kwargs):
self.results_type = "Qubit"
self.procedure = procedure
super().__init__(filepath=filepath, proceduretype=self.procedure.proceduretype, results_type="Qubit")
self.sample_matcher()
def sample_matcher(self):
# samples = [item for item in self.procedure.proceduresampleassociation]
dlg = ResultsSampleMatcher(
parent=None,
results_var_name="original_sample_conc.",
results=self.parsed_info,
samples=self.procedure.proceduresampleassociation,
procedure=self.procedure,
results_type="Qubit"
)
if dlg.exec():
for result in dlg.output:
result.save()

View File

@@ -0,0 +1,33 @@
"""
"""
from __future__ import annotations
import logging
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING
from openpyxl.reader.excel import load_workbook
from backend.db.models import Procedure
from backend.excel.parsers.results_parsers.qubit_results_parser import QubitSampleParser, QubitInfoParser
# from backend.excel.writers.results_writers.pcr_results_writer import QubitInfoWriter, QubitSampleWriter
from . import DefaultResultsManager
if TYPE_CHECKING:
from backend.validators.pydant import PydResults
logger = logging.getLogger(f"submissions.{__name__}")
class QubitManager(DefaultResultsManager):
def __init__(self, procedure: Procedure, parent, fname: Path | str | None = None):
super().__init__(procedure=procedure, parent=parent, fname=fname, extension="csv")
self.parse()
def parse(self):
self.info_parser = QubitInfoParser(filepath=self.fname, procedure=self.procedure)
self.sample_parser = QubitSampleParser(filepath=self.fname, procedure=self.procedure, start_row=self.info_parser.end_row)
def write(self):
workbook = load_workbook(BytesIO(self.procedure.proceduretype.template_file))
self.info_writer = PCRInfoWriter(pydant_obj=self.procedure.to_pydantic(), proceduretype=self.procedure.proceduretype)
workbook = self.info_writer.write_to_workbook(workbook)
self.sample_writer = PCRSampleWriter(pydant_obj=self.procedure.to_pydantic(), proceduretype=self.procedure.proceduretype)

View File

@@ -0,0 +1,96 @@
"""
"""
from __future__ import annotations
import json
import logging, sys
from pprint import pformat
from typing import List, Generator
from PyQt6.QtWidgets import (QDialog, QGridLayout, QDialogButtonBox)
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWebChannel import QWebChannel
from PyQt6.QtCore import pyqtSlot
from tools import render_details_template, row_keys
from backend.db.models import Procedure, ProcedureSampleAssociation, Results
logger = logging.getLogger(f"submissions.{__name__}")
class ResultsSampleMatcher(QDialog):
def __init__(self, parent, results_var_name: str, results: Generator[dict, None, None], samples:List[str],
procedure:Procedure, results_type: str):
super().__init__(parent=parent)
self.procedure = procedure
self.results_type = results_type
self.results_var_name = results_var_name
results = [item for item in results]
html = render_details_template("results_sample_match", results=results, results_var_name=self.results_var_name, samples=samples)
self.webview = QWebEngineView()
self.layout = QGridLayout()
self.setLayout(self.layout)
self.channel = QWebChannel()
self.channel.registerObject('backend', self)
self.webview.setHtml(html)
self.webview.page().setWebChannel(self.channel)
self.layout.addWidget(self.webview)
QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
self.buttonBox = QDialogButtonBox(QBtn)
self.buttonBox.accepted.connect(self.accept)
self.buttonBox.rejected.connect(self.reject)
self.layout.addWidget(self.buttonBox)
self.output = []
@pyqtSlot(bool, str, str, str)
def set_match(self, enabled: bool, sample: str, result_text:str, result: str):
logger.debug(f"Sample: {sample}")
if ":" in sample:
sample_id = sample.split(":")[0]
well = sample.split(":")[1]
row = row_keys[well[0]]
column = int(well[1:])
else:
row = None
column = None
result = "".join([r for r in result]).replace("\'", "\"")
try:
result = json.loads(result)
except json.decoder.JSONDecoder:
logger.error("Could not decode json.")
logger.debug(f"Search: {self.procedure}, {sample_id}, {row}, {column}")
association = ProcedureSampleAssociation.query(procedure=self.procedure, sample=sample_id, row=row, column=column)
if enabled:
result = Results(sampleprocedureassociation=association, result=result, result_type=self.results_type)
self.output.append(result)
else:
try:
result = next(
(item for item in self.output if str(item.result[self.results_var_name]) == result_text)
)
except StopIteration:
logger.error(f"Couldn't find association for {result_text}")
return
self.output.remove(result)
@pyqtSlot(str, str)
def update_match(self, sample: str, result_text: str):
if ":" in sample:
sample_id = sample.split(":")[0]
well = sample.split(":")[1]
row = row_keys[well[0]]
column = int(well[1:])
else:
row = None
column = None
logger.debug(f"Search: {self.procedure}, {sample_id}, {row}, {column}")
association = ProcedureSampleAssociation.query(procedure=self.procedure, sample=sample_id, row=row, column=column)
logger.debug(association)
try:
result = next(
(item for item in self.output if str(item.result[self.results_var_name]) == result_text)
)
except StopIteration:
logger.error(f"Couldn't find association for {result_text}")
return
result.sampleprocedureassociation = association
logger.debug(f"Output: {pformat(self.output)}")

View File

@@ -0,0 +1,54 @@
{% extends "details.html" %}
{% block head %}
{{ super() }}
<title>Matching results</title>
{% endblock %}
{% block body %}
{% for result in results %}
<div class="resultholder" style="border-style: solid; border-width: 2px" data="{{ result }}">
<input type="checkbox" id="{{ loop.index }}_check" class="checker">&nbsp;&nbsp;
<span id="{{ loop.index }}_var", class="variable" data-value="{{ result }}">{{ result[results_var_name] }}</span>&nbsp;&nbsp;
<select id="{{ loop.index }}_select" class="selecter" disabled>
{% for sample in samples %}
{% if sample.well %}
<option value="{{ sample.sample.sample_id }}:{{ sample.well }}">{{ sample.sample.sample_id }}:{{ sample.well }}</option>
{% else %}
<option value="{{ sample.sample.sample_id }}">{{ sample.sample.sample_id }}</option>
{% endif %}
{% endfor %}
</select>
</div>
{% endfor %}
{% endblock %}
{% block script %}
<script>
var holders = document.getElementsByClassName("resultholder");
for(let i = 0; i < holders.length; i++) {
console.log(i);
holders[i].getElementsByClassName("checker")[0].addEventListener("change", function(){
if ( this.checked ) {
holders[i].getElementsByClassName("selecter")[0].disabled = false;
} else {
holders[i].getElementsByClassName("selecter")[0].disabled = true;
}
var enabled = this.checked;
var sample = holders[i].getElementsByClassName("selecter")[0].value;
var result = holders[i].getElementsByClassName("variable")[0].dataset.value;
var result_text = holders[i].getElementsByClassName("variable")[0].textContent
backend.set_match(enabled, sample, result_text, result);
});
holders[i].getElementsByClassName("selecter")[0].addEventListener("change", function(){
var sample = this.value;
var result_text = holders[i].getElementsByClassName("variable")[0].textContent
backend.update_match(sample, result_text);
});
}
</script>
{{ super() }}
{% endblock %}