Created Sample verification before import.
@@ -714,7 +714,6 @@ class BasicSubmission(BaseClass, LogMixin):
# logger.debug(f"Returning regex: {regex}")
return regex

# NOTE: Polymorphic functions

@classproperty
@@ -1130,13 +1129,13 @@ class BasicSubmission(BaseClass, LogMixin):
case date():
pass
case datetime():
end_date = end_date# + timedelta(days=1)
end_date = end_date # + timedelta(days=1)
# pass
case int():
end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date()# \
# + timedelta(days=1)
end_date = datetime.fromordinal(datetime(1900, 1, 1).toordinal() + end_date - 2).date() # \
# + timedelta(days=1)
case _:
end_date = parse(end_date).date()# + timedelta(days=1)
end_date = parse(end_date).date() # + timedelta(days=1)
# end_date = end_date.strftime("%Y-%m-%d")
start_date = datetime.combine(start_date, datetime.min.time()).strftime("%Y-%m-%d %H:%M:%S.%f")
end_date = datetime.combine(end_date, datetime.max.time()).strftime("%Y-%m-%d %H:%M:%S.%f")
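
The `case int():` branch above converts an Excel serial day number into a Python date: it anchors at 1900-01-01 and subtracts 2 to absorb Excel's one-based day counting and its phantom 1900-02-29. A standalone sketch of that conversion (the serial value below is illustrative, not taken from the commit):

from datetime import datetime

def excel_serial_to_date(serial: int):
    # Excel's 1900 date system counts 1900-01-01 as day 1 and treats 1900 as a
    # leap year, hence the "- 2" mirrored from the end_date handling above.
    return datetime.fromordinal(datetime(1900, 1, 1).toordinal() + serial - 2).date()

print(excel_serial_to_date(45292))  # 2024-01-01
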
@@ -1224,10 +1223,11 @@ class BasicSubmission(BaseClass, LogMixin):
else:
from frontend.widgets.pop_ups import QuestionAsker
logger.warning(f"Found existing instance: {instance}, asking to overwrite.")
# code = 1
# msg = "This submission already exists.\nWould you like to overwrite?"
# report.add_result(Result(msg=msg, code=code))
dlg = QuestionAsker(title="Overwrite?", message="This submission already exists.\nWould you like to overwrite?")
# code = 1
# msg = "This submission already exists.\nWould you like to overwrite?"
# report.add_result(Result(msg=msg, code=code))
dlg = QuestionAsker(title="Overwrite?",
message="This submission already exists.\nWould you like to overwrite?")
if dlg.exec():
pass
else:
@@ -1529,10 +1529,23 @@ class BacterialCulture(BasicSubmission):
main_sheet = xl[lookup_table['sheet']]
for row in main_sheet.iter_rows(min_row=lookup_table['start_row'], max_row=lookup_table['end_row']):
idx = row[0].row
sample = dict(submitter_id=main_sheet.cell(row=idx, column=lookup_table['sample_columns']['submitter_id']).value)
sample['concentration'] = main_sheet.cell(row=idx, column=lookup_table['sample_columns']['concentration']).value
sample = dict(
submitter_id=main_sheet.cell(row=idx, column=lookup_table['sample_columns']['submitter_id']).value)
sample['concentration'] = main_sheet.cell(row=idx,
column=lookup_table['sample_columns']['concentration']).value
yield sample

def get_provisional_controls(self):
if self.controls:
provs = (control.sample for control in self.controls)
else:
regex = re.compile(r"^(ATCC)|(MCS)|(EN)")
provs = (sample for sample in self.samples if bool(regex.match(sample.submitter_id)))
for prov in provs:
prov.submission = self.rsl_plate_num
prov.submitted_date = self.submitted_date
yield prov

class Wastewater(BasicSubmission):
"""
@@ -1827,7 +1840,7 @@ class WastewaterArtic(BasicSubmission):
artic_date = Column(TIMESTAMP) #: Date Artic Performed
ngs_date = Column(TIMESTAMP) #: Date submission received
gel_date = Column(TIMESTAMP) #: Date submission received
gel_barcode = Column(String(16)) #: Identifier for the used gel.
gel_barcode = Column(String(16)) #: Identifier for the used gel.

__mapper_args__ = dict(polymorphic_identity="Wastewater Artic",
polymorphic_load="inline",
@@ -2767,10 +2780,14 @@ class BacterialCultureSample(BasicSample):
sample = super().to_sub_dict(full_data=full_data)
sample['name'] = self.submitter_id
sample['organism'] = self.organism
sample['concentration'] = self.concentration
try:
sample['concentration'] = f"{float(self.concentration):.2f}"
except TypeError:
sample['concentration'] = 0.0
if self.control is not None:
sample['colour'] = [0, 128, 0]
target = next((v for k,v in self.control.controltype.targets.items() if k == self.control.subtype), "Not Available")
target = next((v for k, v in self.control.controltype.targets.items() if k == self.control.subtype),
"Not Available")
try:
target = ", ".join(target)
except:
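
In the hunk above, to_sub_dict now renders the concentration to two decimal places and falls back to 0.0 when no value is stored (float(None) raises TypeError). A standalone restatement of that formatting step, with illustrative inputs:

def format_concentration(raw):
    # Mirrors the new to_sub_dict behaviour: two decimals, 0.0 fallback.
    try:
        return f"{float(raw):.2f}"
    except TypeError:
        return 0.0

print(format_concentration(1.2345))  # '1.23'
print(format_concentration(None))    # 0.0
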

@@ -202,19 +202,21 @@ class ConcentrationMaker(ReportArchetype):
# NOTE: Set page size to zero to override limiting query size.
self.subs = BasicSubmission.query(start_date=start_date, end_date=end_date,
submission_type_name=submission_type, page_size=0)
self.controls = list(itertools.chain.from_iterable([sub.controls for sub in self.subs]))
# self.known_controls = list(itertools.chain.from_iterable([sub.controls for sub in self.subs]))
self.controls = list(itertools.chain.from_iterable([sub.get_provisional_controls() for sub in self.subs]))
self.records = [self.build_record(control) for control in self.controls]
self.df = DataFrame.from_records(self.records)
self.sheet_name = "Concentration"

@classmethod
def build_record(cls, control: IridaControl) -> dict:
positive = control.is_positive_control
concentration = control.sample.concentration
if not concentration:
concentration = 0
return dict(name=control.name,
submission=str(control.submission.rsl_plate_num), concentration=concentration,
def build_record(cls, control) -> dict:
positive = not control.submitter_id.lower().startswith("en")
try:
concentration = float(control.concentration)
except (TypeError, ValueError):
concentration = 0.0
return dict(name=control.submitter_id,
submission=str(control.submission), concentration=concentration,
submitted_date=control.submitted_date, positive=positive)
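
The report now builds its records from provisional control samples rather than IridaControl objects: positivity is inferred from the submitter ID prefix (IDs beginning with "en" are treated as negative controls) and the concentration is coerced defensively. A minimal restatement with a plain dict standing in for the ORM sample (the example values are made up):

def build_record_like(submitter_id, concentration, submission, submitted_date) -> dict:
    # Same positivity and coercion rules as the new build_record above.
    positive = not submitter_id.lower().startswith("en")
    try:
        concentration = float(concentration)
    except (TypeError, ValueError):
        concentration = 0.0
    return dict(name=submitter_id, submission=str(submission),
                concentration=concentration, submitted_date=submitted_date,
                positive=positive)

print(build_record_like("EN2-20240101", None, "RSL-WW-20240101", "2024-01-01"))
# {'name': 'EN2-20240101', 'submission': 'RSL-WW-20240101', 'concentration': 0.0,
#  'submitted_date': '2024-01-01', 'positive': False}
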

@@ -195,16 +195,6 @@ class PydSample(BaseModel, extra='allow'):
pass
return value

def improved_dict(self) -> dict:
"""
Constructs a dictionary consisting of model.fields and model.extras

Returns:
dict: Information dictionary
"""
fields = list(self.model_fields.keys()) + list(self.model_extra.keys())
return {k: getattr(self, k) for k in fields}

@report_result
def to_sql(self, submission: BasicSubmission | str = None) -> Tuple[
BasicSample, List[SubmissionSampleAssociation], Result | None]:
@@ -1010,6 +1000,18 @@ class PydSubmission(BaseModel, extra='allow'):
for r in worksheet.rows:
c.writerow([cell.value for cell in r])

@property
def sample_list(self) -> List[dict]:
samples = []
for sample in self.samples:
sample = sample.improved_dict()
sample['row'] = sample['row'][0]
sample['column'] = sample['column'][0]
sample['submission_rank'] = sample['submission_rank'][0]
samples.append(sample)
samples = sorted(samples, key=itemgetter("submission_rank"))
return samples

class PydContact(BaseModel):
name: str
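
The new sample_list property feeds the checker dialog: each sample's improved_dict() output has its row, column and submission_rank unwrapped from their single-element lists, and the result is sorted by rank. The same flatten-and-sort step with plain dicts standing in for the Pydantic samples (the data is illustrative):

from operator import itemgetter

raw = [
    {"submitter_id": "24-000002", "row": [1], "column": [2], "submission_rank": [2]},
    {"submitter_id": "24-000001", "row": [1], "column": [1], "submission_rank": [1]},
]
samples = []
for sample in raw:
    sample = dict(sample)
    # row/column/submission_rank arrive as one-element lists; keep the first value.
    sample['row'] = sample['row'][0]
    sample['column'] = sample['column'][0]
    sample['submission_rank'] = sample['submission_rank'][0]
    samples.append(sample)
samples = sorted(samples, key=itemgetter("submission_rank"))
print([s['submitter_id'] for s in samples])  # ['24-000001', '24-000002']
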

src/submissions/frontend/widgets/sample_checker.py (Normal file, 67 lines)
@@ -0,0 +1,67 @@

import logging
from pathlib import Path
from typing import List

from PyQt6.QtCore import Qt, pyqtSlot
from PyQt6.QtWebChannel import QWebChannel
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWidgets import (QDialog, QPushButton, QVBoxLayout,
                             QDialogButtonBox, QTextEdit, QGridLayout)

from backend.validators import PydSubmission
from tools import get_application_from_parent, jinja_template_loading

env = jinja_template_loading()

logger = logging.getLogger(f"submissions.{__name__}")


class SampleChecker(QDialog):

    def __init__(self, parent, title:str, pyd: PydSubmission):
        super().__init__(parent)
        self.pyd = pyd
        self.setWindowTitle(title)
        self.app = get_application_from_parent(parent)
        self.webview = QWebEngineView(parent=self)
        self.webview.setMinimumSize(900, 500)
        self.webview.setMaximumWidth(900)
        self.layout = QGridLayout()
        self.layout.addWidget(self.webview, 0, 0, 10, 10)
        # NOTE: setup channel
        self.channel = QWebChannel()
        self.channel.registerObject('backend', self)
        # NOTE: Used to maintain javascript functions.
        # self.webview.page().setWebChannel(self.channel)
        template = env.get_template("sample_checker.html")
        template_path = Path(template.environment.loader.__getattribute__("searchpath")[0])
        with open(template_path.joinpath("css", "styles.css"), "r") as f:
            css = f.read()
        html = template.render(samples=pyd.sample_list, css=css)
        self.webview.setHtml(html)
        QBtn = QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel
        self.buttonBox = QDialogButtonBox(QBtn)
        self.buttonBox.accepted.connect(self.accept)
        self.buttonBox.rejected.connect(self.reject)
        self.layout.addWidget(self.buttonBox, 11, 9, 1, 1, alignment=Qt.AlignmentFlag.AlignRight)
        self.setLayout(self.layout)
        self.webview.page().setWebChannel(self.channel)

    @pyqtSlot(str, str, str)
    def text_changed(self, submission_rank: str, key: str, new_value: str):
        logger.debug(f"Name: {submission_rank}, Key: {key}, Value: {new_value}")
        match key:
            case "row" | "column":
                value = [new_value]
            case _:
                value = new_value
        try:
            item = next((sample for sample in self.pyd.samples if int(submission_rank) in sample.submission_rank))
        except StopIteration:
            logger.error(f"Unable to find sample {submission_rank}")
            return
        item.__setattr__(key, value)
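
The dialog registers itself on the web channel under the name backend, so the template's JavaScript can push each edit into the Pydantic submission through text_changed. The slot normalises the incoming string before writing it back: row and column edits are wrapped in a single-element list to match the field shape unwrapped by sample_list, while other keys are stored as-is. A standalone restatement of that normalisation:

def normalise(key: str, new_value: str):
    # Mirrors the match statement in text_changed above.
    match key:
        case "row" | "column":
            return [new_value]
        case _:
            return new_value

print(normalise("row", "3"))            # ['3']
print(normalise("submitter_id", "A1"))  # 'A1'
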

@@ -22,6 +22,8 @@ from .omni_add_edit import AddEdit
from typing import List, Tuple
from datetime import date

from .sample_checker import SampleChecker

logger = logging.getLogger(f"submissions.{__name__}")

@@ -135,8 +137,16 @@ class SubmissionFormContainer(QWidget):
except AttributeError:
self.prsr = SheetParser(filepath=fname)
self.pyd = self.prsr.to_pydantic()
self.form = self.pyd.to_form(parent=self)
self.layout().addWidget(self.form)
# logger.debug(f"Samples: {pformat(self.pyd.samples)}")
checker = SampleChecker(self, "Sample Checker", self.pyd)
if checker.exec():
logger.debug(pformat(self.pyd.samples))
self.form = self.pyd.to_form(parent=self)
self.layout().addWidget(self.form)
else:
message = "Submission cancelled."
logger.warning(message)
report.add_result(Result(msg=message, owner=self.__class__.__name__, status="Warning"))
return report

@report_result
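
The import path now parses the sheet, shows the SampleChecker dialog, and only builds the submission form when the user accepts; cancelling records a warning Result and returns. The gate rests on QDialog.exec() returning a truthy value only on acceptance, which the generic, self-contained demo below illustrates (this is not project code; it just reuses the same PyQt6 button-box wiring seen in SampleChecker):

import sys
from PyQt6.QtWidgets import QApplication, QDialog, QDialogButtonBox, QVBoxLayout, QLabel

class ConfirmDialog(QDialog):
    def __init__(self):
        super().__init__()
        layout = QVBoxLayout(self)
        layout.addWidget(QLabel("Take a moment to verify sample names."))
        buttons = QDialogButtonBox(QDialogButtonBox.StandardButton.Ok |
                                   QDialogButtonBox.StandardButton.Cancel)
        buttons.accepted.connect(self.accept)
        buttons.rejected.connect(self.reject)
        layout.addWidget(buttons)

app = QApplication(sys.argv)
if ConfirmDialog().exec():
    print("accepted: build the submission form")
else:
    print("cancelled: log a warning Result")

In the commit, the accepted branch builds the form from self.pyd, which already carries any edits made inside the dialog.
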

@@ -39,3 +39,5 @@
text-decoration-color: #ff33ff;
cursor: pointer;
}

src/submissions/templates/sample_checker.html (Normal file, 44 lines)
@@ -0,0 +1,44 @@

{% extends "details.html" %}
<head>
{% block head %}
{{ super() }}
<title>Sample Checker</title>
{% endblock %}
</head>
<body>
{% block body %}
<h2><u>Sample Checker</u></h2>
<br>
<p>Take a moment to verify sample names.</p>
<br>
<form>
  Submitter ID              Row           Column<br/>
{% for sample in samples %}
{{ '%02d' % sample['submission_rank'] }}
<input type="text" id="{{ sample['submission_rank'] }}_id" name="submitter_id" value="{{ sample['submitter_id'] }}" size="40">
<input type="number" id="{{ sample['submission_rank'] }}_row" name="row" value="{{ sample['row'] }}" size="5", min="1">
<input type="number" id="{{ sample['submission_rank'] }}_col" name="column" value="{{ sample['column'] }}" size="5", min="1">
<br/>
{% endfor %}
</form>
{% endblock %}
</body>
<script>
{% block script %}
{{ super() }}
{% for sample in samples %}
document.getElementById("{{ sample['submission_rank'] }}_id").addEventListener("input", function(){
backend.text_changed("{{ sample['submission_rank'] }}", this.name, this.value);
});
document.getElementById("{{ sample['submission_rank'] }}_row").addEventListener("input", function(){
backend.text_changed("{{ sample['submission_rank'] }}", this.name, this.value);
});
document.getElementById("{{ sample['submission_rank'] }}_col").addEventListener("input", function(){
backend.text_changed("{{ sample['submission_rank'] }}", this.name, this.value);
});
{% endfor %}
document.addEventListener('DOMContentLoaded', function() {
backend.activate_export(false);
}, false);
{% endblock %}
</script>
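
The template extends details.html, draws one row of inputs per sample, and wires every input's "input" event to backend.text_changed over the Qt web channel (the channel object itself is expected to come from the parent template's script block). A hedged way to preview the rendered markup outside the application, assuming the project's jinja environment is importable and accepting that the backend calls do nothing without a channel:

from tools import jinja_template_loading  # same loader SampleChecker uses

env = jinja_template_loading()
template = env.get_template("sample_checker.html")
# Dummy samples shaped like PydSubmission.sample_list entries (illustrative values).
samples = [
    {"submitter_id": "24-000001", "row": 1, "column": 1, "submission_rank": 1},
    {"submitter_id": "24-000002", "row": 1, "column": 2, "submission_rank": 2},
]
with open("sample_checker_preview.html", "w") as fh:
    fh.write(template.render(samples=samples, css=""))  # css is normally read from styles.css
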

@@ -259,388 +259,6 @@ def timer(func):
return wrapper

# Settings

# class Settings(BaseSettings, extra="allow"):
#     """
#     Pydantic model to hold settings
#
#     Raises:
#         FileNotFoundError: Error if database not found.
#
#     """
#     database_schema: str | None = None
#     directory_path: Path | None = None
#     database_user: str | None = None
#     database_password: str | None = None
#     database_name: str | None = None
#     database_path: Path | str | None = None
#     backup_path: Path | str | None = None
#     submission_types: dict | None = None
#     database_session: Session | None = None
#     package: Any | None = None
#     logging_enabled: bool = Field(default=False)
#
#     model_config = SettingsConfigDict(env_file_encoding='utf-8')
#
#     # model_config = SettingsConfigDict(yaml_file="C:\\Users\lwark\AppData\Local\submissions\config\config.yml",
#     #                                   yaml_file_encoding='utf-8')
#
#     # @classmethod
#     # def settings_customise_sources(
#     #         cls,
#     #         settings_cls: type[BaseSettings],
#     #         init_settings: PydanticBaseSettingsSource,
#     #         env_settings: PydanticBaseSettingsSource,
#     #         dotenv_settings: PydanticBaseSettingsSource,
#     #         file_secret_settings: PydanticBaseSettingsSource,
#     # ) -> tuple[PydanticBaseSettingsSource, ...]:
#     #     return (
#     #         YamlConfigSettingsSource(settings_cls),
#     #         init_settings,
#     #         env_settings,
#     #         dotenv_settings,
#     #         file_secret_settings,
#     #     )
#
#     @field_validator('database_schema', mode="before")
#     @classmethod
#     def set_schema(cls, value):
#         if value is None:
#             if check_if_app():
#                 alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
#             else:
#                 alembic_path = project_path.joinpath("alembic.ini")
#             value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='schema')
#             if value is None:
#                 value = "sqlite"
#         return value
#
#     @field_validator('backup_path', mode="before")
#     @classmethod
#     def set_backup_path(cls, value, values):
#         match value:
#             case str():
#                 value = Path(value)
#             case None:
#                 value = values.data['directory_path'].joinpath("Database backups")
#         if not value.exists():
#             try:
#                 value.mkdir(parents=True)
#             except OSError:
#                 value = Path(askdirectory(title="Directory for backups."))
#         return value
#
#     @field_validator('directory_path', mode="before")
#     @classmethod
#     def ensure_directory_exists(cls, value, values):
#         if value is None:
#             match values.data['database_schema']:
#                 case "sqlite":
#                     if check_if_app():
#                         alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
#                     else:
#                         alembic_path = project_path.joinpath("alembic.ini")
#                     value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').parent
#                 case _:
#                     Tk().withdraw() # we don't want a full GUI, so keep the root window from appearing
#                     value = Path(askdirectory(
#                         title="Select directory for DB storage")) # show an "Open" dialog box and return the path to the selected file
#         if isinstance(value, str):
#             value = Path(value)
#         try:
#             check = value.exists()
#         except AttributeError:
#             check = False
#         if not check:
#             value.mkdir(exist_ok=True)
#         return value
#
#     @field_validator('database_path', mode="before")
#     @classmethod
#     def ensure_database_exists(cls, value, values):
#         match values.data['database_schema']:
#             case "sqlite":
#                 if value is None:
#                     value = values.data['directory_path']
#                 if isinstance(value, str):
#                     value = Path(value)
#             case _:
#                 if value is None:
#                     if check_if_app():
#                         alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
#                     else:
#                         alembic_path = project_path.joinpath("alembic.ini")
#                     value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').parent
#         return value
#
#     @field_validator('database_name', mode='before')
#     @classmethod
#     def get_database_name(cls, value):
#         if value is None:
#             if check_if_app():
#                 alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
#             else:
#                 alembic_path = project_path.joinpath("alembic.ini")
#             value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').stem
#         return value
#
#     @field_validator("database_user", mode='before')
#     @classmethod
#     def get_user(cls, value):
#         if value is None:
#             if check_if_app():
#                 alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
#             else:
#                 alembic_path = project_path.joinpath("alembic.ini")
#             value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='user')
#         return value
#
#     @field_validator("database_password", mode='before')
#     @classmethod
#     def get_pass(cls, value):
#         if value is None:
#             if check_if_app():
#                 alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
#             else:
#                 alembic_path = project_path.joinpath("alembic.ini")
#             value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='pass')
#         return value
#
#     @field_validator('database_session', mode="before")
#     @classmethod
#     def create_database_session(cls, value, values):
#         if value is not None:
#             return value
#         else:
#             match values.data['database_schema']:
#                 case "sqlite":
#                     value = f"/{values.data['database_path']}"
#                     db_name = f"{values.data['database_name']}.db"
#                     template = jinja_template_loading().from_string(
#                         "{{ values['database_schema'] }}://{{ value }}/{{ db_name }}")
#                 case "mssql+pyodbc":
#                     value = values.data['database_path']
#                     db_name = values.data['database_name']
#                     template = jinja_template_loading().from_string(
#                         "{{ values['database_schema'] }}://{{ value }}/{{ db_name }}?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes&Trusted_Connection=yes"
#                     )
#                 case _:
#                     tmp = jinja_template_loading().from_string(
#                         "{% if values['database_user'] %}{{ values['database_user'] }}{% if values['database_password'] %}:{{ values['database_password'] }}{% endif %}{% endif %}@{{ values['database_path'] }}")
#                     value = tmp.render(values=values.data)
#                     db_name = values.data['database_name']
#             database_path = template.render(values=values.data, value=value, db_name=db_name)
#             print(f"Using {database_path} for database path")
#             engine = create_engine(database_path)
#             session = Session(engine)
#             return session
#
#     @field_validator('package', mode="before")
#     @classmethod
#     def import_package(cls, value):
#         import __init__ as package
#         if value is None:
#             return package
#
#     def __init__(self, *args, **kwargs):
#         super().__init__(*args, **kwargs)
#
#         self.set_from_db()
#         self.set_scripts()
#
#     def set_from_db(self):
#         if 'pytest' in sys.modules:
#             output = dict(power_users=['lwark', 'styson', 'ruwang'],
#                           startup_scripts=dict(hello=None),
#                           teardown_scripts=dict(goodbye=None)
#                           )
#         else:
#             session = self.database_session
#             metadata = MetaData()
#             try:
#                 metadata.reflect(bind=session.get_bind())
#             except AttributeError as e:
#                 print(f"Error getting tables: {e}")
#                 return
#             if "_configitem" not in metadata.tables.keys():
#                 print(f"Couldn't find _configitems in {metadata.tables.keys()}.")
#                 return
#             config_items = session.execute(text("SELECT * FROM _configitem")).all()
#             output = {}
#             for item in config_items:
#                 try:
#                     output[item[1]] = json.loads(item[2])
#                 except (JSONDecodeError, TypeError):
#                     output[item[1]] = item[2]
#         for k, v in output.items():
#             if not hasattr(self, k):
#                 self.__setattr__(k, v)
#
#     def set_scripts(self):
#         """
#         Imports all functions from "scripts" folder, adding them to ctx scripts
#         """
#         if check_if_app():
#             p = Path(sys._MEIPASS).joinpath("files", "scripts")
#         else:
#             p = Path(__file__).parents[2].joinpath("scripts").absolute()
#         if p.__str__() not in sys.path:
#             sys.path.append(p.__str__())
#         # NOTE: Get all .py files that don't have __ in them.
#         modules = p.glob("[!__]*.py")
#         for module in modules:
#             mod = importlib.import_module(module.stem)
#             for function in getmembers(mod, isfunction):
#                 name = function[0]
#                 func = function[1]
#                 # NOTE: assign function based on its name being in config: startup/teardown
#                 # NOTE: scripts must be registered using {name: Null} in the database
#                 if name in self.startup_scripts.keys():
#                     self.startup_scripts[name] = func
#                 if name in self.teardown_scripts.keys():
#                     self.teardown_scripts[name] = func
#
#     @timer
#     def run_startup(self):
#         """
#         Runs startup scripts.
#         """
#         for script in self.startup_scripts.values():
#             try:
#                 logger.info(f"Running startup script: {script.__name__}")
#                 thread = Thread(target=script, args=(ctx,))
#                 thread.start()
#             except AttributeError:
#                 logger.error(f"Couldn't run startup script: {script}")
#
#     @timer
#     def run_teardown(self):
#         """
#         Runs teardown scripts.
#         """
#         for script in self.teardown_scripts.values():
#             try:
#                 logger.info(f"Running teardown script: {script.__name__}")
#                 thread = Thread(target=script, args=(ctx,))
#                 thread.start()
#             except AttributeError:
#                 logger.error(f"Couldn't run teardown script: {script}")
#
#     @classmethod
#     def get_alembic_db_path(cls, alembic_path, mode=Literal['path', 'schema', 'user', 'pass']) -> Path | str:
#         c = ConfigParser()
#         c.read(alembic_path)
#         url = c['alembic']['sqlalchemy.url']
#         match mode:
#             case 'path':
#                 path = re.sub(r"^.*//", "", url)
#                 path = re.sub(r"^.*@", "", path)
#                 return Path(path)
#             case "schema":
#                 return url[:url.index(":")]
#             case "user":
#                 url = re.sub(r"^.*//", "", url)
#                 try:
#                     return url[:url.index("@")].split(":")[0]
#                 except (IndexError, ValueError) as e:
#                     return None
#             case "pass":
#                 url = re.sub(r"^.*//", "", url)
#                 try:
#                     return url[:url.index("@")].split(":")[1]
#                 except (IndexError, ValueError) as e:
#                     return None
#
#     def save(self, settings_path: Path):
#         if not settings_path.exists():
#             dicto = {}
#             for k, v in self.__dict__.items():
#                 if k in ['package', 'database_session', 'submission_types']:
#                     continue
#                 match v:
#                     case Path():
#                         if v.is_dir():
#                             v = v.absolute().__str__()
#                         elif v.is_file():
#                             v = v.parent.absolute().__str__()
#                         else:
#                             v = v.__str__()
#                     case _:
#                         pass
#                 dicto[k] = v
#             with open(settings_path, 'w') as f:
#                 yaml.dump(dicto, f)
#
#
# def get_config(settings_path: Path | str | None = None) -> Settings:
#     """
#     Get configuration settings from path or default if blank.
#
#     Args:
#         settings_path (Path | str | None, optional): Path to config.yml Defaults to None.
#         override (dict | None, optional): dictionary of settings to be used instead of file. Defaults to None.
#
#     Returns:
#         Settings: Pydantic settings object
#     """
#     if isinstance(settings_path, str):
#         settings_path = Path(settings_path)
#
#     # NOTE: custom pyyaml constructor to join fields
#     def join(loader, node):
#         seq = loader.construct_sequence(node)
#         return ''.join([str(i) for i in seq])
#     # NOTE: register the tag handler
#     yaml.add_constructor('!join', join)
#     # NOTE: make directories
#     try:
#         CONFIGDIR.mkdir(parents=True)
#     except FileExistsError:
#         logger.warning(f"Config directory {CONFIGDIR} already exists.")
#     try:
#         LOGDIR.mkdir(parents=True)
#     except FileExistsError:
#         logger.warning(f"Logging directory {LOGDIR} already exists.")
#     # NOTE: if user hasn't defined config path in cli args
#     if settings_path is None:
#         # NOTE: Check user .config/submissions directory
#         if CONFIGDIR.joinpath("config.yml").exists():
#             settings_path = CONFIGDIR.joinpath("config.yml")
#         # NOTE: Check user .submissions directory
#         elif Path.home().joinpath(".submissions", "config.yml").exists():
#             settings_path = Path.home().joinpath(".submissions", "config.yml")
#         # NOTE: finally look in the local config
#         else:
#             if check_if_app():
#                 settings_path = Path(sys._MEIPASS).joinpath("files", "config.yml")
#             else:
#                 settings_path = project_path.joinpath('src', 'config.yml')
#             with open(settings_path, "r") as dset:
#                 default_settings = yaml.load(dset, Loader=yaml.Loader)
#             # NOTE: Tell program we need to copy the config.yml to the user directory
#             # NOTE: copy settings to config directory
#             settings = Settings(**default_settings)
#             settings.save(settings_path=CONFIGDIR.joinpath("config.yml"))
#             return settings
#     else:
#         # NOTE: check if user defined path is directory
#         if settings_path.is_dir():
#             settings_path = settings_path.joinpath("config.yml")
#         # NOTE: check if user defined path is file
#         elif settings_path.is_file():
#             settings_path = settings_path
#         else:
#             logger.error("No config.yml file found. Writing to directory.")
#             with open(settings_path, "r") as dset:
#                 default_settings = yaml.load(dset, Loader=yaml.Loader)
#             settings = Settings(**default_settings)
#             settings.save(settings_path=settings_path)
#     with open(settings_path, "r") as stream:
#         settings = yaml.load(stream, Loader=yaml.Loader)
#     return Settings(**settings)
#

def check_if_app() -> bool:
"""
Checks if the program is running from pyinstaller compiled