Added Postgres support.

This commit is contained in:
lwark
2024-07-25 08:41:44 -05:00
parent 54e1e55804
commit 4bc5e08ac6
32 changed files with 579 additions and 1030 deletions

View File

@@ -17,7 +17,7 @@ def set_sqlite_pragma(dbapi_connection, connection_record):
connection_record (_type_): _description_
"""
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
# cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()

View File

@@ -3,12 +3,13 @@ Contains all models for sqlalchemy
"""
from __future__ import annotations
import sys, logging
from sqlalchemy import Column, INTEGER, String, JSON
from sqlalchemy import Column, INTEGER, String, JSON, inspect
from sqlalchemy.orm import DeclarativeMeta, declarative_base, Query, Session
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.exc import ArgumentError
from typing import Any, List
from pathlib import Path
from tools import report_result
# Load testing environment
if 'pytest' in sys.modules:
@@ -90,7 +91,7 @@ class BaseClass(Base):
Returns:
dict | list | str: Output of key:value dict or single (list, str) desired variable
"""
"""
dicto = dict(singles=['id'])
output = {}
for k, v in dicto.items():
@@ -110,7 +111,7 @@ class BaseClass(Base):
Returns:
Any | List[Any]: Result of query execution.
"""
"""
return cls.execute_query(**kwargs)
@classmethod
@@ -152,38 +153,43 @@ class BaseClass(Base):
case _:
return query.limit(limit).all()
@report_result
def save(self):
"""
Add the object to the database and commit
"""
# logger.debug(f"Saving object: {pformat(self.__dict__)}")
report = Report()
try:
self.__database_session__.add(self)
self.__database_session__.commit()
# self.__database_session__.merge(self)
except Exception as e:
logger.critical(f"Problem saving object: {e}")
self.__database_session__.rollback()
report.add_result(Result(msg=f"Problem saving object {e}", status="Critical"))
return report
class ConfigItem(BaseClass):
"""
Key:JSON objects to store config settings in database.
"""
"""
id = Column(INTEGER, primary_key=True)
key = Column(String(32)) #: Name of the configuration item.
value = Column(JSON) #: Value associated with the config item.
key = Column(String(32)) #: Name of the configuration item.
value = Column(JSON) #: Value associated with the config item.
def __repr__(self):
return f"ConfigItem({self.key} : {self.value})"
@classmethod
def get_config_items(cls, *args) -> ConfigItem|List[ConfigItem]:
def get_config_items(cls, *args) -> ConfigItem | List[ConfigItem]:
"""
Get desired config items from database
Returns:
ConfigItem|List[ConfigItem]: Config item(s)
"""
"""
config_items = cls.__database_session__.query(cls).all()
config_items = [item for item in config_items if item.key in args]
if len(args) == 1:
@@ -196,4 +202,5 @@ from .controls import *
from .organizations import *
from .kits import *
from .submissions import *
BasicSubmission.reagents.creator = lambda reg: SubmissionReagentAssociation(reagent=reg)

View File

@@ -1532,7 +1532,7 @@ class Process(BaseClass):
query = cls.__database_session__.query(cls)
match name:
case str():
logger.debug(f"Lookup Process with name str {name}")
# logger.debug(f"Lookup Process with name str {name}")
query = query.filter(cls.name == name)
limit = 1
case _:

View File

@@ -23,7 +23,7 @@ import pandas as pd
from openpyxl import Workbook
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.drawing.image import Image as OpenpyxlImage
from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr
from tools import row_map, setup_lookup, jinja_template_loading, rreplace, row_keys, check_key_or_attr, Result, Report
from datetime import datetime, date
from typing import List, Any, Tuple, Literal
from dateutil.parser import parse
@@ -691,7 +691,8 @@ class BasicSubmission(BaseClass):
Args:
input_dict (dict): Input sample dictionary
xl (pd.ExcelFile): original xl workbook, used for child classes mostly
xl (Workbook): original xl workbook, used for child classes mostly
custom_fields: Dictionary of locations, ranges, etc to be used by this function
Returns:
dict: Updated sample dictionary
@@ -739,6 +740,7 @@ class BasicSubmission(BaseClass):
input_excel (Workbook): initial workbook.
info (dict | None, optional): dictionary of additional info. Defaults to None.
backup (bool, optional): Whether this is part of a backup operation. Defaults to False.
custom_fields: Dictionary of locations, ranges, etc to be used by this function
Returns:
Workbook: Updated workbook
@@ -1046,14 +1048,16 @@ class BasicSubmission(BaseClass):
"""
code = 0
msg = ""
report = Report()
disallowed = ["id"]
if kwargs == {}:
raise ValueError("Need to narrow down query or the first available instance will be returned.")
for key in kwargs.keys():
if key in disallowed:
raise ValueError(
f"{key} is not allowed as a query argument as it could lead to creation of duplicate objects. Use .query() instead.")
instance = cls.query(submission_type=submission_type, limit=1, **kwargs)
sanitized_kwargs = {k: v for k, v in kwargs.items() if k not in disallowed}
# for key in kwargs.keys():
# if key in disallowed:
# raise ValueError(
# f"{key} is not allowed as a query argument as it could lead to creation of duplicate objects. Use .query() instead.")
instance = cls.query(submission_type=submission_type, limit=1, **sanitized_kwargs)
# logger.debug(f"Retrieved instance: {instance}")
if instance is None:
used_class = cls.find_polymorphic_subclass(attrs=kwargs, polymorphic_identity=submission_type)
@@ -1070,7 +1074,8 @@ class BasicSubmission(BaseClass):
else:
code = 1
msg = "This submission already exists.\nWould you like to overwrite?"
return instance, code, msg
report.add_result(Result(msg=msg, code=code))
return instance, report
# Custom context events for the ui
@@ -1135,7 +1140,7 @@ class BasicSubmission(BaseClass):
# logger.debug(widg)
widg.setParent(None)
pyd = self.to_pydantic(backup=True)
form = pyd.to_form(parent=obj)
form = pyd.to_form(parent=obj, disable=['rsl_plate_num'])
obj.app.table_widget.formwidget.layout().addWidget(form)
def add_comment(self, obj):
@@ -1352,13 +1357,29 @@ class Wastewater(BasicSubmission):
Args:
input_dict (dict): Input sample dictionary
xl (Workbook): xl (Workbook): original xl workbook, used for child classes mostly.
custom_fields: Dictionary of locations, ranges, etc to be used by this function
Returns:
dict: Updated sample dictionary
"""
input_dict = super().custom_info_parser(input_dict)
logger.debug(f"Input dict: {pformat(input_dict)}")
if xl is not None:
input_dict['csv'] = xl["Copy to import file"]
try:
input_dict['csv'] = xl["Copy to import file"]
except KeyError as e:
logger.error(e)
try:
match input_dict['rsl_plate_num']:
case dict():
input_dict['csv'] = xl[input_dict['rsl_plate_num']['value']]
case str():
input_dict['csv'] = xl[input_dict['rsl_plate_num']]
case _:
pass
except Exception as e:
logger.error(f"Error handling couldn't get csv due to: {e}")
return input_dict
@classmethod
@@ -1604,11 +1625,12 @@ class WastewaterArtic(BasicSubmission):
Args:
input_dict (dict): Input sample dictionary
xl (pd.ExcelFile): original xl workbook, used for child classes mostly
custom_fields: Dictionary of locations, ranges, etc to be used by this function
Returns:
dict: Updated sample dictionary
"""
# TODO: Clean up and move range start/stops to db somehow.
from backend.validators import RSLNamer
input_dict = super().custom_info_parser(input_dict)
egel_section = custom_fields['egel_results']
ws = xl[egel_section['sheet']]
@@ -1621,12 +1643,11 @@ class WastewaterArtic(BasicSubmission):
source_plates_section = custom_fields['source_plates']
ws = xl[source_plates_section['sheet']]
data = [dict(plate=ws.cell(row=ii, column=source_plates_section['plate_column']).value, starting_sample=ws.cell(row=ii, column=source_plates_section['starting_sample_column']).value) for ii in
range(source_plates_section['start_row'], source_plates_section['end_row'])]
range(source_plates_section['start_row'], source_plates_section['end_row']+1)]
for datum in data:
if datum['plate'] in ["None", None, ""]:
continue
else:
from backend.validators import RSLNamer
datum['plate'] = RSLNamer(filename=datum['plate'], sub_type="Wastewater").parsed_name
input_dict['source_plates'] = data
return input_dict
@@ -1820,6 +1841,7 @@ class WastewaterArtic(BasicSubmission):
input_excel (Workbook): initial workbook.
info (dict | None, optional): dictionary of additional info. Defaults to None.
backup (bool, optional): Whether this is part of a backup operation. Defaults to False.
custom_fields: Dictionary of locations, ranges, etc to be used by this function
Returns:
Workbook: Updated workbook
@@ -2798,7 +2820,7 @@ class WastewaterArticAssociation(SubmissionSampleAssociation):
DOC: https://docs.sqlalchemy.org/en/14/orm/extensions/associationproxy.html
"""
id = Column(INTEGER, ForeignKey("_submissionsampleassociation.id"), primary_key=True)
source_plate = Column(String(16))
source_plate = Column(String(32))
source_plate_number = Column(INTEGER)
source_well = Column(String(8))
ct = Column(String(8)) #: AKA ct for N1

