Debugging scripts import hell.

This commit is contained in:
lwark
2024-12-30 08:37:41 -06:00
parent 5fd36308b2
commit 0808b54264
22 changed files with 156 additions and 85 deletions

View File

@@ -2,11 +2,12 @@
Contains miscellaenous functions used by both frontend and backend.
'''
from __future__ import annotations
import importlib
import time
from datetime import date, datetime, timedelta
from json import JSONDecodeError
from pprint import pprint
import numpy as np
import logging, re, yaml, sys, os, stat, platform, getpass, inspect, json, pandas as pd
import logging, re, yaml, sys, os, stat, platform, getpass, inspect, json, numpy as np, pandas as pd
from dateutil.easter import easter
from jinja2 import Environment, FileSystemLoader
from logging import handlers
@@ -22,6 +23,7 @@ from tkinter import Tk # NOTE: This is for choosing database path before app is
from tkinter.filedialog import askdirectory
from sqlalchemy.exc import IntegrityError as sqlalcIntegrityError
from pytz import timezone as tz
from functools import wraps
timezone = tz("America/Winnipeg")
@@ -44,6 +46,7 @@ LOGDIR = main_aux_dir.joinpath("logs")
row_map = {1: "A", 2: "B", 3: "C", 4: "D", 5: "E", 6: "F", 7: "G", 8: "H"}
row_keys = {v: k for k, v in row_map.items()}
# NOTE: Sets background for uneditable comboboxes and date edits.
main_form_style = '''
QComboBox:!editable, QDateEdit {
background-color:light gray;
@@ -53,6 +56,7 @@ main_form_style = '''
page_size = 250
def divide_chunks(input_list: list, chunk_count: int):
"""
Divides a list into {chunk_count} equal parts
@@ -313,7 +317,7 @@ class Settings(BaseSettings, extra="allow"):
check = value.exists()
except AttributeError:
check = False
if not check:
if not check:
# print(f"No directory found, using Documents/submissions")
value.mkdir(exist_ok=True)
# print(f"Final return of directory_path: {value}")
@@ -417,6 +421,7 @@ class Settings(BaseSettings, extra="allow"):
super().__init__(*args, **kwargs)
self.set_from_db()
# self.set_startup_teardown()
# pprint(f"User settings:\n{self.__dict__}")
def set_from_db(self):
@@ -448,6 +453,15 @@ class Settings(BaseSettings, extra="allow"):
if not hasattr(self, k):
self.__setattr__(k, v)
def set_scripts(self):
"""
Imports all functions from "scripts" folder which will run their @registers, adding them to ctx scripts
"""
p = Path(__file__).parent.joinpath("scripts").absolute()
subs = [item.stem for item in p.glob("*.py") if "__" not in item.stem]
for sub in subs:
importlib.import_module(f"tools.scripts.{sub}")
@classmethod
def get_alembic_db_path(cls, alembic_path, mode=Literal['path', 'schema', 'user', 'pass']) -> Path | str:
c = ConfigParser()
@@ -514,6 +528,7 @@ def get_config(settings_path: Path | str | None = None) -> Settings:
def join(loader, node):
seq = loader.construct_sequence(node)
return ''.join([str(i) for i in seq])
# NOTE: register the tag handler
yaml.add_constructor('!join', join)
# NOTE: make directories
@@ -738,6 +753,7 @@ def setup_lookup(func):
func (_type_): wrapped function
"""
@wraps(func)
def wrapper(*args, **kwargs):
sanitized_kwargs = {}
for k, v in locals()['kwargs'].items():
@@ -881,7 +897,7 @@ def yaml_regex_creator(loader, node):
return f"(?P<{name}>RSL(?:-|_)?{abbr}(?:-|_)?20\d{2}-?\d{2}-?\d{2}(?:(_|-)?\d?([^_0123456789\sA-QS-Z]|$)?R?\d?)?)"
def super_splitter(ins_str:str, substring:str, idx:int) -> str:
def super_splitter(ins_str: str, substring: str, idx: int) -> str:
"""
Args:
@@ -898,9 +914,6 @@ def super_splitter(ins_str:str, substring:str, idx:int) -> str:
return ins_str
ctx = get_config(None)
def is_power_user() -> bool:
"""
Checks if user is in list of power users
@@ -930,8 +943,11 @@ def check_authorization(func):
else:
logger.error(f"User {getpass.getuser()} is not authorized for this function.")
report = Report()
report.add_result(Result(owner=func.__str__(), code=1, msg="This user does not have permission for this function.", status="warning"))
report.add_result(
Result(owner=func.__str__(), code=1, msg="This user does not have permission for this function.",
status="warning"))
return report
return wrapper
@@ -946,6 +962,8 @@ def report_result(func):
__type__: Output from decorated function
"""
@wraps(func)
def wrapper(*args, **kwargs):
logger.info(f"Report result being called by {func.__name__}")
output = func(*args, **kwargs)
@@ -980,16 +998,17 @@ def report_result(func):
else:
true_output = None
return true_output
return wrapper
def create_holidays_for_year(year: int|None=None) -> List[date]:
def find_nth_monday(year, month, occurence: int | None=None, day: int|None=None):
def create_holidays_for_year(year: int | None = None) -> List[date]:
def find_nth_monday(year, month, occurence: int | None = None, day: int | None = None):
if not occurence:
occurence = 1
if not day:
day = occurence * 7
max_days = (date(2012, month+1, 1) - date(2012, month, 1)).days
max_days = (date(2012, month + 1, 1) - date(2012, month, 1)).days
if day > max_days:
day = max_days
try:
@@ -999,12 +1018,13 @@ def create_holidays_for_year(year: int|None=None) -> List[date]:
offset = -d.weekday() # weekday == 0 means Monday
output = d + timedelta(offset)
return output.date()
if not year:
year = date.today().year
# Includes New Year's day for next year.
holidays = [date(year, 1, 1), date(year, 7,1), date(year, 9, 30),
holidays = [date(year, 1, 1), date(year, 7, 1), date(year, 9, 30),
date(year, 11, 11), date(year, 12, 25), date(year, 12, 26),
date(year+1, 1, 1)]
date(year + 1, 1, 1)]
# Labour Day
holidays.append(find_nth_monday(year, 9))
# Thanksgiving
@@ -1015,3 +1035,39 @@ def create_holidays_for_year(year: int|None=None) -> List[date]:
holidays.append(easter(year) - timedelta(days=2))
holidays.append(easter(year) + timedelta(days=1))
return sorted(holidays)
def timer(func):
"""
Performs timing of wrapped function
Args:
func (__function__): incoming function
"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
value = func(*args, **kwargs)
end_time = time.perf_counter()
run_time = end_time - start_time
logger.debug(f"Finished {func.__name__}() in {run_time:.4f} secs")
return value
return wrapper
ctx = get_config(None)
def register_script(func):
"""Register a function as a plug-in"""
if func.__name__ in ctx.startup_scripts.keys():
ctx.startup_scripts[func.__name__] = func
if func.__name__ in ctx.teardown_scripts.keys():
ctx.teardown_scripts[func.__name__] = func
return func
ctx.set_scripts()