Context menu for runs working.

lwark
2025-05-22 10:00:25 -05:00
parent 75c665ea05
commit d850166e08
40 changed files with 2852 additions and 3329 deletions

View File

@@ -55,7 +55,7 @@ def update_log(mapper, connection, target):
continue
added = [str(item) for item in hist.added]
# NOTE: Attributes left out to save space
# if attr.key in ['artic_technician', 'submission_sample_associations', 'submission_reagent_associations',
# if attr.key in ['artic_technician', 'clientsubmissionsampleassociation', 'submission_reagent_associations',
# 'submission_equipment_associations', 'submission_tips_associations', 'contact_id', 'gel_info',
# 'gel_controls', 'source_plates']:
if attr.key in LogMixin.tracking_exclusion:
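For orientation, a minimal self-contained sketch of the listener pattern this hunk edits; only the exclusion check and the history access mirror the code above, while the model and event wiring are assumed.

    from sqlalchemy import event, inspect

    TRACKING_EXCLUSION = ["artic_technician", "contact_id"]  # names taken from the diff

    def update_log(mapper, connection, target):
        # Walk instrumented attributes, logging only tracked changes.
        state = inspect(target)
        for attr in state.attrs:
            if attr.key in TRACKING_EXCLUSION:      # skip excluded fields
                continue
            hist = attr.history
            if not hist.has_changes():
                continue
            added = [str(item) for item in hist.added]  # incoming values
            # ...persist `added` to an audit table here (elided)

    # event.listen(SomeModel, "after_update", update_log)  # assumed wiring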

View File

@@ -3,6 +3,8 @@ Contains all models for sqlalchemy
"""
from __future__ import annotations
import sys, logging
from dateutil.parser import parse
from pandas import DataFrame
from pydantic import BaseModel
from sqlalchemy import Column, INTEGER, String, JSON
@@ -21,7 +23,7 @@ if 'pytest' in sys.modules:
# NOTE: For inheriting in LogMixin
Base: DeclarativeMeta = declarative_base()
logger = logging.getLogger(f"submissions.{__name__}")
logger = logging.getLogger(f"procedure.{__name__}")
class BaseClass(Base):
@@ -33,12 +35,12 @@ class BaseClass(Base):
__table_args__ = {'extend_existing': True} #: NOTE Will only add new columns
singles = ['id']
omni_removes = ["id", 'runs', "omnigui_class_dict", "omnigui_instance_dict"]
omni_removes = ["id", 'run', "omnigui_class_dict", "omnigui_instance_dict"]
omni_sort = ["name"]
omni_inheritable = []
searchables = []
misc_info = Column(JSON)
_misc_info = Column(JSON)
def __repr__(self) -> str:
try:
@@ -122,6 +124,10 @@ class BaseClass(Base):
from test_settings import ctx
return ctx.backup_path
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._misc_info = dict()
@classproperty
def jsons(cls) -> List[str]:
"""
@@ -130,7 +136,10 @@ class BaseClass(Base):
Returns:
List[str]: List of column names
"""
return [item.name for item in cls.__table__.columns if isinstance(item.type, JSON)]
try:
return [item.name for item in cls.__table__.columns if isinstance(item.type, JSON)]
except AttributeError:
return []
@classproperty
def timestamps(cls) -> List[str]:
@@ -140,7 +149,10 @@ class BaseClass(Base):
Returns:
List[str]: List of column names
"""
return [item.name for item in cls.__table__.columns if isinstance(item.type, TIMESTAMP)]
try:
return [item.name for item in cls.__table__.columns if isinstance(item.type, TIMESTAMP)]
except AttributeError:
return []
@classmethod
def get_default_info(cls, *args) -> dict | list | str:
@@ -155,7 +167,7 @@ class BaseClass(Base):
return dict(singles=singles)
@classmethod
def find_regular_subclass(cls, name: str|None = None) -> Any:
def find_regular_subclass(cls, name: str | None = None) -> Any:
"""
Args:
name (str): name of subclass of interest.
@@ -198,11 +210,11 @@ class BaseClass(Base):
@classmethod
def results_to_df(cls, objects: list | None = None, **kwargs) -> DataFrame:
"""
Converts class sub_dicts into a Dataframe for all controls of the class.
Converts class sub_dicts into a Dataframe for all control of the class.
Args:
objects (list): Objects to be converted to dataframe.
**kwargs (): Arguments necessary for the to_sub_dict method. eg extraction_kit=X
**kwargs (): Arguments necessary for the to_sub_dict method. eg kittype=X
Returns:
Dataframe
@@ -219,6 +231,24 @@ class BaseClass(Base):
records = [{k: v['instance_attr'] for k, v in obj.omnigui_instance_dict.items()} for obj in objects]
return DataFrame.from_records(records)
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[Any, bool]:
new = False
allowed = [k for k, v in cls.__dict__.items() if isinstance(v, InstrumentedAttribute)
and not isinstance(v.property, _RelationshipDeclared)]
sanitized_kwargs = {k: v for k, v in kwargs.items() if k in allowed}
logger.debug(f"Sanitized kwargs: {sanitized_kwargs}")
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
instance = cls()
new = True
for k, v in sanitized_kwargs.items():
logger.debug(f"QorC Setting {k} to {v}")
setattr(instance, k, v)
logger.info(f"Instance from query or create: {instance}, new: {new}")
return instance, new
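The sanitization above filters on `_RelationshipDeclared`, a SQLAlchemy internal; a rough public-API equivalent, offered only as a sketch:

    from sqlalchemy.orm import RelationshipProperty
    from sqlalchemy.orm.attributes import InstrumentedAttribute

    def column_kwargs(cls, kwargs: dict) -> dict:
        # Keep kwargs that name plain mapped columns, dropping relationships.
        allowed = {
            k for k, v in vars(cls).items()
            if isinstance(v, InstrumentedAttribute)
            and not isinstance(v.property, RelationshipProperty)
        }
        return {k: v for k, v in kwargs.items() if k in allowed}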
@classmethod
def query(cls, **kwargs) -> Any | List[Any]:
"""
@@ -227,6 +257,8 @@ class BaseClass(Base):
Returns:
Any | List[Any]: Result of query execution.
"""
if "name" in kwargs.keys():
kwargs['limit'] = 1
return cls.execute_query(**kwargs)
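A brief usage note on the new `name` shortcut: passing `name=` now implies a single-object lookup (illustrative calls, model names assumed from elsewhere in this commit).

    lab = ClientLab.query(name="Enterics")       # limit=1 injected, one object back
    labs = ClientLab.query(cost_centre="1234")   # no limit, may return a list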
@classmethod
@@ -243,16 +275,17 @@ class BaseClass(Base):
Any | List[Any]: Single result if limit = 1 or List if other.
"""
# logger.debug(f"Kwargs: {kwargs}")
if model is None:
model = cls
# if model is None:
# model = cls
# logger.debug(f"Model: {model}")
if query is None:
query: Query = cls.__database_session__.query(model)
singles = model.get_default_info('singles')
query: Query = cls.__database_session__.query(cls)
singles = cls.get_default_info('singles')
for k, v in kwargs.items():
logger.info(f"Using key: {k} with value: {v}")
try:
attr = getattr(model, k)
attr = getattr(cls, k)
# NOTE: account for attrs that use list.
if attr.property.uselist:
query = query.filter(attr.contains(v))
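The `uselist` branch above is the crux of `execute_query`: list-valued relationship attributes filter with `.contains()`, scalar columns with equality. An illustrative sketch (attribute names assumed):

    reagent = Reagent.query(lot="230501", limit=1)
    procs = Procedure.execute_query(reagents=reagent)        # list attr -> contains()
    lab = ClientLab.execute_query(name="Enterics", limit=1)  # scalar -> ==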
@@ -341,6 +374,26 @@ class BaseClass(Base):
"""
return dict()
@classproperty
def details_template(cls) -> Template:
"""
Get the details jinja template for the correct class
Args:
base_dict (dict): incoming dictionary of Submission fields
Returns:
Tuple(dict, Template): (Updated dictionary, Template to be rendered)
"""
env = jinja_template_loading()
temp_name = f"{cls.__name__.lower()}_details.html"
try:
template = env.get_template(temp_name)
except TemplateNotFound as e:
# logger.error(f"Couldn't find template {e}")
template = env.get_template("details.html")
return template
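The per-class-template-with-fallback pattern in `details_template` is plain Jinja2; a self-contained sketch with an assumed loader path:

    from jinja2 import Environment, FileSystemLoader, TemplateNotFound

    env = Environment(loader=FileSystemLoader("frontend/templates"))  # assumed location

    def details_template_for(cls):
        try:
            return env.get_template(f"{cls.__name__.lower()}_details.html")
        except TemplateNotFound:
            return env.get_template("details.html")  # generic fallback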
def check_all_attributes(self, attributes: dict) -> bool:
"""
Checks this instance against a dictionary of attributes to determine whether it matches.
@@ -405,15 +458,29 @@ class BaseClass(Base):
"""
Custom dunder method to handle potential list relationship issues.
"""
# logger.debug(f"Attempting to set: {key} to {value}")
if key.startswith("_"):
return super().__setattr__(key, value)
try:
check = not hasattr(self, key)
except:
return
if check:
try:
json.dumps(value)
except TypeError:
value = str(value)
self._misc_info.update({key: value})
return
try:
field_type = getattr(self.__class__, key)
except AttributeError:
return super().__setattr__(key, value)
if isinstance(field_type, InstrumentedAttribute):
logger.debug(f"{key} is an InstrumentedAttribute.")
# logger.debug(f"{key} is an InstrumentedAttribute.")
match field_type.property:
case ColumnProperty():
logger.debug(f"Setting ColumnProperty to {value}")
# logger.debug(f"Setting ColumnProperty to {value}")
return super().__setattr__(key, value)
case _RelationshipDeclared():
logger.debug(f"{self.__class__.__name__} Setting _RelationshipDeclared for {key} to {value}")
@@ -446,10 +513,13 @@ class BaseClass(Base):
try:
return super().__setattr__(key, value)
except AttributeError:
logger.debug(f"Possible attempt to set relationship to simple var type.")
logger.debug(f"Possible attempt to set relationship {key} to simple var type. {value}")
relationship_class = field_type.property.entity.entity
value = relationship_class.query(name=value)
return super().__setattr__(key, value)
try:
return super().__setattr__(key, value)
except AttributeError:
return super().__setattr__(key, None)
case _:
return super().__setattr__(key, value)
else:
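The `_misc_info` overflow introduced in this hunk is easier to see without the SQLAlchemy branches; a stripped-down sketch of the idea (not the project's code):

    import json

    class MiscInfoSketch:
        """Unknown attributes spill into a JSON-safe dict instead of raising."""

        def __init__(self):
            object.__setattr__(self, "_misc_info", {})  # bypass our own hook

        def __setattr__(self, key, value):
            if key.startswith("_") or hasattr(type(self), key):
                super().__setattr__(key, value)
                return
            try:
                json.dumps(value)            # keep only JSON-serializable values
            except TypeError:
                value = str(value)           # fall back to a string form
            self._misc_info[key] = value     # held for later persistence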
@@ -458,7 +528,7 @@ class BaseClass(Base):
def delete(self):
logger.error(f"Delete has not been implemented for {self.__class__.__name__}")
def rectify_query_date(input_date, eod: bool = False) -> str:
def rectify_query_date(input_date: datetime, eod: bool = False) -> str:
"""
Converts input into a datetime string for querying purposes
@@ -486,8 +556,7 @@ class BaseClass(Base):
class LogMixin(Base):
tracking_exclusion: ClassVar = ['artic_technician', 'submission_sample_associations',
tracking_exclusion: ClassVar = ['artic_technician', 'clientsubmissionsampleassociation',
'submission_reagent_associations', 'submission_equipment_associations',
'submission_tips_associations', 'contact_id', 'gel_info', 'gel_controls',
'source_plates']
@@ -540,13 +609,12 @@ class ConfigItem(BaseClass):
from .controls import *
# NOTE: import order must go: orgs, kit, runs due to circular import issues
# NOTE: import order must go: orgs, kittype, run due to circular import issues
from .organizations import *
from .runs import *
from .kits import *
from .submissions import *
from .audit import AuditLog
# NOTE: Add a creator to the run for reagent association. Assigned here due to circular import constraints.
# NOTE: Add a creator to the procedure for reagent association. Assigned here due to circular import constraints.
# https://docs.sqlalchemy.org/en/20/orm/extensions/associationproxy.html#sqlalchemy.ext.associationproxy.association_proxy.params.creator
Procedure.reagents.creator = lambda reg: ProcedureReagentAssociation(reagent=reg)
# Procedure.reagents.creator = lambda reg: ProcedureReagentAssociation(reagent=reg)
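The `creator` line being toggled above follows the documented association-proxy pattern (see the linked SQLAlchemy page); a self-contained sketch with assumed table names:

    from sqlalchemy import ForeignKey
    from sqlalchemy.ext.associationproxy import association_proxy
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

    class SketchBase(DeclarativeBase):
        pass

    class Reagent(SketchBase):
        __tablename__ = "_reagent"
        id: Mapped[int] = mapped_column(primary_key=True)

    class ProcedureReagentAssociation(SketchBase):
        __tablename__ = "_procedure_reagent"
        procedure_id: Mapped[int] = mapped_column(ForeignKey("_procedure.id"), primary_key=True)
        reagent_id: Mapped[int] = mapped_column(ForeignKey("_reagent.id"), primary_key=True)
        reagent: Mapped[Reagent] = relationship()

    class Procedure(SketchBase):
        __tablename__ = "_procedure"
        id: Mapped[int] = mapped_column(primary_key=True)
        reagent_associations: Mapped[list[ProcedureReagentAssociation]] = relationship()
        reagents = association_proxy("reagent_associations", "reagent",
                                     creator=lambda reg: ProcedureReagentAssociation(reagent=reg))

    # Procedure().reagents.append(Reagent())  # builds the association row via creator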

View File

@@ -27,7 +27,7 @@ class ControlType(BaseClass):
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(255), unique=True) #: controltype name (e.g. Irida Control)
targets = Column(JSON) #: organisms checked for
controls = relationship("Control", back_populates="controltype") #: control samples created of this type.
control = relationship("Control", back_populates="controltype") #: control sample created of this type.
@classmethod
@setup_lookup
@@ -59,16 +59,16 @@ class ControlType(BaseClass):
Get subtypes associated with this controltype (currently used only for Kraken)
Args:
mode (str): analysis mode sub_type
mode (str): analysis mode submissiontype
Returns:
List[str]: list of subtypes available
"""
if not self.controls:
if not self.control:
return
# NOTE: Get first instance since all should have same subtypes
# NOTE: Get mode of instance
jsoner = getattr(self.controls[0], mode)
jsoner = getattr(self.control[0], mode)
try:
# NOTE: Pick genera (all should have same subtypes)
genera = list(jsoner.keys())[0]
@@ -79,7 +79,7 @@ class ControlType(BaseClass):
return subtypes
@property
def instance_class(self) -> Control:
def control_class(self) -> Control:
"""
Retrieves the Control class associated with this controltype
@@ -119,27 +119,27 @@ class Control(BaseClass):
id = Column(INTEGER, primary_key=True) #: primary key
controltype_name = Column(String, ForeignKey("_controltype.name", ondelete="SET NULL",
name="fk_BC_subtype_name")) #: name of joined run type
controltype = relationship("ControlType", back_populates="controls",
name="fk_BC_subtype_name")) #: name of joined procedure type
controltype = relationship("ControlType", back_populates="control",
foreign_keys=[controltype_name]) #: reference to parent control type
name = Column(String(255), unique=True) #: Sample ID
sample_id = Column(String, ForeignKey("_basicsample.id", ondelete="SET NULL",
name="fk_Cont_sample_id")) #: name of joined run type
sample = relationship("BasicSample", back_populates="control") #: This control's run sample
sample_id = Column(String, ForeignKey("_sample.id", ondelete="SET NULL",
name="fk_Cont_sample_id")) #: name of joined procedure type
sample = relationship("Sample", back_populates="control") #: This control's procedure sample
submitted_date = Column(TIMESTAMP) #: Date submitted to Robotics
procedure_id = Column(INTEGER, ForeignKey("_procedure.id")) #: parent run id
procedure = relationship("Procedure", back_populates="controls",
foreign_keys=[procedure_id]) #: parent run
procedure_id = Column(INTEGER, ForeignKey("_procedure.id")) #: parent procedure id
procedure = relationship("Procedure", back_populates="control",
foreign_keys=[procedure_id]) #: parent procedure
__mapper_args__ = {
"polymorphic_identity": "Basic Control",
"polymorphic_on": case(
(controltype_name == "PCR Control", "PCR Control"),
(controltype_name == "Irida Control", "Irida Control"),
else_="Basic Control"
),
"with_polymorphic": "*",
}
# __mapper_args__ = {
# "polymorphic_identity": "Basic Control",
# "polymorphic_on": case(
# (controltype_name == "PCR Control", "PCR Control"),
# (controltype_name == "Irida Control", "Irida Control"),
# else_="Basic Control"
# ),
# "with_polymorphic": "*",
# }
def __repr__(self) -> str:
return f"<{self.controltype_name}({self.name})>"
@@ -284,448 +284,448 @@ class Control(BaseClass):
self.__database_session__.commit()
class PCRControl(Control):
"""
Class made to hold info from Design & Analysis software.
"""
id = Column(INTEGER, ForeignKey('_control.id'), primary_key=True)
subtype = Column(String(16)) #: PC or NC
target = Column(String(16)) #: N1, N2, etc.
ct = Column(FLOAT) #: PCR result
reagent_lot = Column(String(64), ForeignKey("_reagent.lot", ondelete="SET NULL",
name="fk_reagent_lot"))
reagent = relationship("Reagent", foreign_keys=reagent_lot) #: reagent used for this control
__mapper_args__ = dict(polymorphic_identity="PCR Control",
polymorphic_load="inline",
inherit_condition=(id == Control.id))
def to_sub_dict(self) -> dict:
"""
Creates dictionary of fields for this object.
Returns:
dict: Output dict of name, ct, subtype, target, reagent_lot and submitted_date
"""
return dict(
name=self.name,
ct=self.ct,
subtype=self.subtype,
target=self.target,
reagent_lot=self.reagent_lot,
submitted_date=self.submitted_date.date()
)
@classmethod
@report_result
def make_chart(cls, parent, chart_settings: dict, ctx: Settings) -> Tuple[Report, "PCRFigure"]:
"""
Creates a PCRFigure. Overrides parent
Args:
parent (__type__): Widget to contain the chart.
chart_settings (dict): settings passed down from chart widget
ctx (Settings): settings passed down from gui. Not used here.
Returns:
Tuple[Report, "PCRFigure"]: Report of status and resulting figure.
"""
from frontend.visualizations.pcr_charts import PCRFigure
parent.mode_typer.clear()
parent.mode_typer.setEnabled(False)
report = Report()
controls = cls.query(proceduretype=chart_settings['sub_type'], start_date=chart_settings['start_date'],
end_date=chart_settings['end_date'])
data = [control.to_sub_dict() for control in controls]
df = DataFrame.from_records(data)
# NOTE: Get all PCR controls with ct over 0
try:
df = df[df.ct > 0.0]
except AttributeError:
df = df
fig = PCRFigure(df=df, modes=[], settings=chart_settings)
return report, fig
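The bare `except AttributeError` above guards a real pandas behaviour; a reproducible sketch:

    from pandas import DataFrame

    df = DataFrame.from_records([])   # empty frame, so no `ct` column exists
    try:
        df = df[df.ct > 0.0]          # attribute access to a missing column raises
    except AttributeError:
        pass                          # keep the empty frame unchanged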
def to_pydantic(self):
from backend.validators import PydPCRControl
return PydPCRControl(**self.to_sub_dict(),
controltype_name=self.controltype_name,
submission_id=self.submission_id)
class IridaControl(Control):
subtyping_allowed = ['kraken']
id = Column(INTEGER, ForeignKey('_control.id'), primary_key=True)
contains = Column(JSON) #: unstructured hashes in contains.tsv for each organism
matches = Column(JSON) #: unstructured hashes in matches.tsv for each organism
kraken = Column(JSON) #: unstructured output from kraken_report
subtype = Column(String(16), nullable=False) #: EN-NOS, MCS-NOS, etc
refseq_version = Column(String(16)) #: version of refseq used in fastq parsing
kraken2_version = Column(String(16)) #: version of kraken2 used in fastq parsing
kraken2_db_version = Column(String(32)) #: folder name of kraken2 db
sample_id = Column(INTEGER,
ForeignKey("_basicsample.id", ondelete="SET NULL", name="cont_BCS_id")) #: sample id key
__mapper_args__ = dict(polymorphic_identity="Irida Control",
polymorphic_load="inline",
inherit_condition=(id == Control.id))
@property
def targets(self):
if self.controltype.targets:
return list(itertools.chain.from_iterable([value for key, value in self.controltype.targets.items()
if key == self.subtype]))
else:
return ["None"]
@validates("subtype")
def enforce_subtype_literals(self, key: str, value: str) -> str:
"""
Validates sub_type field with acceptable values
Args:
key (str): Field name
value (str): Field Value
Raises:
KeyError: Raised if value is not in the acceptable list.
Returns:
str: Validated string.
"""
acceptables = ['ATCC49226', 'ATCC49619', 'EN-NOS', "EN-SSTI", "MCS-NOS", "MCS-SSTI", "SN-NOS", "SN-SSTI"]
if value.upper() not in acceptables:
raise KeyError(f"Sub-type must be in {acceptables}")
return value
def to_sub_dict(self) -> dict:
"""
Converts object into convenient dictionary for use in run summary
Returns:
dict: output dictionary containing: Name, Type, Targets, Top Kraken results
"""
try:
kraken = self.kraken
except TypeError:
kraken = {}
try:
kraken_cnt_total = sum([item['kraken_count'] for item in kraken.values()])
except AttributeError:
kraken_cnt_total = 0
try:
new_kraken = [dict(name=key, kraken_count=value['kraken_count'],
kraken_percent=f"{value['kraken_count'] / kraken_cnt_total:0.2%}",
target=key in self.controltype.targets)
for key, value in kraken.items()]
new_kraken = sorted(new_kraken, key=itemgetter('kraken_count'), reverse=True)[0:10]
except (AttributeError, ZeroDivisionError):
new_kraken = []
output = dict(
name=self.name,
type=self.controltype.name,
targets=", ".join(self.targets),
kraken=new_kraken
)
return output
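A worked example of the kraken summary computed above, with toy counts in the assumed JSON shape (the `target` lookup is omitted):

    from operator import itemgetter

    kraken = {"Escherichia": {"kraken_count": 900},
              "Staphylococcus": {"kraken_count": 100}}
    total = sum(v["kraken_count"] for v in kraken.values())          # 1000
    rows = [dict(name=k, kraken_count=v["kraken_count"],
                 kraken_percent=f"{v['kraken_count'] / total:0.2%}")
            for k, v in kraken.items()]
    rows = sorted(rows, key=itemgetter("kraken_count"), reverse=True)[:10]
    # rows[0]["kraken_percent"] == "90.00%"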
def convert_by_mode(self, control_sub_type: str, mode: Literal['kraken', 'matches', 'contains'],
consolidate: bool = False) -> Generator[dict, None, None]:
"""
split this instance into analysis types ('kraken', 'matches', 'contains') for controls graphs
Args:
consolidate (bool): whether to merge all off-target genera. Defaults to False
control_sub_type (str): control subtype, 'MCS-NOS', etc.
mode (Literal['kraken', 'matches', 'contains']): analysis type, 'contains', etc.
Returns:
List[dict]: list of records
"""
try:
data = self.__getattribute__(mode)
except TypeError:
data = {}
if data is None:
data = {}
# NOTE: Data truncation and consolidation.
if "kraken" in mode:
data = {k: v for k, v in sorted(data.items(), key=lambda d: d[1][f"{mode}_count"], reverse=True)[:50]}
else:
if consolidate:
on_tar = {k: v for k, v in data.items() if k.strip("*") in self.controltype.targets[control_sub_type]}
off_tar = sum(v[f'{mode}_ratio'] for k, v in data.items() if
k.strip("*") not in self.controltype.targets[control_sub_type])
on_tar['Off-target'] = {f"{mode}_ratio": off_tar}
data = on_tar
for genus in data:
_dict = dict(
name=self.name,
submitted_date=self.submitted_date,
genus=genus,
target='Target' if genus.strip("*") in self.controltype.targets[control_sub_type] else "Off-target"
)
for key in data[genus]:
_dict[key] = data[genus][key]
yield _dict
@classproperty
def modes(cls) -> List[str]:
"""
Get all control modes from database
Returns:
List[str]: List of control mode names.
"""
try:
cols = [item.name for item in list(cls.__table__.columns) if isinstance(item.type, JSON)]
except AttributeError as e:
logger.error(f"Failed to get available modes from db: {e}")
cols = []
return cols
@classmethod
def make_parent_buttons(cls, parent: QWidget) -> None:
"""
Creates buttons for controlling the parent chart widget
Args:
parent (QWidget): chart holding widget to add buttons to.
"""
super().make_parent_buttons(parent=parent)
rows = parent.layout.rowCount() - 2
# NOTE: check box for consolidating off-target items
checker = QCheckBox(parent)
checker.setChecked(True)
checker.setObjectName("irida_check")
checker.setToolTip("Pools off-target genera to save time.")
parent.layout.addWidget(QLabel("Consolidate Off-targets"), rows, 0, 1, 1)
parent.layout.addWidget(checker, rows, 1, 1, 2)
checker.checkStateChanged.connect(parent.update_data)
@classmethod
@report_result
def make_chart(cls, chart_settings: dict, parent, ctx) -> Tuple[Report, "IridaFigure" | None]:
"""
Creates an IridaFigure. Overrides parent
Args:
parent (__type__): Widget to contain the chart.
chart_settings (dict): settings passed down from chart widget
ctx (Settings): settings passed down from gui.
Returns:
Tuple[Report, "IridaFigure"]: Report of status and resulting figure.
"""
from frontend.visualizations import IridaFigure
try:
checker = parent.findChild(QCheckBox, name="irida_check")
if chart_settings['mode'] == "kraken":
checker.setEnabled(False)
checker.setChecked(False)
else:
checker.setEnabled(True)
consolidate = checker.isChecked()
except AttributeError:
consolidate = False
report = Report()
controls = cls.query(subtype=chart_settings['sub_type'], start_date=chart_settings['start_date'],
end_date=chart_settings['end_date'])
if not controls:
report.add_result(Result(status="Critical", msg="No controls found in given date range."))
return report, None
# NOTE: change each control to list of dictionaries
data = [control.convert_by_mode(control_sub_type=chart_settings['sub_type'], mode=chart_settings['mode'],
consolidate=consolidate) for
control in controls]
# NOTE: flatten data to one dimensional list
# data = [item for sublist in data for item in sublist]
data = flatten_list(data)
if not data:
report.add_result(Result(status="Critical", msg="No data found for controls in given date range."))
return report, None
df = cls.convert_data_list_to_df(input_df=data, sub_mode=chart_settings['sub_mode'])
if chart_settings['sub_mode'] is None:
title = chart_settings['sub_mode']
else:
title = f"{chart_settings['mode']} - {chart_settings['sub_mode']}"
# NOTE: send dataframe to chart maker
df, modes = cls.prep_df(ctx=ctx, df=df)
fig = IridaFigure(df=df, ytitle=title, modes=modes, parent=parent,
settings=chart_settings)
return report, fig
@classmethod
def convert_data_list_to_df(cls, input_df: list[dict], sub_mode) -> DataFrame:
"""
Convert list of control records to dataframe
Args:
input_df (list[dict]): list of dictionaries containing records
sub_mode (str | None, optional): sub_type of run type. Defaults to None.
Returns:
DataFrame: dataframe of controls
"""
df = DataFrame.from_records(input_df)
safe = ['name', 'submitted_date', 'genus', 'target']
for column in df.columns:
if column not in safe:
if sub_mode is not None and column != sub_mode:
continue
else:
safe.append(column)
if "percent" in column:
try:
count_col = next(item for item in df.columns if "count" in item)
except StopIteration:
continue
# NOTE: The actual percentage from kraken was off due to exclusion of NaN, recalculating.
df[column] = 100 * df[count_col] / df.groupby('name')[count_col].transform('sum')
df = df[[c for c in df.columns if c in safe]]
# NOTE: move date of sample submitted on same date as previous ahead one.
df = cls.displace_date(df=df)
# NOTE: ad hoc method to make data labels more accurate.
df = cls.df_column_renamer(df=df)
return df
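The percent recalculation above leans on a grouped transform; a compact, reproducible example using the column names from the diff:

    from pandas import DataFrame

    df = DataFrame({"name": ["c1", "c1", "c2"],
                    "kraken_count": [90, 10, 40]})
    df["kraken_percent"] = (100 * df["kraken_count"]
                            / df.groupby("name")["kraken_count"].transform("sum"))
    # per-control percentages: c1 rows -> 90.0 and 10.0, the c2 row -> 100.0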
@classmethod
def df_column_renamer(cls, df: DataFrame) -> DataFrame:
"""
Ad hoc function I created to clarify some fields
Args:
df (DataFrame): input dataframe
Returns:
DataFrame: dataframe with 'clarified' column names
"""
df = df[df.columns.drop(list(df.filter(regex='_hashes')))]
return df.rename(columns={
"contains_ratio": "contains_shared_hashes_ratio",
"matches_ratio": "matches_shared_hashes_ratio",
"kraken_count": "kraken2_read_count_(top_50)",
"kraken_percent": "kraken2_read_percent_(top_50)"
})
@classmethod
def displace_date(cls, df: DataFrame) -> DataFrame:
"""
This function serves to split samples that were submitted on the same date by incrementing dates.
It will shift the date forward by one day if it is the same day as an existing date in a list.
Args:
df (DataFrame): input dataframe composed of control records
Returns:
DataFrame: output dataframe with dates incremented.
"""
# NOTE: get submitted dates for each control
dict_list = [dict(name=item, date=df[df.name == item].iloc[0]['submitted_date']) for item in
sorted(df['name'].unique())]
previous_dates = set()
for item in dict_list:
df, previous_dates = cls.check_date(df=df, item=item, previous_dates=previous_dates)
return df
@classmethod
def check_date(cls, df: DataFrame, item: dict, previous_dates: set) -> Tuple[DataFrame, list]:
"""
Checks if an item's date is already present in df and adjusts df accordingly
Args:
df (DataFrame): input dataframe
item (dict): control for checking
previous_dates (list): list of dates found in previous controls
Returns:
Tuple[DataFrame, list]: Output dataframe and appended list of previous dates
"""
try:
check = item['date'] in previous_dates
except IndexError:
check = False
previous_dates.add(item['date'])
if check:
# NOTE: get df locations where name == item name
mask = df['name'] == item['name']
# NOTE: increment date in dataframe
df.loc[mask, 'submitted_date'] = df.loc[mask, 'submitted_date'].apply(lambda x: x + timedelta(days=1))
item['date'] += timedelta(days=1)
passed = False
else:
passed = True
# NOTE: if run didn't lead to changed date, return values
if passed:
return df, previous_dates
# NOTE: if date was changed, rerun with new date
else:
logger.warning(f"Date check failed, running recursion.")
df, previous_dates = cls.check_date(df, item, previous_dates)
return df, previous_dates
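`displace_date` and `check_date` recurse until every control lands on a unique date; an equivalent iterative sketch may make the intent clearer (toy data, not the project's code):

    from datetime import date, timedelta

    dates = {"A": date(2025, 5, 1), "B": date(2025, 5, 1)}  # shared date
    seen = set()
    for name in sorted(dates):
        while dates[name] in seen:           # same effect as the recursion above
            dates[name] += timedelta(days=1)
        seen.add(dates[name])
    # dates == {"A": date(2025, 5, 1), "B": date(2025, 5, 2)}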
@classmethod
def prep_df(cls, ctx: Settings, df: DataFrame) -> Tuple[DataFrame | None, list]:
"""
Constructs figures based on parsed pandas dataframe.
Args:
ctx (Settings): settings passed down from gui
df (pd.DataFrame): input dataframe
ytitle (str | None, optional): title for the y-axis. Defaults to None.
Returns:
Figure: Plotly figure
"""
# NOTE: converts starred genera to normal and splits off list of starred
if df.empty:
return None, []
df['genus'] = df['genus'].replace({'\*': ''}, regex=True).replace({"NaN": "Unknown"})
df['genera'] = [item[-1] if item and item[-1] == "*" else "" for item in df['genus'].to_list()]
# NOTE: remove original runs, using reruns if applicable
df = cls.drop_reruns_from_df(ctx=ctx, df=df)
# NOTE: sort by and exclude from
sorts = ['submitted_date', "target", "genus"]
exclude = ['name', 'genera']
modes = [item for item in df.columns if item not in sorts and item not in exclude]
# NOTE: Set descending for any columns that have "{mode}" in the header.
ascending = [False if item == "target" else True for item in sorts]
df = df.sort_values(by=sorts, ascending=ascending)
# NOTE: actual chart construction is done by
return df, modes
@classmethod
def drop_reruns_from_df(cls, ctx: Settings, df: DataFrame) -> DataFrame:
"""
Removes semi-duplicates from dataframe after finding sequencing repeats.
Args:
ctx (Settings): settings passed from gui
df (DataFrame): initial dataframe
Returns:
DataFrame: dataframe with originals removed in favour of repeats.
"""
if 'rerun_regex' in ctx.model_extra:
sample_names = get_unique_values_in_df_column(df, column_name="name")
rerun_regex = re.compile(fr"{ctx.rerun_regex}")
exclude = [re.sub(rerun_regex, "", sample) for sample in sample_names if rerun_regex.search(sample)]
df = df[~df.name.isin(exclude)]
return df
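How the rerun filter above behaves, assuming a hypothetical `-R<N>` suffix for reruns (the real pattern comes from `ctx.rerun_regex`):

    import re

    names = ["EN-001", "EN-001-R2", "EN-002"]
    rerun_regex = re.compile(r"-R\d+$")                     # assumed pattern
    exclude = [rerun_regex.sub("", n) for n in names if rerun_regex.search(n)]
    kept = [n for n in names if n not in exclude]
    # kept == ["EN-001-R2", "EN-002"]: the original run is dropped for its rerun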
def to_pydantic(self) -> "PydIridaControl":
"""
Constructs a pydantic version of this object.
Returns:
PydIridaControl: This object as a pydantic model.
"""
from backend.validators import PydIridaControl
return PydIridaControl(**self.__dict__)
@property
def is_positive_control(self):
return not self.subtype.lower().startswith("en")
# class PCRControl(Control):
# """
# Class made to hold info from Design & Analysis software.
# """
#
# id = Column(INTEGER, ForeignKey('_control.id'), primary_key=True)
# subtype = Column(String(16)) #: PC or NC
# target = Column(String(16)) #: N1, N2, etc.
# ct = Column(FLOAT) #: PCR result
# reagent_lot = Column(String(64), ForeignKey("_reagent.lot", ondelete="SET NULL",
# name="fk_reagent_lot"))
# reagent = relationship("Reagent", foreign_keys=reagent_lot) #: reagent used for this control
#
# __mapper_args__ = dict(polymorphic_identity="PCR Control",
# polymorphic_load="inline",
# inherit_condition=(id == Control.id))
#
# def to_sub_dict(self) -> dict:
# """
# Creates dictionary of fields for this object.
#
# Returns:
# dict: Output dict of name, ct, subtype, target, reagent_lot and submitted_date
# """
# return dict(
# name=self.name,
# ct=self.ct,
# subtype=self.subtype,
# target=self.target,
# reagent_lot=self.reagent_lot,
# submitted_date=self.submitted_date.date()
# )
#
# @classmethod
# @report_result
# def make_chart(cls, parent, chart_settings: dict, ctx: Settings) -> Tuple[Report, "PCRFigure"]:
# """
# Creates a PCRFigure. Overrides parent
#
# Args:
# parent (__type__): Widget to contain the chart.
# chart_settings (dict): settings passed down from chart widget
# ctx (Settings): settings passed down from gui. Not used here.
#
# Returns:
# Tuple[Report, "PCRFigure"]: Report of status and resulting figure.
# """
# from frontend.visualizations.pcr_charts import PCRFigure
# parent.mode_typer.clear()
# parent.mode_typer.setEnabled(False)
# report = Report()
# control = cls.query(proceduretype=chart_settings['submissiontype'], start_date=chart_settings['start_date'],
# end_date=chart_settings['end_date'])
# data = [control.to_sub_dict() for control in control]
# df = DataFrame.from_records(data)
# # NOTE: Get all PCR control with ct over 0
# try:
# df = df[df.ct > 0.0]
# except AttributeError:
# df = df
# fig = PCRFigure(df=df, modes=[], settings=chart_settings)
# return report, fig
#
# def to_pydantic(self):
# from backend.validators import PydPCRControl
# return PydPCRControl(**self.to_sub_dict(),
# controltype_name=self.controltype_name,
# clientsubmission_id=self.clientsubmission_id)
#
#
# class IridaControl(Control):
# subtyping_allowed = ['kraken']
#
# id = Column(INTEGER, ForeignKey('_control.id'), primary_key=True)
# contains = Column(JSON) #: unstructured hashes in contains.tsv for each organism
# matches = Column(JSON) #: unstructured hashes in matches.tsv for each organism
# kraken = Column(JSON) #: unstructured output from kraken_report
# subtype = Column(String(16), nullable=False) #: EN-NOS, MCS-NOS, etc
# refseq_version = Column(String(16)) #: version of refseq used in fastq parsing
# kraken2_version = Column(String(16)) #: version of kraken2 used in fastq parsing
# kraken2_db_version = Column(String(32)) #: folder name of kraken2 db
# sample_id = Column(INTEGER,
# ForeignKey("_basicsample.id", ondelete="SET NULL", name="cont_BCS_id")) #: sample id key
#
# __mapper_args__ = dict(polymorphic_identity="Irida Control",
# polymorphic_load="inline",
# inherit_condition=(id == Control.id))
#
# @property
# def targets(self):
# if self.controltype.targets:
# return list(itertools.chain.from_iterable([value for key, value in self.controltype.targets.items()
# if key == self.subtype]))
# else:
# return ["None"]
#
# @validates("subtype")
# def enforce_subtype_literals(self, key: str, value: str) -> str:
# """
# Validates submissiontype field with acceptable values
#
# Args:
# key (str): Field name
# value (str): Field Value
#
# Raises:
# KeyError: Raised if value is not in the acceptable list.
#
# Returns:
# str: Validated string.
# """
# acceptables = ['ATCC49226', 'ATCC49619', 'EN-NOS', "EN-SSTI", "MCS-NOS", "MCS-SSTI", "SN-NOS", "SN-SSTI"]
# if value.upper() not in acceptables:
# raise KeyError(f"Sub-type must be in {acceptables}")
# return value
#
# def to_sub_dict(self) -> dict:
# """
# Converts object into convenient dictionary for use in procedure summary
#
# Returns:
# dict: output dictionary containing: Name, Type, Targets, Top Kraken results
# """
# try:
# kraken = self.kraken
# except TypeError:
# kraken = {}
# try:
# kraken_cnt_total = sum([item['kraken_count'] for item in kraken.values()])
# except AttributeError:
# kraken_cnt_total = 0
# try:
# new_kraken = [dict(name=key, kraken_count=value['kraken_count'],
# kraken_percent=f"{value['kraken_count'] / kraken_cnt_total:0.2%}",
# target=key in self.controltype.targets)
# for key, value in kraken.items()]
# new_kraken = sorted(new_kraken, key=itemgetter('kraken_count'), reverse=True)[0:10]
# except (AttributeError, ZeroDivisionError):
# new_kraken = []
# output = dict(
# name=self.name,
# type=self.controltype.name,
# targets=", ".join(self.targets),
# kraken=new_kraken
# )
# return output
#
# def convert_by_mode(self, control_sub_type: str, mode: Literal['kraken', 'matches', 'contains'],
# consolidate: bool = False) -> Generator[dict, None, None]:
# """
# split this instance into analysis types ('kraken', 'matches', 'contains') for control graphs
#
# Args:
# consolidate (bool): whether to merge all off-target genera. Defaults to False
# control_sub_type (str): control subtype, 'MCS-NOS', etc.
# mode (Literal['kraken', 'matches', 'contains']): analysis type, 'contains', etc.
#
# Returns:
# List[dict]: list of records
# """
# try:
# data = self.__getattribute__(mode)
# except TypeError:
# data = {}
# if data is None:
# data = {}
# # NOTE: Data truncation and consolidation.
# if "kraken" in mode:
# data = {k: v for k, v in sorted(data.items(), key=lambda d: d[1][f"{mode}_count"], reverse=True)[:50]}
# else:
# if consolidate:
# on_tar = {k: v for k, v in data.items() if k.strip("*") in self.controltype.targets[control_sub_type]}
# off_tar = sum(v[f'{mode}_ratio'] for k, v in data.items() if
# k.strip("*") not in self.controltype.targets[control_sub_type])
# on_tar['Off-target'] = {f"{mode}_ratio": off_tar}
# data = on_tar
# for genus in data:
# _dict = dict(
# name=self.name,
# submitted_date=self.submitted_date,
# genus=genus,
# target='Target' if genus.strip("*") in self.controltype.targets[control_sub_type] else "Off-target"
# )
# for key in data[genus]:
# _dict[key] = data[genus][key]
# yield _dict
#
# @classproperty
# def modes(cls) -> List[str]:
# """
# Get all control modes from database
#
# Returns:
# List[str]: List of control mode names.
# """
# try:
# cols = [item.name for item in list(cls.__table__.columns) if isinstance(item.type, JSON)]
# except AttributeError as e:
# logger.error(f"Failed to get available modes from db: {e}")
# cols = []
# return cols
#
# @classmethod
# def make_parent_buttons(cls, parent: QWidget) -> None:
# """
# Creates buttons for controlling the parent chart widget
#
# Args:
# parent (QWidget): chart holding widget to add buttons to.
#
# """
# super().make_parent_buttons(parent=parent)
# rows = parent.layout.rowCount() - 2
# # NOTE: check box for consolidating off-target items
# checker = QCheckBox(parent)
# checker.setChecked(True)
# checker.setObjectName("irida_check")
# checker.setToolTip("Pools off-target genera to save time.")
# parent.layout.addWidget(QLabel("Consolidate Off-targets"), rows, 0, 1, 1)
# parent.layout.addWidget(checker, rows, 1, 1, 2)
# checker.checkStateChanged.connect(parent.update_data)
#
# @classmethod
# @report_result
# def make_chart(cls, chart_settings: dict, parent, ctx) -> Tuple[Report, "IridaFigure" | None]:
# """
# Creates an IridaFigure. Overrides parent
#
# Args:
# parent (__type__): Widget to contain the chart.
# chart_settings (dict): settings passed down from chart widget
# ctx (Settings): settings passed down from gui.
#
# Returns:
# Tuple[Report, "IridaFigure"]: Report of status and resulting figure.
# """
# from frontend.visualizations import IridaFigure
# try:
# checker = parent.findChild(QCheckBox, name="irida_check")
# if chart_settings['mode'] == "kraken":
# checker.setEnabled(False)
# checker.setChecked(False)
# else:
# checker.setEnabled(True)
# consolidate = checker.isChecked()
# except AttributeError:
# consolidate = False
# report = Report()
# control = cls.query(subtype=chart_settings['submissiontype'], start_date=chart_settings['start_date'],
# end_date=chart_settings['end_date'])
# if not control:
# report.add_result(Result(status="Critical", msg="No control found in given date range."))
# return report, None
# # NOTE: change each control to list of dictionaries
# data = [control.convert_by_mode(control_sub_type=chart_settings['submissiontype'], mode=chart_settings['mode'],
# consolidate=consolidate) for
# control in control]
# # NOTE: flatten data to one dimensional list
# # data = [item for sublist in data for item in sublist]
# data = flatten_list(data)
# if not data:
# report.add_result(Result(status="Critical", msg="No data found for control in given date range."))
# return report, None
# df = cls.convert_data_list_to_df(input_df=data, sub_mode=chart_settings['sub_mode'])
# if chart_settings['sub_mode'] is None:
# title = chart_settings['sub_mode']
# else:
# title = f"{chart_settings['mode']} - {chart_settings['sub_mode']}"
# # NOTE: send dataframe to chart maker
# df, modes = cls.prep_df(ctx=ctx, df=df)
# fig = IridaFigure(df=df, ytitle=title, modes=modes, parent=parent,
# settings=chart_settings)
# return report, fig
#
# @classmethod
# def convert_data_list_to_df(cls, input_df: list[dict], sub_mode) -> DataFrame:
# """
# Convert list of control records to dataframe
#
# Args:
# input_df (list[dict]): list of dictionaries containing records
# sub_mode (str | None, optional): submissiontype of procedure type. Defaults to None.
#
# Returns:
# DataFrame: dataframe of control
# """
# df = DataFrame.from_records(input_df)
# safe = ['name', 'submitted_date', 'genus', 'target']
# for column in df.columns:
# if column not in safe:
# if sub_mode is not None and column != sub_mode:
# continue
# else:
# safe.append(column)
# if "percent" in column:
# try:
# count_col = next(item for item in df.columns if "count" in item)
# except StopIteration:
# continue
# # NOTE: The actual percentage from kraken was off due to exclusion of NaN, recalculating.
# df[column] = 100 * df[count_col] / df.groupby('name')[count_col].transform('sum')
# df = df[[c for c in df.columns if c in safe]]
# # NOTE: move date of sample submitted on same date as previous ahead one.
# df = cls.displace_date(df=df)
# # NOTE: ad hoc method to make data labels more accurate.
# df = cls.df_column_renamer(df=df)
# return df
#
# @classmethod
# def df_column_renamer(cls, df: DataFrame) -> DataFrame:
# """
# Ad hoc function I created to clarify some fields
#
# Args:
# df (DataFrame): input dataframe
#
# Returns:
# DataFrame: dataframe with 'clarified' column names
# """
# df = df[df.columns.drop(list(df.filter(regex='_hashes')))]
# return df.rename(columns={
# "contains_ratio": "contains_shared_hashes_ratio",
# "matches_ratio": "matches_shared_hashes_ratio",
# "kraken_count": "kraken2_read_count_(top_50)",
# "kraken_percent": "kraken2_read_percent_(top_50)"
# })
#
# @classmethod
# def displace_date(cls, df: DataFrame) -> DataFrame:
# """
# This function serves to split sample that were submitted on the same date by incrementing dates.
# It will shift the date forward by one day if it is the same day as an existing date in a list.
#
# Args:
# df (DataFrame): input dataframe composed of control records
#
# Returns:
# DataFrame: output dataframe with dates incremented.
# """
# # NOTE: get submitted dates for each control
# dict_list = [dict(name=item, date=df[df.name == item].iloc[0]['submitted_date']) for item in
# sorted(df['name'].unique())]
# previous_dates = set()
# for item in dict_list:
# df, previous_dates = cls.check_date(df=df, item=item, previous_dates=previous_dates)
# return df
#
# @classmethod
# def check_date(cls, df: DataFrame, item: dict, previous_dates: set) -> Tuple[DataFrame, list]:
# """
# Checks if an item's date is already present in df and adjusts df accordingly
#
# Args:
# df (DataFrame): input dataframe
# item (dict): control for checking
# previous_dates (list): list of dates found in previous control
#
# Returns:
# Tuple[DataFrame, list]: Output dataframe and appended list of previous dates
# """
# try:
# check = item['date'] in previous_dates
# except IndexError:
# check = False
# previous_dates.add(item['date'])
# if check:
# # NOTE: get df locations where name == item name
# mask = df['name'] == item['name']
# # NOTE: increment date in dataframe
# df.loc[mask, 'submitted_date'] = df.loc[mask, 'submitted_date'].apply(lambda x: x + timedelta(days=1))
# item['date'] += timedelta(days=1)
# passed = False
# else:
# passed = True
# # NOTE: if procedure didn't lead to changed date, return values
# if passed:
# return df, previous_dates
# # NOTE: if date was changed, rerun with new date
# else:
# logger.warning(f"Date check failed, running recursion.")
# df, previous_dates = cls.check_date(df, item, previous_dates)
# return df, previous_dates
#
# @classmethod
# def prep_df(cls, ctx: Settings, df: DataFrame) -> Tuple[DataFrame | None, list]:
# """
# Constructs figures based on parsed pandas dataframe.
#
# Args:
# ctx (Settings): settings passed down from gui
# df (pd.DataFrame): input dataframe
# ytitle (str | None, optional): title for the y-axis. Defaults to None.
#
# Returns:
# Figure: Plotly figure
# """
# # NOTE: converts starred genera to normal and splits off list of starred
# if df.empty:
# return None, []
# df['genus'] = df['genus'].replace({'\*': ''}, regex=True).replace({"NaN": "Unknown"})
# df['genera'] = [item[-1] if item and item[-1] == "*" else "" for item in df['genus'].to_list()]
# # NOTE: remove original run, using reruns if applicable
# df = cls.drop_reruns_from_df(ctx=ctx, df=df)
# # NOTE: sort by and exclude from
# sorts = ['submitted_date', "target", "genus"]
# exclude = ['name', 'genera']
# modes = [item for item in df.columns if item not in sorts and item not in exclude]
# # NOTE: Set descending for any columns that have "{mode}" in the header.
# ascending = [False if item == "target" else True for item in sorts]
# df = df.sort_values(by=sorts, ascending=ascending)
# # NOTE: actual chart construction is done by
# return df, modes
#
# @classmethod
# def drop_reruns_from_df(cls, ctx: Settings, df: DataFrame) -> DataFrame:
# """
# Removes semi-duplicates from dataframe after finding sequencing repeats.
#
# Args:
# ctx (Settings): settings passed from gui
# df (DataFrame): initial dataframe
#
# Returns:
# DataFrame: dataframe with originals removed in favour of repeats.
# """
# if 'rerun_regex' in ctx.model_extra:
# sample_names = get_unique_values_in_df_column(df, column_name="name")
# rerun_regex = re.compile(fr"{ctx.rerun_regex}")
# exclude = [re.sub(rerun_regex, "", sample) for sample in sample_names if rerun_regex.search(sample)]
# df = df[~df.name.isin(exclude)]
# return df
#
# def to_pydantic(self) -> "PydIridaControl":
# """
# Constructs a pydantic version of this object.
#
# Returns:
# PydIridaControl: This object as a pydantic model.
# """
# from backend.validators import PydIridaControl
# return PydIridaControl(**self.__dict__)
#
# @property
# def is_positive_control(self):
# return not self.subtype.lower().startswith("en")

File diff suppressed because it is too large.

View File

@@ -14,32 +14,27 @@ from typing import List, Tuple
logger = logging.getLogger(f"submissions.{__name__}")
# table containing organization/contact relationship
orgs_contacts = Table(
"_orgs_contacts",
# table containing clientlab/contact relationship
clientlab_contact = Table(
"_clientlab_contact",
Base.metadata,
Column("org_id", INTEGER, ForeignKey("_organization.id")),
Column("clientlab_id", INTEGER, ForeignKey("_clientlab.id")),
Column("contact_id", INTEGER, ForeignKey("_contact.id")),
extend_existing=True
)
class Organization(BaseClass):
class ClientLab(BaseClass):
"""
Base of organization
Base of clientlab
"""
id = Column(INTEGER, primary_key=True) #: primary key
name = Column(String(64)) #: organization name
submissions = relationship("ClientSubmission",
back_populates="submitting_lab") #: submissions this organization has submitted
name = Column(String(64)) #: clientlab name
clientsubmission = relationship("ClientSubmission", back_populates="clientlab") #: procedure this clientlab has submitted
cost_centre = Column(String()) #: cost centre used by org for payment
contacts = relationship("Contact", back_populates="organization",
secondary=orgs_contacts) #: contacts involved with this org
@hybrid_property
def contact(self):
return self.contacts
contact = relationship("Contact", back_populates="clientlab",
secondary=clientlab_contact) #: contact involved with this org
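Usage sketch for the renamed many-to-many above: because both sides declare `back_populates` over the same `secondary` table, an append on one side is mirrored on the other before any flush (instance data assumed):

    lab = ClientLab(name="Enterics")
    person = Contact(name="A. Tech", email="a.tech@example.ca")
    lab.contact.append(person)
    assert person.clientlab == [lab]   # mirrored in memory by back_populates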
@classmethod
@setup_lookup
@@ -47,16 +42,16 @@ class Organization(BaseClass):
id: int | None = None,
name: str | None = None,
limit: int = 0,
) -> Organization | List[Organization]:
) -> ClientLab | List[ClientLab]:
"""
Lookup organizations in the database by a number of parameters.
Lookup clientlabs in the database by a number of parameters.
Args:
name (str | None, optional): Name of the organization. Defaults to None.
name (str | None, optional): Name of the clientlab. Defaults to None.
limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.
Returns:
Organization|List[Organization]:
ClientLab|List[ClientLab]:
"""
query: Query = cls.__database_session__.query(cls)
match id:
@@ -89,7 +84,7 @@ class Organization(BaseClass):
name = "NA"
return OmniOrganization(instance_object=self,
name=name, cost_centre=cost_centre,
contact=[item.to_omni() for item in self.contacts])
contact=[item.to_omni() for item in self.contact])
class Contact(BaseClass):
@@ -101,27 +96,27 @@ class Contact(BaseClass):
name = Column(String(64)) #: contact name
email = Column(String(64)) #: contact email
phone = Column(String(32)) #: contact phone number
organization = relationship("Organization", back_populates="contacts", uselist=True,
secondary=orgs_contacts) #: relationship to joined organization
submissions = relationship("ClientSubmission", back_populates="contact") #: submissions this contact has submitted
clientlab = relationship("ClientLab", back_populates="contact", uselist=True,
secondary=clientlab_contact) #: relationship to joined clientlab
clientsubmission = relationship("ClientSubmission", back_populates="contact") #: procedure this contact has submitted
@classproperty
def searchables(cls):
return []
@classmethod
def query_or_create(cls, **kwargs) -> Tuple[Contact, bool]:
new = False
disallowed = []
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
instance = cls.query(**sanitized_kwargs)
if not instance or isinstance(instance, list):
instance = cls()
new = True
for k, v in sanitized_kwargs.items():
setattr(instance, k, v)
logger.info(f"Instance from contact query or create: {instance}")
return instance, new
# @classmethod
# def query_or_create(cls, **kwargs) -> Tuple[Contact, bool]:
# new = False
# disallowed = []
# sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
# instance = cls.query(**sanitized_kwargs)
# if not instance or isinstance(instance, list):
# instance = cls()
# new = True
# for k, v in sanitized_kwargs.items():
# setattr(instance, k, v)
# logger.info(f"Instance from contact query or create: {instance}")
# return instance, new
@classmethod
@setup_lookup
@@ -133,7 +128,7 @@ class Contact(BaseClass):
limit: int = 0,
) -> Contact | List[Contact]:
"""
Lookup contacts in the database by a number of parameters.
Lookup contact in the database by a number of parameters.
Args:
name (str | None, optional): Name of the contact. Defaults to None.

File diff suppressed because it is too large.

View File

@@ -3,6 +3,6 @@ Contains pandas and openpyxl convenience functions for interacting with excel wo
'''
from .parser import *
from .submission_parser import *
from backend.excel.parsers.submission_parser import *
from .reports import *
from .writer import *

View File

@@ -1,5 +1,5 @@
"""
contains parser objects for pulling values from client generated run sheets.
contains clientsubmissionparser objects for pulling values from client generated procedure sheets.
"""
import logging
from copy import copy
@@ -42,11 +42,11 @@ class SheetParser(object):
raise FileNotFoundError(f"Couldn't parse file {self.filepath}")
self.sub = OrderedDict()
# NOTE: make decision about type of sample we have
self.sub['submission_type'] = dict(value=RSLNamer.retrieve_submission_type(filename=self.filepath),
self.sub['proceduretype'] = dict(value=RSLNamer.retrieve_submission_type(filename=self.filepath),
missing=True)
self.submission_type = SubmissionType.query(name=self.sub['submission_type'])
self.submission_type = SubmissionType.query(name=self.sub['proceduretype'])
self.sub_object = BasicRun.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
# NOTE: grab the info map from the run type in database
# NOTE: grab the info map from the procedure type in database
self.parse_info()
self.import_kit_validation_check()
self.parse_reagents()
@@ -60,19 +60,19 @@ class SheetParser(object):
"""
parser = InfoParser(xl=self.xl, submission_type=self.submission_type, sub_object=self.sub_object)
self.info_map = parser.info_map
# NOTE: in order to accommodate generic run types we have to check for the type in the excel sheet and rerun accordingly
# NOTE: in order to accommodate generic procedure types we have to check for the type in the excel sheet and rerun accordingly
try:
check = parser.parsed_info['submission_type']['value'] not in [None, "None", "", " "]
check = parser.parsed_info['proceduretype']['value'] not in [None, "None", "", " "]
except KeyError as e:
logger.error(f"Couldn't check run type due to KeyError: {e}")
logger.error(f"Couldn't check procedure type due to KeyError: {e}")
return
logger.info(
f"Checking for updated run type: {self.submission_type.name} against new: {parser.parsed_info['submission_type']['value']}")
if self.submission_type.name != parser.parsed_info['submission_type']['value']:
f"Checking for updated procedure type: {self.submission_type.name} against new: {parser.parsed_info['proceduretype']['value']}")
if self.submission_type.name != parser.parsed_info['proceduretype']['value']:
if check:
# NOTE: If initial run type doesn't match parsed run type, defer to parsed run type.
self.submission_type = SubmissionType.query(name=parser.parsed_info['submission_type']['value'])
logger.info(f"Updated self.submission_type to {self.submission_type}. Rerunning parse.")
# NOTE: If initial procedure type doesn't match parsed procedure type, defer to parsed procedure type.
self.submission_type = SubmissionType.query(name=parser.parsed_info['proceduretype']['value'])
logger.info(f"Updated self.proceduretype to {self.submission_type}. Rerunning parse.")
self.parse_info()
else:
self.submission_type = RSLNamer.retrieve_submission_type(filename=self.filepath)
@@ -82,53 +82,53 @@ class SheetParser(object):
def parse_reagents(self, extraction_kit: str | None = None):
"""
Calls reagent parser class to pull info from the excel sheet
Calls reagent clientsubmissionparser class to pull info from the excel sheet
Args:
extraction_kit (str | None, optional): Relevant extraction kit for reagent map. Defaults to None.
extraction_kit (str | None, optional): Relevant extraction kittype for reagent map. Defaults to None.
"""
if extraction_kit is None:
extraction_kit = self.sub['extraction_kit']
extraction_kit = self.sub['kittype']
parser = ReagentParser(xl=self.xl, submission_type=self.submission_type,
extraction_kit=extraction_kit)
self.sub['reagents'] = parser.parsed_reagents
def parse_samples(self):
"""
Calls sample parser to pull info from the excel sheet
Calls sample clientsubmissionparser to pull info from the excel sheet
"""
parser = SampleParser(xl=self.xl, submission_type=self.submission_type)
self.sub['samples'] = parser.parsed_samples
self.sub['sample'] = parser.parsed_samples
def parse_equipment(self):
"""
Calls equipment parser to pull info from the excel sheet
Calls equipment clientsubmissionparser to pull info from the excel sheet
"""
parser = EquipmentParser(xl=self.xl, submission_type=self.submission_type)
self.sub['equipment'] = parser.parsed_equipment
def parse_tips(self):
"""
Calls tips parser to pull info from the excel sheet
Calls tips clientsubmissionparser to pull info from the excel sheet
"""
parser = TipParser(xl=self.xl, submission_type=self.submission_type)
self.sub['tips'] = parser.parsed_tips
def import_kit_validation_check(self):
"""
Enforce that the parser has an extraction kit
Enforce that the clientsubmissionparser has an extraction kittype
"""
if 'extraction_kit' not in self.sub.keys() or not check_not_nan(self.sub['extraction_kit']['value']):
if 'kittype' not in self.sub.keys() or not check_not_nan(self.sub['kittype']['value']):
from frontend.widgets.pop_ups import ObjectSelector
dlg = ObjectSelector(title="Kit Needed", message="At minimum a kit is needed. Please select one.",
dlg = ObjectSelector(title="Kit Needed", message="At minimum a kittype is needed. Please select one.",
obj_type=KitType)
if dlg.exec():
self.sub['extraction_kit'] = dict(value=dlg.parse_form(), missing=True)
self.sub['kittype'] = dict(value=dlg.parse_form(), missing=True)
else:
raise ValueError("Extraction kit needed.")
raise ValueError("Extraction kittype needed.")
else:
if isinstance(self.sub['extraction_kit'], str):
self.sub['extraction_kit'] = dict(value=self.sub['extraction_kit'], missing=True)
if isinstance(self.sub['kittype'], str):
self.sub['kittype'] = dict(value=self.sub['kittype'], missing=True)
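The `value`/`missing` wrapping enforced here is used throughout the parser; a sketch with an assumed helper mirroring `is_missing`:

    def wrap(value):
        missing = value in (None, "None", "", " ")   # assumed is_missing logic
        return dict(value=value, missing=missing)

    sub = {"kittype": wrap("MagMAX"), "submitted_date": wrap(None)}
    # sub["submitted_date"]["missing"] is True, so the GUI can prompt for it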
def to_pydantic(self) -> PydSubmission:
"""
@@ -145,17 +145,17 @@ class InfoParser(object):
Object to parse generic info from excel sheet.
"""
def __init__(self, xl: Workbook, submission_type: str | SubmissionType, sub_object: BasicRun | None = None):
def __init__(self, xl: Workbook, submission_type: str | SubmissionType, sub_object: Run | None = None):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (str | SubmissionType): Type of run expected (Wastewater, Bacterial Culture, etc.)
submission_type (str | SubmissionType): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
sub_object (BasicRun | None, optional): Submission object holding methods. Defaults to None.
"""
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
if sub_object is None:
sub_object = BasicRun.find_polymorphic_subclass(polymorphic_identity=submission_type.name)
sub_object = Run.find_polymorphic_subclass(polymorphic_identity=submission_type.name)
self.submission_type_obj = submission_type
self.submission_type = dict(value=self.submission_type_obj.name, missing=True)
self.sub_object = sub_object
@@ -164,12 +164,12 @@ class InfoParser(object):
@property
def info_map(self) -> dict:
"""
Gets location of basic info from the submission_type object in the database.
Gets location of basic info from the proceduretype object in the database.
Returns:
dict: Location map of all info for this run type
dict: Location map of all info for this procedure type
"""
# NOTE: Get the parse_info method from the run type specified
# NOTE: Get the parse_info method from the procedure type specified
return self.sub_object.construct_info_map(submission_type=self.submission_type_obj, mode="read")
@property
@@ -186,7 +186,7 @@ class InfoParser(object):
ws = self.xl[sheet]
relevant = []
for k, v in self.info_map.items():
# NOTE: If the value is hardcoded put it in the dictionary directly. Ex. Artic kit
# NOTE: If the value is hardcoded put it in the dictionary directly. Ex. Artic kittype
if k == "custom":
continue
if isinstance(v, str):
@@ -210,7 +210,7 @@ class InfoParser(object):
# NOTE: Get cell contents at this location
value = ws.cell(row=item['row'], column=item['column']).value
match item['name']:
case "submission_type":
case "proceduretype":
value, missing = is_missing(value)
value = value.title()
case "submitted_date":
@@ -232,7 +232,7 @@ class InfoParser(object):
dicto[item['name']] = dict(value=value, missing=missing)
except (KeyError, IndexError):
continue
# NOTE: Return after running the parser components held in run object.
# NOTE: Return after running the clientsubmissionparser components held in procedure object.
return self.sub_object.custom_info_parser(input_dict=dicto, xl=self.xl, custom_fields=self.info_map['custom'])
@@ -242,12 +242,12 @@ class ReagentParser(object):
"""
def __init__(self, xl: Workbook, submission_type: str | SubmissionType, extraction_kit: str,
run_object: BasicRun | None = None):
run_object: Run | None = None):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (str|SubmissionType): Type of run expected (Wastewater, Bacterial Culture, etc.)
extraction_kit (str): Extraction kit used.
submission_type (str|SubmissionType): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
extraction_kit (str): Extraction kittype used.
run_object (BasicRun | None, optional): Submission object holding methods. Defaults to None.
"""
if isinstance(submission_type, str):
@@ -264,15 +264,16 @@ class ReagentParser(object):
@property
def kit_map(self) -> dict:
"""
Gets location of kit reagents from database
Gets location of kittype reagents from database
Args:
submission_type (str): Name of run type.
proceduretype (str): Name of procedure type.
Returns:
dict: locations of reagent info for the kit.
dict: locations of reagent info for the kittype.
"""
associations, self.kit_object = self.kit_object.construct_xl_map_for_use(submission_type=self.submission_type_obj)
associations, self.kit_object = self.kit_object.construct_xl_map_for_use(
proceduretype=self.submission_type_obj)
reagent_map = {k: v for k, v in associations.items() if k != 'info'}
try:
del reagent_map['info']
@@ -323,16 +324,16 @@ class ReagentParser(object):
class SampleParser(object):
"""
Object to pull data for samples in excel sheet and construct individual sample objects
Object to pull data for sample in excel sheet and construct individual sample objects
"""
def __init__(self, xl: Workbook, submission_type: SubmissionType, sample_map: dict | None = None,
sub_object: BasicRun | None = None) -> None:
sub_object: Run | None = None) -> None:
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (SubmissionType): Type of run expected (Wastewater, Bacterial Culture, etc.)
sample_map (dict | None, optional): Locations in database where samples are found. Defaults to None.
submission_type (SubmissionType): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
sample_map (dict | None, optional): Locations in database where sample are found. Defaults to None.
sub_object (BasicRun | None, optional): Submission object holding methods. Defaults to None.
"""
self.samples = []
@@ -343,19 +344,19 @@ class SampleParser(object):
self.submission_type_obj = submission_type
if sub_object is None:
logger.warning(
f"Sample parser attempting to fetch run class with polymorphic identity: {self.submission_type}")
sub_object = BasicRun.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
f"Sample clientsubmissionparser attempting to fetch procedure class with polymorphic identity: {self.submission_type}")
sub_object = Run.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
self.sub_object = sub_object
self.sample_type = self.sub_object.get_default_info("sample_type", submission_type=submission_type)
self.samp_object = BasicSample.find_polymorphic_subclass(polymorphic_identity=self.sample_type)
self.sample_type = self.sub_object.get_default_info("sampletype", submission_type=submission_type)
self.samp_object = Sample.find_polymorphic_subclass(polymorphic_identity=self.sample_type)
@property
def sample_map(self) -> dict:
"""
Gets info locations in excel book for run type.
Gets info locations in excel book for procedure type.
Args:
submission_type (str): run type
proceduretype (str): procedure type
Returns:
dict: Info locations.
@@ -381,7 +382,7 @@ class SampleParser(object):
if check_not_nan(id):
if id not in invalids:
sample_dict = dict(id=id, row=ii, column=jj)
sample_dict['sample_type'] = self.sample_type
sample_dict['sampletype'] = self.sample_type
plate_map_samples.append(sample_dict)
else:
pass
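As a rough sketch of the plate-map scan above, assuming the grid of sample ids has already been narrowed to the relevant worksheet region; check_not_nan is approximated by a None check, and the sheet name, sample type, and invalid sentinels are invented.
from openpyxl import load_workbook
ws = load_workbook("run_sheet.xlsx")["Plate Map"]   # invented sheet name
sample_type = "Basic Sample"                        # stand-in value
invalids = ("", "EMPTY")                            # invented sentinels
plate_map_samples = []
for ii, row in enumerate(ws.iter_rows(values_only=True), start=1):
    for jj, sample_id in enumerate(row, start=1):
        if sample_id is not None and sample_id not in invalids:
            plate_map_samples.append(dict(id=sample_id, row=ii, column=jj, sampletype=sample_type))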
@@ -407,7 +408,7 @@ class SampleParser(object):
row_dict[lmap['merge_on_id']] = str(row_dict[lmap['merge_on_id']])
except KeyError:
pass
row_dict['sample_type'] = self.sample_type
row_dict['sampletype'] = self.sample_type
row_dict['submission_rank'] = ii
try:
check = check_not_nan(row_dict[lmap['merge_on_id']])
@@ -423,14 +424,14 @@ class SampleParser(object):
Merges sample info from lookup table and plate map.
Returns:
List[dict]: Reconciled samples
List[dict]: Reconciled sample
"""
if not self.plate_map_samples or not self.lookup_samples:
logger.warning(f"No separate samples")
logger.warning(f"No separate sample")
samples = self.lookup_samples or self.plate_map_samples
for new in samples:
if not check_key_or_attr(key='submitter_id', interest=new, check_none=True):
new['submitter_id'] = new['id']
if not check_key_or_attr(key='sample_id', interest=new, check_none=True):
new['sample_id'] = new['id']
new = self.sub_object.parse_samples(new)
try:
del new['id']
@@ -459,8 +460,8 @@ class SampleParser(object):
if lsample[merge_on_id] == psample['id']), (-1, psample))
if jj >= 0:
lookup_samples[jj] = {}
if not check_key_or_attr(key='submitter_id', interest=new, check_none=True):
new['submitter_id'] = psample['id']
if not check_key_or_attr(key='sample_id', interest=new, check_none=True):
new['sample_id'] = psample['id']
new = self.sub_object.parse_samples(new)
try:
del new['id']
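A simplified restatement of the reconciliation above: fall back to whichever list exists when one source is empty, otherwise pair each plate-map entry with its lookup-table row on the merge key (key name assumed from the surrounding hunks).
def reconcile(plate_map_samples: list, lookup_samples: list, merge_on_id: str = "sample_id") -> list:
    if not plate_map_samples or not lookup_samples:
        return lookup_samples or plate_map_samples
    merged = []
    for psample in plate_map_samples:
        match = next((ls for ls in lookup_samples if ls.get(merge_on_id) == psample["id"]), {})
        new = {**psample, **match}
        new.setdefault(merge_on_id, psample["id"])   # mirrors the sample_id fallback above
        new.pop("id", None)                          # mirrors the del new['id'] above
        merged.append(new)
    return merged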
@@ -478,7 +479,7 @@ class EquipmentParser(object):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (str | SubmissionType): Type of run expected (Wastewater, Bacterial Culture, etc.)
submission_type (str | SubmissionType): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
"""
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
@@ -488,7 +489,7 @@ class EquipmentParser(object):
@property
def equipment_map(self) -> dict:
"""
Gets the map of equipment locations in the run type's spreadsheet
Gets the map of equipment locations in the procedure type's spreadsheet
Returns:
List[dict]: List of locations
@@ -556,7 +557,7 @@ class TipParser(object):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (str | SubmissionType): Type of run expected (Wastewater, Bacterial Culture, etc.)
submission_type (str | SubmissionType): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
"""
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
@@ -566,7 +567,7 @@ class TipParser(object):
@property
def tip_map(self) -> dict:
"""
Gets the map of equipment locations in the run type's spreadsheet
Gets the map of equipment locations in the procedure type's spreadsheet
Returns:
List[dict]: List of locations
@@ -609,7 +610,7 @@ class TipParser(object):
class PCRParser(object):
"""Object to pull data from Design and Analysis PCR export file."""
def __init__(self, filepath: Path | None = None, submission: BasicRun | None = None) -> None:
def __init__(self, filepath: Path | None = None, submission: Run | None = None) -> None:
"""
Args:
filepath (Path | None, optional): file to parse. Defaults to None.
@@ -659,7 +660,7 @@ class PCRParser(object):
class ConcentrationParser(object):
def __init__(self, filepath: Path | None = None, run: BasicRun | None = None) -> None:
def __init__(self, filepath: Path | None = None, run: Run | None = None) -> None:
if filepath is None:
logger.error('No filepath given.')
self.xl = None
@@ -673,7 +674,7 @@ class ConcentrationParser(object):
logger.error(f"Couldn't get permissions for {filepath.__str__()}. Operation might have been cancelled.")
return None
if run is None:
self.submission_obj = BasicRun()
self.submission_obj = Run()
rsl_plate_num = None
else:
self.submission_obj = run

View File

@@ -7,7 +7,7 @@ from pandas import DataFrame, ExcelWriter
from pathlib import Path
from datetime import date
from typing import Tuple, List
from backend.db.models import BasicRun
from backend.db.models import Run
from tools import jinja_template_loading, get_first_blank_df_row, row_map, flatten_list
from PyQt6.QtWidgets import QWidget
from openpyxl.worksheet.worksheet import Worksheet
@@ -45,9 +45,9 @@ class ReportMaker(object):
self.start_date = start_date
self.end_date = end_date
# NOTE: Set page size to zero to override limiting query size.
self.runs = BasicRun.query(start_date=start_date, end_date=end_date, page_size=0)
self.runs = Run.query(start_date=start_date, end_date=end_date, page_size=0)
if organizations is not None:
self.runs = [run for run in self.runs if run.client_submission.submitting_lab.name in organizations]
self.runs = [run for run in self.runs if run.clientsubmission.clientlab.name in organizations]
self.detailed_df, self.summary_df = self.make_report_xlsx()
self.html = self.make_report_html(df=self.summary_df)
@@ -61,14 +61,14 @@ class ReportMaker(object):
if not self.runs:
return DataFrame(), DataFrame()
df = DataFrame.from_records([item.to_dict(report=True) for item in self.runs])
# NOTE: put submissions with the same lab together
df = df.sort_values("submitting_lab")
# NOTE: put procedure with the same lab together
df = df.sort_values("clientlab")
# NOTE: aggregate cost and sample count columns
df2 = df.groupby(["submitting_lab", "extraction_kit"]).agg(
{'extraction_kit': 'count', 'cost': 'sum', 'sample_count': 'sum'})
df2 = df2.rename(columns={"extraction_kit": 'run_count'})
df2 = df.groupby(["clientlab", "kittype"]).agg(
{'kittype': 'count', 'cost': 'sum', 'sample_count': 'sum'})
df2 = df2.rename(columns={"kittype": 'run_count'})
df = df.drop('id', axis=1)
df = df.sort_values(['submitting_lab', "started_date"])
df = df.sort_values(['clientlab', "started_date"])
return df, df2
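The summary aggregation above is standard pandas; a self-contained sketch with invented records follows. Named aggregation is used here to sidestep aggregating a grouping column, so it is a restatement of the intent rather than a line-for-line copy.
from pandas import DataFrame
df = DataFrame.from_records([
    dict(clientlab="Lab A", kittype="Kit 1", cost=100.0, sample_count=24, started_date="2025-05-01"),
    dict(clientlab="Lab A", kittype="Kit 1", cost=100.0, sample_count=24, started_date="2025-05-02"),
    dict(clientlab="Lab B", kittype="Kit 2", cost=150.0, sample_count=48, started_date="2025-05-03"),
])
summary = df.groupby(["clientlab", "kittype"]).agg(
    run_count=("cost", "size"), cost=("cost", "sum"), sample_count=("sample_count", "sum"))
detailed = df.sort_values(["clientlab", "started_date"])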
def make_report_html(self, df: DataFrame) -> str:
@@ -156,19 +156,19 @@ class TurnaroundMaker(ReportArchetype):
self.start_date = start_date
self.end_date = end_date
# NOTE: Set page size to zero to override limiting query size.
self.subs = BasicRun.query(start_date=start_date, end_date=end_date,
submission_type_name=submission_type, page_size=0)
self.subs = Run.query(start_date=start_date, end_date=end_date,
submissiontype_name=submission_type, page_size=0)
records = [self.build_record(sub) for sub in self.subs]
self.df = DataFrame.from_records(records)
self.sheet_name = "Turnaround"
@classmethod
def build_record(cls, sub: BasicRun) -> dict:
def build_record(cls, sub: Run) -> dict:
"""
Build a turnaround dictionary from a run
Build a turnaround dictionary from a procedure
Args:
sub (BasicRun): The run to be processed.
sub (BasicRun): The procedure to be processed.
Returns:
@@ -203,9 +203,9 @@ class ConcentrationMaker(ReportArchetype):
self.start_date = start_date
self.end_date = end_date
# NOTE: Set page size to zero to override limiting query size.
self.subs = BasicRun.query(start_date=start_date, end_date=end_date,
submission_type_name=submission_type, page_size=0)
# self.samples = flatten_list([sub.get_provisional_controls(controls_only=controls_only) for sub in self.runs])
self.subs = Run.query(start_date=start_date, end_date=end_date,
submissiontype_name=submission_type, page_size=0)
# self.sample = flatten_list([sub.get_provisional_controls(controls_only=controls_only) for sub in self.run])
self.samples = flatten_list([sub.get_provisional_controls(include=include) for sub in self.subs])
self.records = [self.build_record(sample) for sample in self.samples]
self.df = DataFrame.from_records(self.records)
@@ -214,9 +214,9 @@ class ConcentrationMaker(ReportArchetype):
@classmethod
def build_record(cls, control) -> dict:
regex = re.compile(r"^(ATCC)|(MCS)", flags=re.IGNORECASE)
if bool(regex.match(control.submitter_id)):
if bool(regex.match(control.sample_id)):
positive = "positive"
elif control.submitter_id.lower().startswith("en"):
elif control.sample_id.lower().startswith("en"):
positive = "negative"
else:
positive = "sample"
@@ -224,8 +224,8 @@ class ConcentrationMaker(ReportArchetype):
concentration = float(control.concentration)
except (TypeError, ValueError):
concentration = 0.0
return dict(name=control.submitter_id,
submission=str(control.submission), concentration=concentration,
return dict(name=control.sample_id,
submission=str(control.clientsubmission), concentration=concentration,
submitted_date=control.submitted_date, positive=positive)
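The classification rule above compresses to a small function (ids invented). Note that a compiled pattern's .match only attempts the start of the string, so the MCS alternative is effectively a prefix test as well.
import re
regex = re.compile(r"^(ATCC)|(MCS)", flags=re.IGNORECASE)
def classify(sample_id: str) -> str:
    if regex.match(sample_id):                 # ATCC... or MCS... prefixes
        return "positive"
    if sample_id.lower().startswith("en"):     # "en" prefix marks negative controls
        return "negative"
    return "sample"
assert classify("ATCC-25922") == "positive"
assert classify("EN-20250522") == "negative"
assert classify("WW-0001") == "sample"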

View File

@@ -1,5 +1,5 @@
"""
contains writer objects for pushing values to run sheet templates.
contains writer objects for pushing values to procedure sheet templates.
"""
import logging
from copy import copy
@@ -8,7 +8,7 @@ from operator import itemgetter
from pprint import pformat
from typing import List, Generator, Tuple
from openpyxl import load_workbook, Workbook
from backend.db.models import SubmissionType, KitType, BasicRun
from backend.db.models import SubmissionType, KitType, Run
from backend.validators.pydant import PydSubmission
from io import BytesIO
from collections import OrderedDict
@@ -24,7 +24,7 @@ class SheetWriter(object):
def __init__(self, submission: PydSubmission):
"""
Args:
submission (PydSubmission): Object containing run information.
submission (PydSubmission): Object containing procedure information.
"""
self.sub = OrderedDict(submission.improved_dict())
# NOTE: Set values from pydantic object.
@@ -32,7 +32,7 @@ class SheetWriter(object):
match k:
case 'filepath':
self.__setattr__(k, v)
case 'submission_type':
case 'proceduretype':
self.sub[k] = v['value']
self.submission_type = SubmissionType.query(name=v['value'])
self.run_object = BasicRun.find_polymorphic_subclass(
@@ -58,7 +58,7 @@ class SheetWriter(object):
"""
Calls info writer
"""
disallowed = ['filepath', 'reagents', 'samples', 'equipment', 'controls']
disallowed = ['filepath', 'reagents', 'sample', 'equipment', 'control']
info_dict = {k: v for k, v in self.sub.items() if k not in disallowed}
writer = InfoWriter(xl=self.xl, submission_type=self.submission_type, info_dict=info_dict)
self.xl = writer.write_info()
@@ -69,14 +69,14 @@ class SheetWriter(object):
"""
reagent_list = self.sub['reagents']
writer = ReagentWriter(xl=self.xl, submission_type=self.submission_type,
extraction_kit=self.sub['extraction_kit'], reagent_list=reagent_list)
extraction_kit=self.sub['kittype'], reagent_list=reagent_list)
self.xl = writer.write_reagents()
def write_samples(self):
"""
Calls sample writer
"""
sample_list = self.sub['samples']
sample_list = self.sub['sample']
writer = SampleWriter(xl=self.xl, submission_type=self.submission_type, sample_list=sample_list)
self.xl = writer.write_samples()
@@ -99,22 +99,22 @@ class SheetWriter(object):
class InfoWriter(object):
"""
object to write general run info into excel file
object to write general procedure info into excel file
"""
def __init__(self, xl: Workbook, submission_type: SubmissionType | str, info_dict: dict,
sub_object: BasicRun | None = None):
sub_object: Run | None = None):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (SubmissionType | str): Type of run expected (Wastewater, Bacterial Culture, etc.)
submission_type (SubmissionType | str): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
info_dict (dict): Dictionary of information to write.
sub_object (BasicRun | None, optional): Submission object containing methods. Defaults to None.
"""
if isinstance(submission_type, str):
submission_type = SubmissionType.query(name=submission_type)
if sub_object is None:
sub_object = BasicRun.find_polymorphic_subclass(polymorphic_identity=submission_type.name)
sub_object = Run.find_polymorphic_subclass(polymorphic_identity=submission_type.name)
self.submission_type = submission_type
self.sub_object = sub_object
self.xl = xl
@@ -196,8 +196,8 @@ class ReagentWriter(object):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (SubmissionType | str): Type of run expected (Wastewater, Bacterial Culture, etc.)
extraction_kit (KitType | str): Extraction kit used.
submission_type (SubmissionType | str): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
extraction_kit (KitType | str): Extraction kittype used.
reagent_list (list): List of reagent dicts to be written to excel.
"""
self.xl = xl
@@ -208,7 +208,7 @@ class ReagentWriter(object):
extraction_kit = KitType.query(name=extraction_kit)
self.kit_object = extraction_kit
associations, self.kit_object = self.kit_object.construct_xl_map_for_use(
submission_type=self.submission_type_obj)
proceduretype=self.submission_type_obj)
reagent_map = {k: v for k, v in associations.items()}
self.reagents = self.reconcile_map(reagent_list=reagent_list, reagent_map=reagent_map)
@@ -223,13 +223,13 @@ class ReagentWriter(object):
Returns:
List[dict]: merged dictionary
"""
filled_roles = [item['role'] for item in reagent_list]
filled_roles = [item['reagentrole'] for item in reagent_list]
for map_obj in reagent_map.keys():
if map_obj not in filled_roles:
reagent_list.append(dict(name="Not Applicable", role=map_obj, lot="Not Applicable", expiry="Not Applicable"))
for reagent in reagent_list:
try:
mp_info = reagent_map[reagent['role']]
mp_info = reagent_map[reagent['reagentrole']]
except KeyError:
continue
placeholder = copy(reagent)
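One detail worth noting in the hunk above: the unchanged context line still appends placeholder dicts under role=, while the updated lookup reads reagent['reagentrole']. A sketch of the padding step using a single key throughout, assuming that is the intent:
def fill_missing_roles(reagent_list: list, reagent_map: dict) -> list:
    # Pad the submitted reagents so every role in the kit map gets a row.
    filled = {item["reagentrole"] for item in reagent_list}
    for role in reagent_map:
        if role not in filled:
            reagent_list.append(dict(name="Not Applicable", reagentrole=role,
                                     lot="Not Applicable", expiry="Not Applicable"))
    return reagent_list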
@@ -273,7 +273,7 @@ class SampleWriter(object):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (SubmissionType | str): Type of run expected (Wastewater, Bacterial Culture, etc.)
submission_type (SubmissionType | str): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
sample_list (list): List of sample dictionaries to be written to excel file.
"""
if isinstance(submission_type, str):
@@ -281,7 +281,7 @@ class SampleWriter(object):
self.submission_type = submission_type
self.xl = xl
self.sample_map = submission_type.sample_map['lookup_table']
# NOTE: exclude any samples without a run rank.
# NOTE: exclude any sample without a procedure rank.
samples = [item for item in self.reconcile_map(sample_list) if item['submission_rank'] > 0]
self.samples = sorted(samples, key=itemgetter('submission_rank'))
self.blank_lookup_table()
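The rank filter above in miniature (invented sample dicts): entries without a positive submission_rank are dropped before the sort.
from operator import itemgetter
samples = [dict(sample_id="S2", submission_rank=2),
           dict(sample_id="S0", submission_rank=0),
           dict(sample_id="S1", submission_rank=1)]
kept = sorted((s for s in samples if s["submission_rank"] > 0), key=itemgetter("submission_rank"))
# kept -> S1, S2; S0 has no positive rank and is excluded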
@@ -322,7 +322,7 @@ class SampleWriter(object):
Performs writing operations.
Returns:
Workbook: Workbook with samples written
Workbook: Workbook with sample written
"""
sheet = self.xl[self.sample_map['sheet']]
columns = self.sample_map['sample_columns']
@@ -351,7 +351,7 @@ class EquipmentWriter(object):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (SubmissionType | str): Type of run expected (Wastewater, Bacterial Culture, etc.)
submission_type (SubmissionType | str): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
equipment_list (list): List of equipment dictionaries to write to excel file.
"""
if isinstance(submission_type, str):
@@ -376,9 +376,9 @@ class EquipmentWriter(object):
return
for ii, equipment in enumerate(equipment_list, start=1):
try:
mp_info = equipment_map[equipment['role']]
mp_info = equipment_map[equipment['reagentrole']]
except KeyError:
logger.error(f"No {equipment['role']} in {pformat(equipment_map)}")
logger.error(f"No {equipment['reagentrole']} in {pformat(equipment_map)}")
mp_info = None
placeholder = copy(equipment)
if not mp_info:
@@ -433,7 +433,7 @@ class TipWriter(object):
"""
Args:
xl (Workbook): Openpyxl workbook from submitted excel file.
submission_type (SubmissionType | str): Type of run expected (Wastewater, Bacterial Culture, etc.)
submission_type (SubmissionType | str): Type of procedure expected (Wastewater, Bacterial Culture, etc.)
tips_list (list): List of tip dictionaries to write to the excel file.
"""
if isinstance(submission_type, str):

View File

@@ -5,7 +5,7 @@ import logging, re
import sys
from pathlib import Path
from openpyxl import load_workbook
from backend.db.models import BasicRun, SubmissionType
from backend.db.models import Run, SubmissionType
from tools import jinja_template_loading
from jinja2 import Template
from dateutil.parser import parse
@@ -25,22 +25,22 @@ class RSLNamer(object):
self.submission_type = submission_type
if not self.submission_type:
self.submission_type = self.retrieve_submission_type(filename=filename)
logger.info(f"got run type: {self.submission_type}")
logger.info(f"got procedure type: {self.submission_type}")
if self.submission_type:
self.sub_object = BasicRun.find_polymorphic_subclass(polymorphic_identity=self.submission_type)
self.parsed_name = self.retrieve_rsl_number(filename=filename, regex=self.sub_object.get_regex(
submission_type=submission_type))
if not data:
data = dict(submission_type=self.submission_type)
if "submission_type" not in data.keys():
data['submission_type'] = self.submission_type
if "proceduretype" not in data.keys():
data['proceduretype'] = self.submission_type
self.parsed_name = self.sub_object.enforce_name(instr=self.parsed_name, data=data)
logger.info(f"Parsed name: {self.parsed_name}")
@classmethod
def retrieve_submission_type(cls, filename: str | Path) -> str:
"""
Gets run type from excel file properties or sheet names or regex pattern match or user input
Gets procedure type from excel file properties or sheet names or regex pattern match or user input
Args:
filename (str | Path): filename
@@ -49,7 +49,7 @@ class RSLNamer(object):
TypeError: Raised if unsupported variable type for filename given.
Returns:
str: parsed run type
str: parsed procedure type
"""
def st_from_path(filepath: Path) -> str:
@@ -89,7 +89,7 @@ class RSLNamer(object):
sub_type = m.lastgroup
except AttributeError as e:
sub_type = None
logger.critical(f"No run type found or run type found!: {e}")
logger.critical(f"No procedure type found or procedure type found!: {e}")
return sub_type
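The sub_type recovery above leans on re's named groups: when a single alternation holds one named group per procedure type, m.lastgroup names whichever branch matched. A sketch with an invented pattern:
import re
pattern = re.compile(r"(?P<Wastewater>RSL-WW-\d{8})|(?P<Bacterial_Culture>RSL-BC-\d{8})", re.IGNORECASE)
m = pattern.search("RSL-WW-20250522-1")
sub_type = m.lastgroup if m else None   # -> "Wastewater"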
match filename:
@@ -107,8 +107,8 @@ class RSLNamer(object):
if "pytest" in sys.modules:
raise ValueError("Submission Type came back as None.")
from frontend.widgets import ObjectSelector
dlg = ObjectSelector(title="Couldn't parse run type.",
message="Please select run type from list below.",
dlg = ObjectSelector(title="Couldn't parse procedure type.",
message="Please select procedure type from list below.",
obj_type=SubmissionType)
if dlg.exec():
submission_type = dlg.parse_form()
@@ -118,7 +118,7 @@ class RSLNamer(object):
@classmethod
def retrieve_rsl_number(cls, filename: str | Path, regex: re.Pattern | None = None):
"""
Uses regex to retrieve the plate number and run type from an input string
Uses regex to retrieve the plate number and procedure type from an input string
Args:
regex (str): string to construct pattern
@@ -145,14 +145,15 @@ class RSLNamer(object):
@classmethod
def construct_new_plate_name(cls, data: dict) -> str:
"""
Make a brand-new plate name from run data.
Make a brand-new plate name from procedure data.
Args:
data (dict): incoming run data
data (dict): incoming procedure data
Returns:
str: Output filename
"""
logger.debug(data)
if "submitted_date" in data.keys():
if isinstance(data['submitted_date'], dict):
if data['submitted_date']['value'] is not None:
@@ -163,14 +164,16 @@ class RSLNamer(object):
today = data['submitted_date']
else:
try:
today = re.search(r"\d{4}(_|-)?\d{2}(_|-)?\d{2}", data['rsl_plate_num'])
today = re.search(r"\d{4}(_|-)?\d{2}(_|-)?\d{2}", data['name'])
today = parse(today.group())
except (AttributeError, KeyError):
today = datetime.now()
if "rsl_plate_num" in data.keys():
plate_number = data['rsl_plate_num'].split("-")[-1][0]
if isinstance(today, str):
today = datetime.strptime(today, "%Y-%m-%d")
if "name" in data.keys():
plate_number = data['name'].split("-")[-1][0]
else:
previous = BasicRun.query(start_date=today, end_date=today, submissiontype=data['submission_type'])
previous = Run.query(start_date=today, end_date=today, submissiontype=data['submissiontype'])
plate_number = len(previous) + 1
return f"RSL-{data['abbreviation']}-{today.year}{str(today.month).zfill(2)}{str(today.day).zfill(2)}-{plate_number}"
@@ -205,4 +208,4 @@ class RSLNamer(object):
from .pydant import PydSubmission, PydKitType, PydContact, PydOrganization, PydSample, PydReagent, PydReagentRole, \
PydEquipment, PydEquipmentRole, PydTips, PydPCRControl, PydIridaControl, PydProcess, PydElastic, PydClientSubmission
PydEquipment, PydEquipmentRole, PydTips, PydProcess, PydElastic, PydClientSubmission

View File

@@ -650,22 +650,22 @@ class OmniProcess(BaseOmni):
new_assoc = st.to_sql()
except AttributeError:
new_assoc = SubmissionType.query(name=st)
if new_assoc not in instance.submission_types:
instance.submission_types.append(new_assoc)
if new_assoc not in instance.proceduretype:
instance.proceduretype.append(new_assoc)
for er in self.equipment_roles:
try:
new_assoc = er.to_sql()
except AttributeError:
new_assoc = EquipmentRole.query(name=er)
if new_assoc not in instance.equipment_roles:
instance.equipment_roles.append(new_assoc)
if new_assoc not in instance.equipmentrole:
instance.equipmentrole.append(new_assoc)
for tr in self.tip_roles:
try:
new_assoc = tr.to_sql()
except AttributeError:
new_assoc = TipRole.query(name=tr)
if new_assoc not in instance.tip_roles:
instance.tip_roles.append(new_assoc)
if new_assoc not in instance.tiprole:
instance.tiprole.append(new_assoc)
return instance
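The three loops above share one get-or-create-then-link shape; factored out as a sketch, with the helper and parameter names invented and .query assumed per the models' class API shown elsewhere in the diff:
def link_associations(existing: list, items: list, query_cls) -> None:
    for item in items:
        try:
            new_assoc = item.to_sql()                 # pydantic wrapper
        except AttributeError:
            new_assoc = query_cls.query(name=item)    # bare name
        if new_assoc not in existing:
            existing.append(new_assoc)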
@property

File diff suppressed because it is too large