View File

@@ -108,7 +108,7 @@ class PydReagent(BaseModel):
Returns:
dict: Information dictionary
"""
"""
try:
extras = list(self.model_extra.keys())
except AttributeError:
@@ -161,7 +161,7 @@ class PydReagent(BaseModel):
# reagent.reagent_submission_associations.append(assoc)
else:
assoc = None
report.add_result(Result(owner = __name__, code=0, msg="New reagent created.", status="Information"))
report.add_result(Result(owner=__name__, code=0, msg="New reagent created.", status="Information"))
else:
if submission is not None and reagent not in submission.reagents:
assoc = SubmissionReagentAssociation(reagent=reagent, submission=submission)
@@ -217,7 +217,7 @@ class PydSample(BaseModel, extra='allow'):
Returns:
dict: Information dictionary
"""
"""
fields = list(self.model_fields.keys()) + list(self.model_extra.keys())
return {k: getattr(self, k) for k in fields}
@@ -254,7 +254,8 @@ class PydSample(BaseModel, extra='allow'):
submission=submission,
sample=instance,
row=row, column=column, id=aid,
submission_rank=submission_rank, **self.model_extra)
submission_rank=submission_rank,
**self.model_extra)
# logger.debug(f"Using submission_sample_association: {association}")
try:
# instance.sample_submission_associations.append(association)
@@ -270,7 +271,7 @@ class PydSample(BaseModel, extra='allow'):
Returns:
dict: Information dictionary
"""
"""
try:
extras = list(self.model_extra.keys())
except AttributeError:
@@ -281,10 +282,10 @@ class PydSample(BaseModel, extra='allow'):
class PydTips(BaseModel):
name: str
lot: str|None = Field(default=None)
lot: str | None = Field(default=None)
role: str
def to_sql(self, submission:BasicSubmission) -> SubmissionTipsAssociation:
def to_sql(self, submission: BasicSubmission) -> SubmissionTipsAssociation:
"""
Con
@@ -293,7 +294,7 @@ class PydTips(BaseModel):
Returns:
SubmissionTipsAssociation: Association between queried tips and submission
"""
"""
tips = Tips.query(name=self.name, lot=self.lot, limit=1)
assoc = SubmissionTipsAssociation(submission=submission, tips=tips, role_name=self.role)
return assoc
@@ -305,7 +306,7 @@ class PydEquipment(BaseModel, extra='ignore'):
nickname: str | None
processes: List[str] | None
role: str | None
tips: List[PydTips]|None = Field(default=None)
tips: List[PydTips] | None = Field(default=None)
@field_validator('processes', mode='before')
@classmethod
@@ -338,23 +339,19 @@ class PydEquipment(BaseModel, extra='ignore'):
if equipment is None:
return
if submission is not None:
assoc = SubmissionEquipmentAssociation(submission=submission, equipment=equipment)
process = Process.query(name=self.processes[0])
if process is None:
logger.error(f"Found unknown process: {process}.")
# from frontend.widgets.pop_ups import QuestionAsker
# dlg = QuestionAsker(title="Add Process?",
# message=f"Unable to find {self.processes[0]} in the database.\nWould you like to add it?")
# if dlg.exec():
# kit = submission.extraction_kit
# submission_type = submission.submission_type
# process = Process(name=self.processes[0])
# process.kit_types.append(kit)
# process.submission_types.append(submission_type)
# process.equipment.append(equipment)
# process.save()
assoc.process = process
assoc.role = self.role
# NOTE: Need to make sure the same association is not added to the submission
assoc = SubmissionEquipmentAssociation.query(equipment_id=equipment.id, submission_id=submission.id,
role=self.role, limit=1)
if assoc is None:
assoc = SubmissionEquipmentAssociation(submission=submission, equipment=equipment)
process = Process.query(name=self.processes[0])
if process is None:
logger.error(f"Found unknown process: {process}.")
assoc.process = process
assoc.role = self.role
else:
assoc = None
else:
assoc = None
return equipment, assoc
@@ -365,7 +362,7 @@ class PydEquipment(BaseModel, extra='ignore'):
Returns:
dict: Information dictionary
"""
"""
try:
extras = list(self.model_extra.keys())
except AttributeError:
@@ -441,7 +438,7 @@ class PydSubmission(BaseModel, extra='allow'):
return value.date()
case int():
return dict(value=datetime.fromordinal(datetime(1900, 1, 1).toordinal() + value['value'] - 2).date(),
missing=True)
missing=True)
case str():
string = re.sub(r"(_|-)\d$", "", value['value'])
try:
@@ -508,7 +505,7 @@ class PydSubmission(BaseModel, extra='allow'):
output = "RSL-BS-Test001"
else:
output = RSLNamer(filename=values.data['filepath'].__str__(), sub_type=sub_type,
data=values.data).parsed_name
data=values.data).parsed_name
return dict(value=output, missing=True)
@field_validator("technician", mode="before")
@@ -637,14 +634,14 @@ class PydSubmission(BaseModel, extra='allow'):
self.submission_object = BasicSubmission.find_polymorphic_subclass(
polymorphic_identity=self.submission_type['value'])
def set_attribute(self, key:str, value):
def set_attribute(self, key: str, value):
"""
Better handling of attribute setting.
Args:
key (str): Name of field to set
value (_type_): Value to set field to.
"""
"""
self.__setattr__(name=key, value=value)
def handle_duplicate_samples(self):
@@ -710,7 +707,7 @@ class PydSubmission(BaseModel, extra='allow'):
missing_reagents = [reagent for reagent in self.reagents if reagent.missing]
return missing_info, missing_reagents
def to_sql(self) -> Tuple[BasicSubmission, Result]:
def to_sql(self) -> Tuple[BasicSubmission, Report]:
"""
Converts this instance into a backend.db.models.submissions.BasicSubmission instance
@@ -718,13 +715,13 @@ class PydSubmission(BaseModel, extra='allow'):
Tuple[BasicSubmission, Result]: BasicSubmission instance, result object
"""
# self.__dict__.update(self.model_extra)
report = Report()
dicto = self.improved_dict()
instance, code, msg = BasicSubmission.query_or_create(submission_type=self.submission_type['value'],
rsl_plate_num=self.rsl_plate_num['value'])
result = Result(msg=msg, code=code)
instance, result = BasicSubmission.query_or_create(submission_type=self.submission_type['value'],
rsl_plate_num=self.rsl_plate_num['value'])
report.add_result(result)
self.handle_duplicate_samples()
# logger.debug(f"Here's our list of duplicate removed samples: {self.samples}")
# for key, value in self.__dict__.items():
for key, value in dicto.items():
if isinstance(value, dict):
value = value['value']
@@ -733,13 +730,13 @@ class PydSubmission(BaseModel, extra='allow'):
# logger.debug(f"Setting {key} to {value}")
match key:
case "reagents":
if code == 1:
if report.results[0].code == 1:
instance.submission_reagent_associations = []
# logger.debug(f"Looking through {self.reagents}")
for reagent in self.reagents:
reagent, assoc, _ = reagent.toSQL(submission=instance)
# logger.debug(f"Association: {assoc}")
if assoc is not None:# and assoc not in instance.submission_reagent_associations:
if assoc is not None: # and assoc not in instance.submission_reagent_associations:
instance.submission_reagent_associations.append(assoc)
# instance.reagents.append(reagent)
case "samples":
@@ -755,10 +752,7 @@ class PydSubmission(BaseModel, extra='allow'):
if equip is None:
continue
equip, association = equip.toSQL(submission=instance)
if association is not None and association not in instance.submission_equipment_associations:
# association.save()
# logger.debug(
# f"Equipment association SQL object to be added to submission: {association.__dict__}")
if association is not None:
instance.submission_equipment_associations.append(association)
case "tips":
for tips in self.tips:
@@ -817,9 +811,9 @@ class PydSubmission(BaseModel, extra='allow'):
# except AttributeError as e:
# logger.debug(f"Something went wrong constructing instance {self.rsl_plate_num}: {e}")
# logger.debug(f"Constructed submissions message: {msg}")
return instance, result
return instance, report
def to_form(self, parent: QWidget):
def to_form(self, parent: QWidget, disable:list|None=None):
"""
Converts this instance into a frontend.widgets.submission_widget.SubmissionFormWidget
@@ -830,7 +824,8 @@ class PydSubmission(BaseModel, extra='allow'):
SubmissionFormWidget: Submission form widget
"""
from frontend.widgets.submission_widget import SubmissionFormWidget
return SubmissionFormWidget(parent=parent, submission=self)
logger.debug(f"Disbable: {disable}")
return SubmissionFormWidget(parent=parent, submission=self, disable=disable)
def to_writer(self) -> "SheetWriter":
"""
@@ -838,7 +833,7 @@ class PydSubmission(BaseModel, extra='allow'):
Returns:
SheetWriter: Sheetwriter object that will perform writing.
"""
"""
from backend.excel.writer import SheetWriter
return SheetWriter(self)
@@ -896,8 +891,8 @@ class PydSubmission(BaseModel, extra='allow'):
status="Warning")
report.add_result(result)
return output_reagents, report
def export_csv(self, filename:Path|str):
def export_csv(self, filename: Path | str):
try:
worksheet = self.csv
except AttributeError:
@@ -1024,4 +1019,3 @@ class PydEquipmentRole(BaseModel):
"""
from frontend.widgets.equipment_usage import RoleComboBox
return RoleComboBox(parent=parent, role=self, used=used)

