Created omni-manager, omni-addit

lwark
2025-01-03 12:37:03 -06:00
parent 482b641569
commit b55258f677
19 changed files with 502 additions and 77 deletions


@@ -0,0 +1,43 @@
"""
script meant to copy database data to new file. Currently for Sqlite only
"""
import logging, shutil, pyodbc
from datetime import date
from pathlib import Path
from tools import Settings
logger = logging.getLogger(f"submissions.{__name__}")
def backup_database(ctx: Settings):
"""
Copies the database into the backup directory the first time it is opened every month.
"""
month = date.today().strftime("%Y-%m")
current_month_bak = Path(ctx.backup_path).joinpath(f"submissions_backup-{month}").resolve()
logger.info(f"Here is the db directory: {ctx.database_path}")
logger.info(f"Here is the backup directory: {ctx.backup_path}")
match ctx.database_schema:
case "sqlite":
db_path = ctx.database_path.joinpath(ctx.database_name).with_suffix(".db")
current_month_bak = current_month_bak.with_suffix(".db")
if not current_month_bak.exists() and "Archives" not in db_path.__str__():
logger.info("No backup found for this month, backing up database.")
try:
shutil.copyfile(db_path, current_month_bak)
except PermissionError as e:
logger.error(f"Couldn't backup database due to: {e}")
case "postgresql+psycopg2":
logger.warning(f"Backup function not yet implemented for psql")
current_month_bak = current_month_bak.with_suffix(".psql")
case "mssql+pyodbc":
logger.warning(f"{ctx.database_schema} backup is currently experiencing permission issues")
current_month_bak = current_month_bak.with_suffix(".bak")
return
if not current_month_bak.exists():
logger.info(f"No backup found for this month, backing up database to {current_month_bak}.")
connection = pyodbc.connect(driver='{ODBC Driver 18 for SQL Server}',
server=f'{ctx.database_path}', database=f'{ctx.database_name}',
trusted_connection='yes', trustservercertificate="yes", autocommit=True)
backup = f"BACKUP DATABASE [{ctx.database_name}] TO DISK = N'{current_month_bak}'"
cursor = connection.cursor().execute(backup)
connection.close()
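For the PostgreSQL branch marked "not yet implemented" above, one plausible route is to shell out to pg_dump. Below is a minimal sketch under two assumptions not present in this commit: pg_dump is installed and on PATH, and the caller supplies the host and database names (the backup_postgres helper is hypothetical):

import subprocess
from pathlib import Path


def backup_postgres(db_host: str, db_name: str, out_path: Path) -> None:
    # Hypothetical helper: dump the database to a plain-SQL file with pg_dump.
    result = subprocess.run(
        ["pg_dump", "--host", db_host, "--dbname", db_name,
         "--file", str(out_path), "--format", "plain"],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        # pg_dump reports failures (authentication, missing database, ...) on stderr.
        raise RuntimeError(f"pg_dump failed: {result.stderr.strip()}")

# Usage, mirroring the suffix chosen in the psql branch above:
# backup_postgres("localhost", "submissions", current_month_bak.with_suffix(".psql"))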

src/scripts/goodbye.py

@@ -0,0 +1,22 @@
"""
Test script for teardown_scripts
"""
def goodbye(ctx):
"""
Args:
ctx (Settings): All scripts must take ctx as an argument to maintain interoperability.
Returns:
None: Scripts are currently unable to return results to the program.
"""
print("\n\nGoodbye. Thank you for using Robotics Submission Tracker.\n\n")
"""
For scripts to be run, they must be added to the _configitem.startup_scripts or _configitem.teardown_scripts
rows as a key: value (name: null) entry in the JSON.
ex: {"goodbye": null, "backup_database": null}
The program will overwrite null with the actual function upon startup.
"""

src/scripts/hello.py

@@ -0,0 +1,22 @@
"""
Test script for startup_scripts
"""
def hello(ctx) -> None:
"""
Args:
ctx (Settings): All scripts must take ctx as an argument to maintain interoperability.
Returns:
None: Scripts are currently unable to return results to the program.
"""
print("\n\nHello! Welcome to Robotics Submission Tracker.\n\n")
"""
For scripts to be run, they must be added to the _configitem.startup_scripts or _configitem.teardown_scripts
rows as a key: value (name: null) entry in the JSON.
ex: {"hello": null, "import_irida": null}
The program will overwrite null with the actual function upon startup.
"""


@@ -0,0 +1,58 @@
"""
Script that imports Irida controls from a secondary database.
"""
import logging, sqlite3, json
from datetime import datetime
from tools import Settings
from sqlalchemy.orm import Session

logger = logging.getLogger(f"submissions.{__name__}")


def import_irida(ctx: Settings):
    """
    Grabs Irida controls from secondary database.

    Args:
        ctx (Settings): Settings inherited from app.
    """
    from backend import BasicSample
    from backend.db import IridaControl, ControlType
    # NOTE: Because the main session will be busy in another thread, this requires a new session.
    new_session = Session(ctx.database_session.get_bind())
    ct = new_session.query(ControlType).filter(ControlType.name == "Irida Control").first()
    existing_controls = [item.name for item in new_session.query(IridaControl)]
    prm_list = ", ".join([f"'{thing}'" for thing in existing_controls])
    ctrl_db_path = ctx.directory_path.joinpath("submissions_parser_output", "submissions.db")
    try:
        conn = sqlite3.connect(ctrl_db_path)
    except (AttributeError, sqlite3.OperationalError) as e:
        logger.error(f"Could not import from Irida due to: {e}")
        return
    # NOTE: Control names come from the local database, so interpolating them here is tolerated,
    # but a parameterized query would be safer (see the sketch below).
    sql = "SELECT name, submitted_date, submission_id, contains, matches, kraken, subtype, refseq_version, " \
          "kraken2_version, kraken2_db_version, sample_id FROM _iridacontrol INNER JOIN _control on _control.id " \
          f"= _iridacontrol.id WHERE _control.name NOT IN ({prm_list})"
    cursor = conn.execute(sql)
    records = [
        dict(name=row[0], submitted_date=row[1], submission_id=row[2], contains=row[3], matches=row[4],
             kraken=row[5], subtype=row[6], refseq_version=row[7], kraken2_version=row[8],
             kraken2_db_version=row[9], sample_id=row[10]) for row in cursor]
    conn.close()
    for record in records:
        instance = new_session.query(IridaControl).filter(IridaControl.name == record['name']).first()
        if instance:
            logger.warning(f"Irida Control {instance.name} already exists, skipping.")
            continue
        # JSON columns arrive as text; decode them into dicts, defaulting to empty.
        for thing in ['contains', 'matches', 'kraken']:
            if record[thing]:
                record[thing] = json.loads(record[thing])
                assert isinstance(record[thing], dict)
            else:
                record[thing] = {}
        record['submitted_date'] = datetime.strptime(record['submitted_date'], "%Y-%m-%d %H:%M:%S.%f")
        instance = IridaControl(controltype=ct, **record)
        # Link the control to its sample and that sample's first submission, if one exists.
        sample = new_session.query(BasicSample).filter(BasicSample.submitter_id == instance.name).first()
        if sample:
            instance.sample = sample
            instance.submission = sample.submissions[0]
        new_session.add(instance)
    new_session.commit()
    new_session.close()
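Since prm_list is interpolated directly into the SQL above, a control name containing a single quote would break the query. A minimal sketch of a parameterized alternative using "?" placeholders (the in-memory schema and data below are illustrative only):

import sqlite3

# Illustrative schema and data; the real tables live in submissions.db.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE _control (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE _iridacontrol (id INTEGER PRIMARY KEY, subtype TEXT);
    INSERT INTO _control VALUES (1, 'CTRL-001'), (2, 'CTRL-002'), (3, 'CTRL-003');
    INSERT INTO _iridacontrol VALUES (1, 'a'), (2, 'b'), (3, 'c');
""")
existing_controls = ["CTRL-001", "CTRL-002"]
# One "?" per existing name; sqlite3 binds the values, so quoting is handled safely.
placeholders = ", ".join("?" for _ in existing_controls)
sql = ("SELECT name, subtype FROM _iridacontrol "
       "INNER JOIN _control ON _control.id = _iridacontrol.id "
       f"WHERE _control.name NOT IN ({placeholders})")
print(conn.execute(sql, existing_controls).fetchall())  # -> [('CTRL-003', 'c')]
conn.close()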