basic procedure creation working

This commit is contained in:
lwark
2025-05-28 14:26:12 -05:00
parent 4e1e06bb5e
commit fef964fba0
7 changed files with 319 additions and 36 deletions

View File

@@ -6,6 +6,7 @@ import json, zipfile, yaml, logging, re, sys
from operator import itemgetter
from pprint import pformat
import numpy as np
from jinja2 import Template, TemplateNotFound
from sqlalchemy import Column, String, TIMESTAMP, JSON, INTEGER, ForeignKey, Interval, Table, FLOAT, BLOB
from sqlalchemy.orm import relationship, validates, Query
@@ -539,9 +540,19 @@ class ReagentRole(BaseClass):
logger.debug(f"Constructing OmniReagentRole with name {self.name}")
return OmniReagentRole(instance_object=self, name=self.name, eol_ext=self.eol_ext)
@property
def reagents(self):
return [f"{reagent.name} - {reagent.lot}" for reagent in self.reagent]
def get_reagents(self, kittype: str | KitType | None = None):
if not kittype:
return [f"{reagent.name} - {reagent.lot}" for reagent in self.reagent]
if isinstance(kittype, str):
kittype = KitType.query(name=kittype)
assoc = next((item for item in self.reagentrolekittypeassociation if item.kittype == kittype), None)
reagents = [reagent for reagent in self.reagent]
if assoc:
last_used = Reagent.query(name=assoc.last_used)
if last_used:
reagents.insert(0, reagents.pop(reagents.index(last_used)))
return [f"{reagent.name} - {reagent.lot}" for reagent in reagents]
class Reagent(BaseClass, LogMixin):
"""
@@ -1151,21 +1162,25 @@ class ProcedureType(BaseClass):
def as_dict(self):
return dict(
name=self.name,
kittype=[item.name for item in self.kittype]
kittype=[item.name for item in self.kittype],
plate_rows=self.plate_rows,
plate_columns=self.plate_columns
)
def construct_dummy_procedure(self):
def construct_dummy_procedure(self, run: Run|None=None):
from backend.validators.pydant import PydProcedure
if run:
samples = run.constuct_sample_dicts_for_proceduretype(proceduretype=self)
output = dict(
proceduretype=self,
#name=dict(value=self.name, missing=True),
#possible_kits=[kittype.name for kittype in self.kittype],
repeat=False,
plate_map=self.construct_plate_map()
repeat=False
# plate_map=plate_map
)
return PydProcedure(**output)
def construct_plate_map(self) -> str:
def construct_plate_map(self, sample_dicts: List[dict]) -> str:
"""
Constructs an html based plate map for procedure details.
@@ -1179,22 +1194,31 @@ class ProcedureType(BaseClass):
"""
if self.plate_rows == 0 or self.plate_columns == 0:
return "<br/>"
plate_rows = range(1, self.plate_rows + 1)
plate_columns = range(1, self.plate_columns + 1)
total_wells = self.plate_columns * self.plate_rows
vw = round((-0.07 * total_wells) + 12.2, 1)
wells = [dict(name="", row=row, column=column, background_color="#ffffff")
for row in plate_rows
for column in plate_columns]
# plate_rows = range(1, self.plate_rows + 1)
# plate_columns = range(1, self.plate_columns + 1)
# total_wells = self.plate_columns * self.plate_rows
vw = round((-0.07 * len(sample_dicts)) + 12.2, 1)
# sample_dicts = run.constuct_sample_dicts_for_proceduretype(proceduretype=self)
# output_samples = [next((item for item in sample_dicts if item['row'] == row and item['column'] == column),
# dict(sample_id="", row=row, column=column, background_color="#ffffff"))
# for row in plate_rows
# for column in plate_columns]
# logger.debug(f"Output samples:\n{pformat(output_samples)}")
# NOTE: An overly complicated list comprehension create a list of sample locations
# NOTE: next will return a blank cell if no value found for row/column
env = jinja_template_loading()
template = env.get_template("plate_map.html")
html = template.render(plate_rows=self.plate_rows, plate_columns=self.plate_columns, samples=wells, vw=vw)
html = template.render(plate_rows=self.plate_rows, plate_columns=self.plate_columns, samples=sample_dicts, vw=vw)
return html + "<br/>"
@property
def ranked_plate(self):
matrix = np.array([[0 for yyy in range(1, self.plate_rows + 1)] for xxx in range(1, self.plate_columns + 1)])
return {iii: (item[0][1] + 1, item[0][0] + 1) for iii, item in enumerate(np.ndenumerate(matrix), start=1)}
@property
def total_wells(self):
return self.plate_rows * self.plate_columns
class Procedure(BaseClass):
id = Column(INTEGER, primary_key=True)

View File

@@ -28,11 +28,13 @@ from openpyxl.drawing.image import Image as OpenpyxlImage
from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr, Result, Report, \
report_result, create_holidays_for_year, check_dictionary_inclusion_equality
from datetime import datetime, date
from typing import List, Any, Tuple, Literal, Generator, Type
from typing import List, Any, Tuple, Literal, Generator, Type, TYPE_CHECKING
from pathlib import Path
from jinja2.exceptions import TemplateNotFound
from jinja2 import Template
from PIL import Image
if TYPE_CHECKING:
from backend.db.models.kits import ProcedureType
from . import kittype_procedure
@@ -653,6 +655,10 @@ class Run(BaseClass, LogMixin):
output_list = [assoc.hitpicked for assoc in self.runsampleassociation]
return output_list
@property
def sample_dicts(self) -> List[dict]:
return [dict(sample_id=assoc.sample.sample_id, row=assoc.row, column=assoc.column, background_color="#6ffe1d") for assoc in self.runsampleassociation]
@classmethod
def make_plate_map(cls, sample_list: list, plate_rows: int = 8, plate_columns=12) -> str:
"""
@@ -1273,6 +1279,51 @@ class Run(BaseClass, LogMixin):
def allowed_procedures(self):
return self.clientsubmission.submissiontype.proceduretype
def get_submission_rank_of_sample(self, sample: Sample|str):
if isinstance(sample, str):
sample = Sample.query(sample_id=sample)
clientsubmissionsampleassoc = next((assoc for assoc in self.clientsubmission.clientsubmissionsampleassociation
if assoc.sample == sample), None)
if clientsubmissionsampleassoc:
return clientsubmissionsampleassoc.submission_rank
else:
return 0
def constuct_sample_dicts_for_proceduretype(self, proceduretype: ProcedureType):
plate_dict = proceduretype.ranked_plate
ranked_samples = []
unranked_samples = []
for sample in self.sample:
submission_rank = self.get_submission_rank_of_sample(sample=sample)
if submission_rank != 0:
row, column = plate_dict[submission_rank]
ranked_samples.append(dict(sample_id=sample.sample_id, row=row, column=column, submission_rank=submission_rank, background_color="#6ffe1d"))
else:
unranked_samples.append(sample)
possible_ranks = (item for item in list(plate_dict.keys()) if item not in [sample['submission_rank'] for sample in ranked_samples])
# logger.debug(possible_ranks)
# possible_ranks = (plate_dict[idx] for idx in possible_ranks)
for sample in unranked_samples:
try:
submission_rank = next(possible_ranks)
except StopIteration:
continue
row, column = plate_dict[submission_rank]
ranked_samples.append(
dict(sample_id=sample.sample_id, row=row, column=column, submission_rank=submission_rank,
background_color="#6ffe1d"))
padded_list = []
for iii in range(1, proceduretype.total_wells+1):
sample = next((item for item in ranked_samples if item['submission_rank']==iii),
dict(sample_id="", row=0, column=0, submission_rank=iii)
)
padded_list.append(sample)
logger.debug(f"Final padded list:\n{pformat(list(sorted(padded_list, key=itemgetter('submission_rank'))))}")
return list(sorted(padded_list, key=itemgetter('submission_rank')))
class SampleType(BaseClass):
id = Column(INTEGER, primary_key=True) #: primary key