View File

@@ -1,6 +1,6 @@
'''
"""
Constructs main application.
'''
"""
from PyQt6.QtWidgets import (
QTabWidget, QWidget, QVBoxLayout,
QHBoxLayout, QScrollArea, QMainWindow,
@@ -13,7 +13,7 @@ from markdown import markdown
from tools import check_if_app, Settings, Report, jinja_template_loading
from datetime import date
from .pop_ups import AlertPop, HTMLPop
from .pop_ups import HTMLPop
from .misc import LogParser
import logging, webbrowser, sys, shutil
from .submission_table import SubmissionsSheet
@@ -36,7 +36,7 @@ class App(QMainWindow):
self.report = Report()
# NOTE: indicate version and connected database in title bar
try:
self.title = f"Submissions App (v{ctx.package.__version__}) - {ctx.database_path}"
self.title = f"Submissions App (v{ctx.package.__version__}) - {ctx.database_session.get_bind().url}"
except (AttributeError, KeyError):
self.title = f"Submissions App"
# NOTE: set initial app position and size
@@ -164,27 +164,6 @@ class App(QMainWindow):
instr = HTMLPop(html=html, title="Instructions")
instr.exec()
def result_reporter(self):
"""
Report any anomolous results - if any - to the user
Args:
result (dict | None, optional): The result from a function. Defaults to None.
"""
# logger.debug(f"Running results reporter for: {self.report.results}")
if len(self.report.results) > 0:
# logger.debug(f"We've got some results!")
for result in self.report.results:
# logger.debug(f"Showing result: {result}")
if result is not None:
alert = result.report()
if alert.exec():
pass
self.report = Report()
else:
self.statusBar().showMessage("Action completed sucessfully.", 5000)
def runSearch(self):
dlg = LogParser(self)
dlg.exec()
@@ -201,12 +180,19 @@ class App(QMainWindow):
Copies the database into the backup directory the first time it is opened every month.
"""
month = date.today().strftime("%Y-%m")
current_month_bak = Path(self.ctx.backup_path).joinpath(f"submissions_backup-{month}").resolve()
# logger.debug(f"Here is the db directory: {self.ctx.database_path}")
# logger.debug(f"Here is the backup directory: {self.ctx.backup_path}")
current_month_bak = Path(self.ctx.backup_path).joinpath(f"submissions_backup-{month}").resolve().with_suffix(".db")
if not current_month_bak.exists() and "demo" not in self.ctx.database_path.__str__():
logger.info("No backup found for this month, backing up database.")
shutil.copyfile(self.ctx.database_path, current_month_bak)
match self.ctx.database_schema:
case "sqlite":
current_month_bak = current_month_bak.with_suffix(".db")
if not current_month_bak.exists() and "demo" not in self.ctx.database_path.__str__():
logger.info("No backup found for this month, backing up database.")
shutil.copyfile(self.ctx.database_path, current_month_bak)
case "postgresql+psycopg2":
logger.warning(f"Backup function not yet implemented for psql")
current_month_bak = current_month_bak.with_suffix(".psql")
class AddSubForm(QWidget):

View File

@@ -15,7 +15,7 @@ logger = logging.getLogger(f"submissions.{__name__}")
class EquipmentUsage(QDialog):
def __init__(self, parent, submission: BasicSubmission) -> QDialog:
def __init__(self, parent, submission: BasicSubmission):
super().__init__(parent)
self.submission = submission
self.setWindowTitle(f"Equipment Checklist - {submission.rsl_plate_num}")
@@ -139,7 +139,7 @@ class RoleComboBox(QWidget):
Changes what tips are available when process is changed
"""
process = self.process.currentText().strip()
logger.debug(f"Checking process: {process} for equipment {self.role.name}")
# logger.debug(f"Checking process: {process} for equipment {self.role.name}")
process = Process.query(name=process)
if process.tip_roles:
for iii, tip_role in enumerate(process.tip_roles):

View File

@@ -21,10 +21,10 @@ logger = logging.getLogger(f"submissions.{__name__}")
# Main window class
class GelBox(QDialog):
def __init__(self, parent, img_path:str|Path, submission:WastewaterArtic):
def __init__(self, parent, img_path: str | Path, submission: WastewaterArtic):
super().__init__(parent)
# NOTE: setting title
self.setWindowTitle("PyQtGraph")
self.setWindowTitle(f"Gel - {img_path}")
self.img_path = img_path
self.submission = submission
# NOTE: setting geometry
@@ -41,7 +41,7 @@ class GelBox(QDialog):
def UiComponents(self):
"""
Create widgets in ui
"""
"""
# NOTE: setting configuration options
pg.setConfigOptions(antialias=True)
# NOTE: creating image view object
@@ -49,41 +49,42 @@ class GelBox(QDialog):
# NOTE: Create image.
# NOTE: For some reason, ImageView wants to flip the image, so we have to rotate and flip the array first.
# NOTE: Using the Image.rotate function results in cropped image, so using np.
img = np.flip(np.rot90(np.array(Image.open(self.img_path)),1),0)
img = np.flip(np.rot90(np.array(Image.open(self.img_path)), 1), 0)
self.imv.setImage(img)
layout = QGridLayout()
layout.addWidget(QLabel("DNA Core Submission Number"),0,1)
layout.addWidget(QLabel("DNA Core Submission Number"), 21, 1)
self.core_number = QLineEdit()
self.core_number.setText(self.submission.dna_core_submission_number)
layout.addWidget(self.core_number, 0,2)
layout.addWidget(QLabel("Gel Barcode"),0,3)
layout.addWidget(self.core_number, 21, 2)
layout.addWidget(QLabel("Gel Barcode"), 21, 3)
self.gel_barcode = QLineEdit()
self.gel_barcode.setText(self.submission.gel_barcode)
layout.addWidget(self.gel_barcode, 0, 4)
layout.addWidget(self.gel_barcode, 21, 4)
# NOTE: setting this layout to the widget
# NOTE: plot window goes on right side, spanning 3 rows
layout.addWidget(self.imv, 1, 1,20,20)
layout.addWidget(self.imv, 0, 1, 20, 20)
# NOTE: setting this widget as central widget of the main window
try:
control_info = sorted(self.submission.gel_controls, key=lambda d: d['location'])
except KeyError:
control_info = None
self.form = ControlsForm(parent=self, control_info=control_info)
layout.addWidget(self.form,22,1,1,4)
layout.addWidget(self.form, 22, 1, 1, 4)
QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
self.buttonBox = QDialogButtonBox(QBtn)
self.buttonBox.accepted.connect(self.accept)
self.buttonBox.rejected.connect(self.reject)
layout.addWidget(self.buttonBox, 23, 1, 1, 1)#, alignment=Qt.AlignmentFlag.AlignTop)
layout.addWidget(self.buttonBox, 23, 1, 1, 1) #, alignment=Qt.AlignmentFlag.AlignTop)
self.setLayout(layout)
def parse_form(self) -> Tuple[str, str|Path, list]:
def parse_form(self) -> Tuple[str, str | Path, list]:
"""
Get relevant values from self/form
Returns:
Tuple[str, str|Path, list]: output values
"""
"""
dna_core_submission_number = self.core_number.text()
gel_barcode = self.gel_barcode.text()
values, comment = self.form.parse_form()
@@ -92,7 +93,7 @@ class GelBox(QDialog):
class ControlsForm(QWidget):
def __init__(self, parent, control_info:List=None) -> None:
def __init__(self, parent, control_info: List = None) -> None:
super().__init__(parent)
self.layout = QGridLayout()
columns = []
@@ -101,9 +102,10 @@ class ControlsForm(QWidget):
tt_text = "\n".join([f"{item['sample_id']} - CELL {item['location']}" for item in control_info])
except TypeError:
tt_text = None
for iii, item in enumerate(["Negative Control Key", "Description", "Results - 65 C", "Results - 63 C", "Results - Spike"]):
for iii, item in enumerate(
["Negative Control Key", "Description", "Results - 65 C", "Results - 63 C", "Results - Spike"]):
label = QLabel(item)
self.layout.addWidget(label, 0, iii,1,1)
self.layout.addWidget(label, 0, iii, 1, 1)
if iii > 1:
columns.append(item)
elif iii == 0:
@@ -114,7 +116,8 @@ class ControlsForm(QWidget):
label = QLabel(item)
self.layout.addWidget(label, iii, 0, 1, 1)
rows.append(item)
for iii, item in enumerate(["Processing Negative (PBS)", "Extraction Negative (Extraction buffers ONLY)", "Artic no-template control (mastermix ONLY)"], start=1):
for iii, item in enumerate(["Processing Negative (PBS)", "Extraction Negative (Extraction buffers ONLY)",
"Artic no-template control (mastermix ONLY)"], start=1):
label = QLabel(item)
self.layout.addWidget(label, iii, 1, 1, 1)
for iii in range(3):
@@ -125,11 +128,11 @@ class ControlsForm(QWidget):
widge.setCurrentIndex(0)
widge.setEditable(True)
widge.setObjectName(f"{rows[iii]} : {columns[jjj]}")
self.layout.addWidget(widge, iii+1, jjj+2, 1, 1)
self.layout.addWidget(QLabel("Comments:"), 0,5,1,1)
self.layout.addWidget(widge, iii + 1, jjj + 2, 1, 1)
self.layout.addWidget(QLabel("Comments:"), 0, 5, 1, 1)
self.comment_field = QTextEdit(self)
self.comment_field.setFixedHeight(50)
self.layout.addWidget(self.comment_field, 1,5,4,1)
self.layout.addWidget(self.comment_field, 1, 5, 4, 1)
self.setLayout(self.layout)
def parse_form(self) -> List[dict]:
@@ -138,12 +141,12 @@ class ControlsForm(QWidget):
Returns:
List[dict]: output of values
"""
"""
output = []
for le in self.findChildren(QComboBox):
label = [item.strip() for item in le.objectName().split(" : ")]
try:
dicto = [item for item in output if item['name']==label[0]][0]
dicto = [item for item in output if item['name'] == label[0]][0]
except IndexError:
dicto = dict(name=label[0], values=[])
dicto['values'].append(dict(name=label[1], value=le.currentText()))

View File

@@ -8,7 +8,7 @@ from PyQt6.QtWidgets import (
QDialogButtonBox, QDateEdit, QPushButton, QFormLayout
)
from PyQt6.QtCore import Qt, QDate
from tools import jinja_template_loading, Settings
from tools import jinja_template_loading
from backend.db.models import *
import logging
from .pop_ups import AlertPop
@@ -45,18 +45,19 @@ class AddReagentForm(QDialog):
self.exp_input.setObjectName('expiry')
# NOTE: if expiry is not passed in from gui, use today
if expiry is None:
self.exp_input.setDate(QDate.currentDate())
# self.exp_input.setDate(QDate.currentDate())
self.exp_input.setDate(QDate(1970, 1, 1))
else:
try:
self.exp_input.setDate(expiry)
except TypeError:
self.exp_input.setDate(QDate.currentDate())
self.exp_input.setDate(QDate(1970, 1, 1))
# NOTE: widget to get reagent type info
self.type_input = QComboBox()
self.type_input.setObjectName('type')
self.type_input.addItems([item.name for item in ReagentRole.query()])
# logger.debug(f"Trying to find index of {reagent_type}")
# NOTE: convert input to user friendly string?
# NOTE: convert input to user-friendly string?
try:
reagent_role = reagent_role.replace("_", " ").title()
except AttributeError:

View File

@@ -7,8 +7,8 @@ from PyQt6.QtWidgets import QTableView, QMenu
from PyQt6.QtCore import Qt, QAbstractTableModel, QSortFilterProxyModel
from PyQt6.QtGui import QAction, QCursor
from backend.db.models import BasicSubmission
from backend.excel import make_report_html, make_report_xlsx, ReportMaker
from tools import Report, Result, row_map, get_first_blank_df_row, html_to_pdf
from backend.excel import ReportMaker
from tools import Report, Result, report_result
from .functions import select_save_file, select_open_file
from .misc import ReportDatePicker
import pandas as pd
@@ -129,14 +129,15 @@ class SubmissionsSheet(QTableView):
func = self.con_actions[action_name]
func(obj=self)
@report_result
def link_extractions(self):
"""
Pull extraction logs into the db
"""
self.link_extractions_function()
self.app.report.add_result(self.report)
"""
self.report = Report()
self.app.result_reporter()
self.link_extractions_function()
self.report.add_result(self.report)
return self.report
def link_extractions_function(self):
"""
@@ -179,6 +180,7 @@ class SubmissionsSheet(QTableView):
sub.save()
self.report.add_result(Result(msg=f"We added {count} logs to the database.", status='Information'))
@report_result
def link_pcr(self):
"""
Pull pcr logs into the db
@@ -186,7 +188,7 @@ class SubmissionsSheet(QTableView):
self.link_pcr_function()
self.app.report.add_result(self.report)
self.report = Report()
self.app.result_reporter()
return self.report
def link_pcr_function(self):
"""
@@ -225,15 +227,15 @@ class SubmissionsSheet(QTableView):
# NOTE: check if pcr_info already exists
sub.save()
self.report.add_result(Result(msg=f"We added {count} logs to the database.", status='Information'))
@report_result
def generate_report(self):
"""
Make a report
"""
self.generate_report_function()
self.app.report.add_result(self.report)
"""
self.report = Report()
self.app.result_reporter()
self.generate_report_function()
return self.report
def generate_report_function(self):
"""
@@ -250,43 +252,7 @@ class SubmissionsSheet(QTableView):
dlg = ReportDatePicker()
if dlg.exec():
info = dlg.parse_form()
# logger.debug(f"Report info: {info}")
# NOTE: find submissions based on date range
subs = BasicSubmission.query(start_date=info['start_date'], end_date=info['end_date'])
# NOTE: convert each object to dict
records = [item.to_dict(report=True) for item in subs]
# logger.debug(f"Records: {pformat(records)}")
# NOTE: make dataframe from record dictionaries
detailed_df, summary_df = make_report_xlsx(records=records)
html = make_report_html(df=summary_df, start_date=info['start_date'], end_date=info['end_date'])
# NOTE: get save location of report
fname = select_save_file(obj=self, default_name=f"Submissions_Report_{info['start_date']}-{info['end_date']}.docx", extension="docx")
# html_to_pdf(html=html, output_file=fname)
# writer = pd.ExcelWriter(fname.with_suffix(".xlsx"), engine='openpyxl')
# summary_df.to_excel(writer, sheet_name="Report")
# detailed_df.to_excel(writer, sheet_name="Details", index=False)
# worksheet: Worksheet = writer.sheets['Report']
# for idx, col in enumerate(summary_df, start=1): # loop through all columns
# series = summary_df[col]
# max_len = max((
# series.astype(str).map(len).max(), # len of largest item
# len(str(series.name)) # len of column name/header
# )) + 20 # adding a little extra space
# try:
# # NOTE: Convert idx to letter
# col_letter = chr(ord('@') + idx)
# worksheet.column_dimensions[col_letter].width = max_len
# except ValueError:
# pass
# blank_row = get_first_blank_df_row(summary_df) + 1
# # logger.debug(f"Blank row index = {blank_row}")
# for col in range(3,6):
# col_letter = row_map[col]
# worksheet.cell(row=blank_row, column=col, value=f"=SUM({col_letter}2:{col_letter}{str(blank_row-1)})")
# for cell in worksheet['D']:
# if cell.row > 1:
# cell.style = 'Currency'
# writer.close()
rp = ReportMaker(start_date=info['start_date'], end_date=info['end_date'])
rp.write_report(filename=fname, obj=self)
self.report.add_result(report)

View File

@@ -11,7 +11,7 @@ from pathlib import Path
from . import select_open_file, select_save_file
import logging, difflib, inspect
from pathlib import Path
from tools import Report, Result, check_not_nan, workbook_2_csv, main_form_style
from tools import Report, Result, check_not_nan, workbook_2_csv, main_form_style, report_result
from backend.excel.parser import SheetParser
from backend.validators import PydSubmission, PydReagent
from backend.db import (
@@ -59,17 +59,16 @@ class SubmissionFormContainer(QWidget):
self.app.last_dir = fname.parent
self.import_drag.emit(fname)
@report_result
def importSubmission(self, fname: Path | None = None):
"""
import submission from excel sheet into form
"""
self.app.raise_()
self.app.activateWindow()
self.import_submission_function(fname)
# logger.debug(f"Result from result reporter: {self.report.results}")
self.app.report.add_result(self.report)
self.report = Report()
self.app.result_reporter()
self.import_submission_function(fname)
return self.report
def import_submission_function(self, fname: Path | None = None):
"""
@@ -115,8 +114,9 @@ class SubmissionFormContainer(QWidget):
# logger.debug(f"Outgoing report: {self.report.results}")
# logger.debug(f"All attributes of submission container:\n{pformat(self.__dict__)}")
@report_result
def add_reagent(self, reagent_lot: str | None = None, reagent_role: str | None = None, expiry: date | None = None,
name: str | None = None):
name: str | None = None) -> Tuple[PydReagent, Report]:
"""
Action to create new reagent in DB.
@@ -144,16 +144,18 @@ class SubmissionFormContainer(QWidget):
sqlobj, assoc, result = reagent.toSQL()
sqlobj.save()
report.add_result(result)
self.app.report.add_result(report)
self.app.result_reporter()
return reagent
# logger.debug(f"Reagent: {reagent}, Report: {report}")
return reagent, report
class SubmissionFormWidget(QWidget):
def __init__(self, parent: QWidget, submission: PydSubmission) -> None:
def __init__(self, parent: QWidget, submission: PydSubmission, disable: list | None = None) -> None:
super().__init__(parent)
# self.report = Report()
# logger.debug(f"Disable: {disable}")
if disable is None:
disable = []
self.app = parent.app
self.pyd = submission
self.missing_info = []
@@ -166,12 +168,19 @@ class SubmissionFormWidget(QWidget):
for k in list(self.pyd.model_fields.keys()) + list(self.pyd.model_extra.keys()):
if k in self.ignore:
continue
try:
# logger.debug(f"Key: {k}, Disable: {disable}")
check = k in disable
# logger.debug(f"Check: {check}")
except TypeError:
check = False
try:
value = self.pyd.__getattribute__(k)
except AttributeError:
logger.error(f"Couldn't get attribute from pyd: {k}")
value = dict(value=None, missing=True)
add_widget = self.create_widget(key=k, value=value, submission_type=self.pyd.submission_type['value'], sub_obj=st)
add_widget = self.create_widget(key=k, value=value, submission_type=self.pyd.submission_type['value'],
sub_obj=st, disable=check)
if add_widget is not None:
self.layout.addWidget(add_widget)
if k == "extraction_kit":
@@ -180,11 +189,13 @@ class SubmissionFormWidget(QWidget):
self.scrape_reagents(self.pyd.extraction_kit)
def create_widget(self, key: str, value: dict | PydReagent, submission_type: str | None = None,
extraction_kit: str | None = None, sub_obj:BasicSubmission|None=None) -> "self.InfoItem":
extraction_kit: str | None = None, sub_obj: BasicSubmission | None = None,
disable: bool = False) -> "self.InfoItem":
"""
Make an InfoItem widget to hold a field
Args:
disable ():
key (str): Name of the field
value (dict): Value of field
submission_type (str | None, optional): Submissiontype as str. Defaults to None.
@@ -192,18 +203,25 @@ class SubmissionFormWidget(QWidget):
Returns:
self.InfoItem: Form widget to hold name:value
"""
# logger.debug(f"Key: {key}, Disable: {disable}")
if key not in self.ignore:
match value:
case PydReagent():
if value.name.lower() != "not applicable":
widget = self.ReagentFormWidget(self, reagent=value, extraction_kit=extraction_kit)
else:
widget = None
case _:
widget = self.InfoItem(self, key=key, value=value, submission_type=submission_type, sub_obj=sub_obj)
# logger.debug(f"Setting widget enabled to: {not disable}")
if disable:
widget.input.setEnabled(False)
widget.input.setToolTip("Widget disabled to protect database integrity.")
return widget
return None
@report_result
def scrape_reagents(self, *args, **kwargs): #extraction_kit:str, caller:str|None=None):
"""
Extracted scrape reagents function that will run when
@@ -250,8 +268,7 @@ class SubmissionFormWidget(QWidget):
self.layout.addWidget(submit_btn)
submit_btn.clicked.connect(self.submit_new_sample_function)
self.setLayout(self.layout)
self.app.report.add_result(report)
self.app.result_reporter()
return report
def clear_form(self):
"""
@@ -275,7 +292,8 @@ class SubmissionFormWidget(QWidget):
query = [widget for widget in query if widget.objectName() == object_name]
return query
def submit_new_sample_function(self) -> QWidget:
@report_result
def submit_new_sample_function(self, *args) -> Report:
"""
Parse forms and add sample to the database.
@@ -294,37 +312,40 @@ class SubmissionFormWidget(QWidget):
_, result = self.pyd.check_kit_integrity()
report.add_result(result)
if len(result.results) > 0:
self.app.report.add_result(report)
self.app.result_reporter()
# self.app.report.add_result(report)
# self.app.report_result()
return
# logger.debug(f"PYD before transformation into SQL:\n\n{self.pyd}\n\n")
base_submission, result = self.pyd.to_sql()
# logger.debug(f"SQL object: {pformat(base_submission.__dict__)}")
# logger.debug(f"Base submission: {base_submission.to_dict()}")
# NOTE: check output message for issues
match result.code:
try:
code = report.results[-1].code
except IndexError:
code = 0
match code:
# NOTE: code 0: everything is fine.
case 0:
report.add_result(None)
pass
# NOTE: code 1: ask for overwrite
case 1:
dlg = QuestionAsker(title=f"Review {base_submission.rsl_plate_num}?", message=result.msg)
if dlg.exec():
# NOTE: Do not add duplicate reagents.
result = None
pass
else:
self.app.ctx.database_session.rollback()
report.add_result(Result(msg="Overwrite cancelled", status="Information"))
self.app.report.add_result(report)
self.app.result_reporter()
return
# self.app.report.add_result(report)
# self.app.report_result()
return report
# NOTE: code 2: No RSL plate number given
case 2:
report.add_result(result)
self.app.report.add_result(report)
self.app.result_reporter()
return
# self.app.report.add_result(report)
# self.app.report_result()
return report
case _:
pass
# NOTE: add reagents to submission object
@@ -338,8 +359,7 @@ class SubmissionFormWidget(QWidget):
# NOTE: reset form
self.setParent(None)
# logger.debug(f"All attributes of obj: {pformat(self.__dict__)}")
self.app.report.add_result(report)
self.app.result_reporter()
return report
def export_csv_function(self, fname: Path | None = None):
"""
@@ -352,7 +372,6 @@ class SubmissionFormWidget(QWidget):
fname = select_save_file(obj=self, default_name=self.pyd.construct_filename(), extension="csv")
try:
self.pyd.export_csv(fname)
# workbook_2_csv(worksheet=self.pyd.csv, filename=fname)
except PermissionError:
logger.warning(f"Could not get permissions to {fname}. Possibly the request was cancelled.")
except AttributeError:
@@ -398,11 +417,13 @@ class SubmissionFormWidget(QWidget):
class InfoItem(QWidget):
def __init__(self, parent: QWidget, key: str, value: dict, submission_type: str | None = None, sub_obj:BasicSubmission|None=None) -> None:
def __init__(self, parent: QWidget, key: str, value: dict, submission_type: str | None = None,
sub_obj: BasicSubmission | None = None) -> None:
super().__init__(parent)
layout = QVBoxLayout()
self.label = self.ParsedQLabel(key=key, value=value)
self.input: QWidget = self.set_widget(parent=self, key=key, value=value, submission_type=submission_type, sub_obj=sub_obj)
self.input: QWidget = self.set_widget(parent=self, key=key, value=value, submission_type=submission_type,
sub_obj=sub_obj)
self.setObjectName(key)
try:
self.missing: bool = value['missing']
@@ -439,7 +460,8 @@ class SubmissionFormWidget(QWidget):
return None, None
return self.input.objectName(), dict(value=value, missing=self.missing)
def set_widget(self, parent: QWidget, key: str, value: dict, submission_type: str | None = None, sub_obj:BasicSubmission|None=None) -> QWidget:
def set_widget(self, parent: QWidget, key: str, value: dict, submission_type: str | None = None,
sub_obj: BasicSubmission | None = None) -> QWidget:
"""
Creates form widget
@@ -472,6 +494,7 @@ class SubmissionFormWidget(QWidget):
pass
# set combobox values to lookedup values
add_widget.addItems(labs)
add_widget.setToolTip("Select submitting lab.")
case 'extraction_kit':
# if extraction kit not available, all other values fail
if not check_not_nan(value):
@@ -493,15 +516,7 @@ class SubmissionFormWidget(QWidget):
logger.error(f"Couldn't find {obj.prsr.sub['extraction_kit']}")
obj.ext_kit = uses[0]
add_widget.addItems(uses)
# case 'submitted_date':
# # NOTE: uses base calendar
# add_widget = QDateEdit(calendarPopup=True)
# # NOTE: sets submitted date based on date found in excel sheet
# try:
# add_widget.setDate(value)
# # NOTE: if not found, use today
# except:
# add_widget.setDate(date.today())
add_widget.setToolTip("Select extraction kit.")
case 'submission_category':
add_widget = QComboBox()
cats = ['Diagnostic', "Surveillance", "Research"]
@@ -511,6 +526,7 @@ class SubmissionFormWidget(QWidget):
except ValueError:
cats.insert(0, cats.pop(cats.index(submission_type)))
add_widget.addItems(cats)
add_widget.setToolTip("Enter submission category or select from list.")
case _:
if key in sub_obj.timestamps():
add_widget = QDateEdit(calendarPopup=True)
@@ -520,11 +536,13 @@ class SubmissionFormWidget(QWidget):
# NOTE: if not found, use today
except:
add_widget.setDate(date.today())
add_widget.setToolTip(f"Select date for {key}")
else:
# NOTE: anything else gets added in as a line edit
add_widget = QLineEdit()
# logger.debug(f"Setting widget text to {str(value).replace('_', ' ')}")
add_widget.setText(str(value).replace("_", " "))
add_widget.setToolTip(f"Enter value for {key}")
if add_widget is not None:
add_widget.setObjectName(key)
add_widget.setParent(parent)
@@ -594,13 +612,14 @@ class SubmissionFormWidget(QWidget):
# NOTE: If changed set self.missing to True and update self.label
self.lot.currentTextChanged.connect(self.updated)
def parse_form(self) -> Tuple[PydReagent, dict]:
def parse_form(self) -> Tuple[PydReagent | None, Report]:
"""
Pulls form info into PydReagent
Returns:
Tuple[PydReagent, dict]: PydReagent and Report(?)
"""
report = Report()
lot = self.lot.currentText()
# logger.debug(f"Using this lot for the reagent {self.reagent}: {lot}")
wanted_reagent = Reagent.query(lot_number=lot, reagent_role=self.reagent.role)
@@ -609,14 +628,16 @@ class SubmissionFormWidget(QWidget):
dlg = QuestionAsker(title=f"Add {lot}?",
message=f"Couldn't find reagent type {self.reagent.role}: {lot} in the database.\n\nWould you like to add it?")
if dlg.exec():
wanted_reagent = self.parent().parent().add_reagent(reagent_lot=lot, reagent_role=self.reagent.role,
expiry=self.reagent.expiry,
name=self.reagent.name)
return wanted_reagent, None
wanted_reagent, _ = self.parent().parent().add_reagent(reagent_lot=lot,
reagent_role=self.reagent.role,
expiry=self.reagent.expiry,
name=self.reagent.name)
return wanted_reagent, report
else:
# NOTE: In this case we will have an empty reagent and the submission will fail kit integrity check
# logger.debug("Will not add reagent.")
return None, Result(msg="Failed integrity check", status="Critical")
report.add_result(Result(msg="Failed integrity check", status="Critical"))
return None, report
else:
# NOTE: Since this now gets passed in directly from the parser -> pyd -> form and the parser gets the name
# from the db, it should no longer be necessary to query the db with reagent/kit, but with rt name directly.
@@ -624,7 +645,7 @@ class SubmissionFormWidget(QWidget):
if rt is None:
rt = ReagentRole.query(kit_type=self.extraction_kit, reagent=wanted_reagent)
return PydReagent(name=wanted_reagent.name, lot=wanted_reagent.lot, role=rt.name,
expiry=wanted_reagent.expiry, missing=False), None
expiry=wanted_reagent.expiry, missing=False), report
def updated(self):
"""
@@ -708,4 +729,5 @@ class SubmissionFormWidget(QWidget):
# logger.debug(f"New relevant reagents: {relevant_reagents}")
self.setObjectName(f"lot_{reagent.role}")
self.addItems(relevant_reagents)
self.setToolTip(f"Enter lot number for the reagent used for {reagent.role}")
# self.setStyleSheet(main_form_style)

View File

@@ -4,6 +4,8 @@ Contains miscellaenous functions used by both frontend and backend.
from __future__ import annotations
import json
from json import JSONDecodeError
import jinja2
import numpy as np
import logging, re, yaml, sys, os, stat, platform, getpass, inspect, csv
import pandas as pd
@@ -18,7 +20,6 @@ from typing import Any, Tuple, Literal, List
from PyQt6.QtGui import QPageSize
from PyQt6.QtWebEngineWidgets import QWebEngineView
from openpyxl.worksheet.worksheet import Worksheet
# from PyQt6 import QtPrintSupport, QtCore, QtWebEngineWidgets
from PyQt6.QtPrintSupport import QPrinter
logger = logging.getLogger(f"submissions.{__name__}")
@@ -74,7 +75,7 @@ def check_key_or_attr(key: str, interest: dict | object, check_none: bool = Fals
Returns:
bool: True if exists, else False
"""
"""
match interest:
case dict():
if key in interest.keys():
@@ -175,7 +176,7 @@ def is_missing(value: Any) -> Tuple[Any, bool]:
Returns:
Tuple[Any, bool]: Value, True if nan, else False
"""
"""
if check_not_nan(value):
return value, False
else:
@@ -222,7 +223,11 @@ class Settings(BaseSettings, extra="allow"):
FileNotFoundError: Error if database not found.
"""
database_schema: str
directory_path: Path
database_user: str | None = None
database_password: str | None = None
database_name: str
database_path: Path | str | None = None
backup_path: Path | str | None = None
# super_users: list|None = None
@@ -260,17 +265,26 @@ class Settings(BaseSettings, extra="allow"):
@field_validator('database_path', mode="before")
@classmethod
def ensure_database_exists(cls, value, values):
if value == ":memory:":
return value
match value:
case str():
value = Path(value)
case None:
value = values.data['directory_path'].joinpath("submissions.db")
if value.exists():
return value
else:
raise FileNotFoundError(f"Couldn't find database at {value}")
# if value == ":memory:":
# return value
match values.data['database_schema']:
case "sqlite":
value = f"/{Path(value).absolute().__str__()}/{values.data['database_name']}.db"
# db_name = f"{values.data['database_name']}.db"
case _:
value = f"@{value}/{values.data['database_name']}"
# db_name = values.data['database_name']
# match value:
# case str():
# value = Path(value)
# case None:
# value = values.data['directory_path'].joinpath("submissions.db")
# if value.exists():
# return value
# else:
# raise FileNotFoundError(f"Couldn't find database at {value}")
return value
@field_validator('database_session', mode="before")
@classmethod
@@ -278,27 +292,33 @@ class Settings(BaseSettings, extra="allow"):
if value is not None:
return value
else:
database_path = values.data['database_path']
if database_path is None:
# NOTE: check in user's .submissions directory for submissions.db
if Path.home().joinpath(".submissions", "submissions.db").exists():
database_path = Path.home().joinpath(".submissions", "submissions.db")
# NOTE: finally, look in the local dir
else:
database_path = package_dir.joinpath("submissions.db")
else:
if database_path == ":memory:":
pass
# NOTE: check if user defined path is directory
elif database_path.is_dir():
database_path = database_path.joinpath("submissions.db")
# NOTE: check if user defined path is a file
elif database_path.is_file():
database_path = database_path
else:
raise FileNotFoundError("No database file found. Exiting program.")
template = jinja_template_loading().from_string(
"{{ values['database_schema'] }}://{% if values['database_user'] %}{{ values['database_user'] }}{% if values['database_password'] %}:{{ values['database_password'] }}{% endif %}{% endif %}{{ values['database_path'] }}")
database_path = template.render(values=values.data)
# print(f"Using {database_path} for database path")
# database_path = values.data['database_path']
# if database_path is None:
# # NOTE: check in user's .submissions directory for submissions.db
# if Path.home().joinpath(".submissions", "submissions.db").exists():
# database_path = Path.home().joinpath(".submissions", "submissions.db")
# # NOTE: finally, look in the local dir
# else:
# database_path = package_dir.joinpath("submissions.db")
# else:
# if database_path == ":memory:":
# pass
# # NOTE: check if user defined path is directory
# elif database_path.is_dir():
# database_path = database_path.joinpath("submissions.db")
# # NOTE: check if user defined path is a file
# elif database_path.is_file():
# database_path = database_path
# else:
# raise FileNotFoundError("No database file found. Exiting program.")
logger.info(f"Using {database_path} for database file.")
engine = create_engine(f"sqlite:///{database_path}") #, echo=True, future=True)
# engine = create_engine(f"sqlite:///{database_path}") #, echo=True, future=True)
# engine = create_engine("postgresql+psycopg2://postgres:RE,4321q@localhost:5432/submissions")
engine = create_engine(database_path)
session = Session(engine)
return session
@@ -316,13 +336,21 @@ class Settings(BaseSettings, extra="allow"):
def set_from_db(self, db_path: Path):
if 'pytest' in sys.modules:
config_items = dict(power_users=['lwark', 'styson', 'ruwang'])
output = dict(power_users=['lwark', 'styson', 'ruwang'])
else:
session = Session(create_engine(f"sqlite:///{db_path}"))
# session = Session(create_engine(f"sqlite:///{db_path}"))
session = self.database_session
config_items = session.execute(text("SELECT * FROM _configitem")).all()
session.close()
config_items = {item[1]: json.loads(item[2]) for item in config_items}
for k, v in config_items.items():
# print(config_items)
output = {}
for item in config_items:
try:
output[item[1]] = json.loads(item[2])
except (JSONDecodeError, TypeError):
output[item[1]] = item[2]
# config_items = {item[1]: json.loads(item[2]) for item in config_items}
for k, v in output.items():
if not hasattr(self, k):
self.__setattr__(k, v)
@@ -355,7 +383,6 @@ def get_config(settings_path: Path | str | None = None) -> Settings:
CONFIGDIR.mkdir(parents=True)
except FileExistsError:
logger.warning(f"Config directory {CONFIGDIR} already exists.")
try:
LOGDIR.mkdir(parents=True)
except FileExistsError:
@@ -373,7 +400,7 @@ def get_config(settings_path: Path | str | None = None) -> Settings:
if check_if_app():
settings_path = Path(sys._MEIPASS).joinpath("files", "config.yml")
else:
settings_path = package_dir.joinpath('config.yml')
settings_path = package_dir.joinpath('src', 'config.yml')
with open(settings_path, "r") as dset:
default_settings = yaml.load(dset, Loader=yaml.Loader)
# NOTE: Tell program we need to copy the config.yml to the user directory
@@ -502,7 +529,7 @@ def setup_logger(verbosity: int = 3):
# NOTE: create console handler with a higher log level
# NOTE: create custom logger with STERR -> log
ch = logging.StreamHandler(stream=sys.stdout)
# NOTE: set looging level based on verbosity
# NOTE: set logging level based on verbosity
match verbosity:
case 3:
ch.setLevel(logging.DEBUG)
@@ -542,10 +569,10 @@ def copy_settings(settings_path: Path, settings: dict) -> dict:
dict: output dictionary for use in first run
"""
# NOTE: if the current user is not a superuser remove the superusers entry
if not getpass.getuser() in settings['super_users']:
del settings['super_users']
if not getpass.getuser() in settings['power_users']:
del settings['power_users']
# if not getpass.getuser() in settings['super_users']:
# del settings['super_users']
# if not getpass.getuser() in settings['power_users']:
# del settings['power_users']
if not settings_path.exists():
with open(settings_path, 'w') as f:
yaml.dump(settings, f)
@@ -651,7 +678,7 @@ class Report(BaseModel):
Args:
result (Result | Report | None): Results to be added.
"""
"""
match result:
case Result():
logger.info(f"Adding {result} to results.")
@@ -668,7 +695,7 @@ class Report(BaseModel):
logger.error(f"Unknown variable type: {type(result)} for <Result> entry into <Report>")
def rreplace(s:str, old:str, new:str) -> str:
def rreplace(s: str, old: str, new: str) -> str:
"""
Removes rightmost occurence of a substring
@@ -679,18 +706,18 @@ def rreplace(s:str, old:str, new:str) -> str:
Returns:
str: updated string
"""
"""
return (s[::-1].replace(old[::-1], new[::-1], 1))[::-1]
def html_to_pdf(html:str, output_file: Path | str):
def html_to_pdf(html: str, output_file: Path | str):
"""
Attempts to print an html string as a PDF. (currently not working)
Args:
html (str): Input html string.
output_file (Path | str): Output PDF file path.
"""
"""
if isinstance(output_file, str):
output_file = Path(output_file)
logger.debug(f"Printing PDF to {output_file}")
@@ -732,7 +759,7 @@ def workbook_2_csv(worksheet: Worksheet, filename: Path):
Args:
worksheet (Worksheet): Incoming worksheet
filename (Path): Output csv filepath.
"""
"""
with open(filename, 'w', newline="") as f:
c = csv.writer(f)
for r in worksheet.rows:
@@ -748,7 +775,7 @@ def is_power_user() -> bool:
Returns:
bool: True if yes, False if no.
"""
"""
try:
check = getpass.getuser() in ctx.power_users
except:
@@ -773,3 +800,32 @@ def check_authorization(func):
return dict(code=1, message="This user does not have permission for this function.", status="warning")
return wrapper
def report_result(func):
def wrapper(*args, **kwargs):
logger.debug(f"Arguments: {args}")
logger.debug(f"Keyword arguments: {kwargs}")
output = func(*args, **kwargs)
if isinstance(output, tuple):
report = [item for item in output if isinstance(item, Report)][0]
else:
report = None
logger.debug(f"Got report: {report}")
try:
results = report.results
except AttributeError:
logger.error("No results available")
results = []
for iii, result in enumerate(results):
logger.debug(f"Result {iii}: {result}")
try:
dlg = result.report()
dlg.exec()
except Exception as e:
logger.error(f"Problem reporting due to {e}")
logger.error(result.msg)
logger.debug(f"Returning: {output}")
return output
return wrapper