Code cleanup, dependency update, various bug fixes

This commit is contained in:
lwark
2024-05-24 13:02:46 -05:00
parent 2814be8980
commit e047d1a9ee
24 changed files with 403 additions and 561 deletions

View File

@@ -40,8 +40,9 @@ main_aux_dir = Path.home().joinpath(f"{os_config_dir}/submissions")
CONFIGDIR = main_aux_dir.joinpath("config")
LOGDIR = main_aux_dir.joinpath("logs")
row_map = {1:"A", 2:"B", 3:"C", 4:"D", 5:"E", 6:"F", 7:"G", 8:"H"}
row_keys = {v:k for k,v in row_map.items()}
row_map = {1: "A", 2: "B", 3: "C", 4: "D", 5: "E", 6: "F", 7: "G", 8: "H"}
row_keys = {v: k for k, v in row_map.items()}
def check_not_nan(cell_contents) -> bool:
"""
@@ -52,7 +53,7 @@ def check_not_nan(cell_contents) -> bool:
Returns:
bool: True if cell has value, else, false.
"""
"""
# check for nan as a string first
exclude = ['unnamed:', 'blank', 'void']
try:
@@ -88,7 +89,8 @@ def check_not_nan(cell_contents) -> bool:
logger.debug(f"Check encountered unknown error: {type(e).__name__} - {e}")
return False
def convert_nans_to_nones(input_str) -> str|None:
def convert_nans_to_nones(input_str) -> str | None:
"""
Get rid of various "nan", "NAN", "NaN", etc/
@@ -97,19 +99,21 @@ def convert_nans_to_nones(input_str) -> str|None:
Returns:
str: _description_
"""
"""
# logger.debug(f"Input value of: {input_str}")
if check_not_nan(input_str):
return input_str
return None
def is_missing(value:Any) -> Tuple[Any, bool]:
def is_missing(value: Any) -> Tuple[Any, bool]:
if check_not_nan(value):
return value, False
else:
return convert_nans_to_nones(value), True
def check_regex_match(pattern:str, check:str) -> bool:
def check_regex_match(pattern: str, check: str) -> bool:
"""
Determines if a pattern matches a str
@@ -119,13 +123,14 @@ def check_regex_match(pattern:str, check:str) -> bool:
Returns:
bool: match found?
"""
"""
try:
return bool(re.match(fr"{pattern}", check))
except TypeError:
return False
def get_first_blank_df_row(df:pd.DataFrame) -> int:
def get_first_blank_df_row(df: pd.DataFrame) -> int:
"""
For some reason I need a whole function for this.
@@ -134,9 +139,10 @@ def get_first_blank_df_row(df:pd.DataFrame) -> int:
Returns:
int: Index of the row after the last used row.
"""
"""
return df.shape[0] + 1
# Settings
class Settings(BaseSettings, extra="allow"):
@@ -146,16 +152,16 @@ class Settings(BaseSettings, extra="allow"):
Raises:
FileNotFoundError: Error if database not found.
"""
"""
directory_path: Path
database_path: Path|str|None = None
backup_path: Path|str|None = None
database_path: Path | str | None = None
backup_path: Path | str | None = None
# super_users: list|None = None
# power_users: list|None = None
# rerun_regex: str
submission_types: dict|None = None
database_session: Session|None = None
package: Any|None = None
submission_types: dict | None = None
database_session: Session | None = None
package: Any | None = None
model_config = SettingsConfigDict(env_file_encoding='utf-8')
@@ -178,10 +184,10 @@ class Settings(BaseSettings, extra="allow"):
if isinstance(value, str):
value = Path(value)
if not value.exists():
value = Path().home()
# metadata.directory_path = value
value = Path().home()
# metadata.directory_path = value
return value
@field_validator('database_path', mode="before")
@classmethod
def ensure_database_exists(cls, value, values):
@@ -196,7 +202,7 @@ class Settings(BaseSettings, extra="allow"):
return value
else:
raise FileNotFoundError(f"Couldn't find database at {value}")
@field_validator('database_session', mode="before")
@classmethod
def create_database_session(cls, value, values):
@@ -223,7 +229,7 @@ class Settings(BaseSettings, extra="allow"):
else:
raise FileNotFoundError("No database file found. Exiting program.")
logger.debug(f"Using {database_path} for database file.")
engine = create_engine(f"sqlite:///{database_path}")#, echo=True, future=True)
engine = create_engine(f"sqlite:///{database_path}") #, echo=True, future=True)
session = Session(engine)
# metadata.session = session
return session
@@ -240,19 +246,20 @@ class Settings(BaseSettings, extra="allow"):
super().__init__(*args, **kwargs)
self.set_from_db(db_path=kwargs['database_path'])
def set_from_db(self, db_path:Path):
def set_from_db(self, db_path: Path):
if 'pytest' in sys.modules:
config_items = dict(power_users=['lwark', 'styson', 'ruwang'])
else:
session = Session(create_engine(f"sqlite:///{db_path}"))
config_items = session.execute(text("SELECT * FROM _configitem")).all()
session.close()
config_items = {item[1]:json.loads(item[2]) for item in config_items}
config_items = {item[1]: json.loads(item[2]) for item in config_items}
for k, v in config_items.items():
if not hasattr(self, k):
self.__setattr__(k, v)
def get_config(settings_path: Path|str|None=None) -> Settings:
def get_config(settings_path: Path | str | None = None) -> Settings:
"""
Get configuration settings from path or default if blank.
@@ -262,36 +269,38 @@ def get_config(settings_path: Path|str|None=None) -> Settings:
Returns:
Settings: Pydantic settings object
"""
logger.debug(f"Creating settings...")
"""
# logger.debug(f"Creating settings...")
if isinstance(settings_path, str):
settings_path = Path(settings_path)
# custom pyyaml constructor to join fields
def join(loader, node):
seq = loader.construct_sequence(node)
return ''.join([str(i) for i in seq])
# register the tag handler
yaml.add_constructor('!join', join)
logger.debug(f"Making directory: {CONFIGDIR.__str__()}")
# make directories
try:
CONFIGDIR.mkdir(parents=True)
except FileExistsError:
pass
logger.debug(f"Making directory: {LOGDIR.__str__()}")
logger.warning(f"Config directory {CONFIGDIR} already exists.")
try:
LOGDIR.mkdir(parents=True)
except FileExistsError:
pass
# if user hasn't defined config path in cli args
logger.warning(f"Logging directory {LOGDIR} already exists.")
# NOTE: if user hasn't defined config path in cli args
if settings_path == None:
# Check user .config/submissions directory
# NOTE: Check user .config/submissions directory
if CONFIGDIR.joinpath("config.yml").exists():
settings_path = CONFIGDIR.joinpath("config.yml")
# Check user .submissions directory
# NOTE: Check user .submissions directory
elif Path.home().joinpath(".submissions", "config.yml").exists():
settings_path = Path.home().joinpath(".submissions", "config.yml")
# finally look in the local config
# NOTE: finally look in the local config
else:
if check_if_app():
settings_path = Path(sys._MEIPASS).joinpath("files", "config.yml")
@@ -299,14 +308,14 @@ def get_config(settings_path: Path|str|None=None) -> Settings:
settings_path = package_dir.joinpath('config.yml')
with open(settings_path, "r") as dset:
default_settings = yaml.load(dset, Loader=yaml.Loader)
# Tell program we need to copy the config.yml to the user directory
# copy settings to config directory
# NOTE: Tell program we need to copy the config.yml to the user directory
# NOTE: copy settings to config directory
return Settings(**copy_settings(settings_path=CONFIGDIR.joinpath("config.yml"), settings=default_settings))
else:
# check if user defined path is directory
# NOTE: check if user defined path is directory
if settings_path.is_dir():
settings_path = settings_path.joinpath("config.yml")
# check if user defined path is file
# NOTE: check if user defined path is file
elif settings_path.is_file():
settings_path = settings_path
else:
@@ -314,11 +323,12 @@ def get_config(settings_path: Path|str|None=None) -> Settings:
with open(settings_path, "r") as dset:
default_settings = yaml.load(dset, Loader=yaml.Loader)
return Settings(**copy_settings(settings_path=settings_path, settings=default_settings))
logger.debug(f"Using {settings_path} for config file.")
# logger.debug(f"Using {settings_path} for config file.")
with open(settings_path, "r") as stream:
settings = yaml.load(stream, Loader=yaml.Loader)
return Settings(**settings)
# Logging formatters
class GroupWriteRotatingFileHandler(handlers.RotatingFileHandler):
@@ -334,13 +344,13 @@ class GroupWriteRotatingFileHandler(handlers.RotatingFileHandler):
os.chmod(self.baseFilename, currMode | stat.S_IWGRP)
def _open(self):
prevumask=os.umask(0o002)
rtv=handlers.RotatingFileHandler._open(self)
prevumask = os.umask(0o002)
rtv = handlers.RotatingFileHandler._open(self)
os.umask(prevumask)
return rtv
class CustomFormatter(logging.Formatter):
class CustomFormatter(logging.Formatter):
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
@@ -367,6 +377,7 @@ class CustomFormatter(logging.Formatter):
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
class StreamToLogger(object):
"""
Fake file-like stream object that redirects writes to a logger instance.
@@ -381,31 +392,33 @@ class StreamToLogger(object):
for line in buf.rstrip().splitlines():
self.logger.log(self.log_level, line.rstrip())
def setup_logger(verbosity:int=3):
def setup_logger(verbosity: int = 3):
"""
Set logger levels using settings.
Args:
verbosit (int, optional): Level of verbosity desired 3 is highest. Defaults to 3.
verbosity (int, optional): Level of verbosity desired 3 is highest. Defaults to 3.
Returns:
logger: logger object
"""
logger = logging.getLogger("submissions")
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
# NOTE: create file handler which logs even debug messages
try:
Path(LOGDIR).mkdir(parents=True)
except FileExistsError:
pass
logger.warning(f"Logging directory {LOGDIR} already exists.")
# NOTE: logging to file turned off due to repeated permission errors
# fh = GroupWriteRotatingFileHandler(LOGDIR.joinpath('submissions.log'), mode='a', maxBytes=100000, backupCount=3, encoding=None, delay=False)
# file logging will always be debug
# fh.setLevel(logging.DEBUG)
# fh.name = "File"
# create console handler with a higher log level
# create custom logger with STERR -> log
# NOTE: create console handler with a higher log level
# NOTE: create custom logger with STERR -> log
ch = logging.StreamHandler(stream=sys.stdout)
# set looging level based on verbosity
# NOTE: set looging level based on verbosity
match verbosity:
case 3:
ch.setLevel(logging.DEBUG)
@@ -414,24 +427,26 @@ def setup_logger(verbosity:int=3):
case 1:
ch.setLevel(logging.WARNING)
ch.name = "Stream"
# create formatter and add it to the handlers
# formatter = logging.Formatter('%(asctime)s - %(levelname)s - {%(pathname)s:%(lineno)d} - %(message)s')
# NOTE: create formatter and add it to the handlers
formatter = CustomFormatter()
# fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
# NOTE: add the handlers to the logger
# logger.addHandler(fh)
logger.addHandler(ch)
# Output exception and traceback to logger
# NOTE: Output exception and traceback to logger
def handle_exception(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
sys.excepthook = handle_exception
return logger
def copy_settings(settings_path:Path, settings:dict) -> dict:
def copy_settings(settings_path: Path, settings: dict) -> dict:
"""
copies relevant settings dictionary from the default config.yml to a new directory
@@ -441,8 +456,8 @@ def copy_settings(settings_path:Path, settings:dict) -> dict:
Returns:
dict: output dictionary for use in first run
"""
# if the current user is not a superuser remove the superusers entry
"""
# NOTE: if the current user is not a superuser remove the superusers entry
if not getpass.getuser() in settings['super_users']:
del settings['super_users']
if not getpass.getuser() in settings['power_users']:
@@ -452,40 +467,40 @@ def copy_settings(settings_path:Path, settings:dict) -> dict:
yaml.dump(settings, f)
return settings
def jinja_template_loading() -> Environment:
"""
Returns jinja2 template environment.
Returns:
_type_: _description_
"""
# determine if pyinstaller launcher is being used
"""
# NOTE: determine if pyinstaller launcher is being used
if check_if_app():
loader_path = Path(sys._MEIPASS).joinpath("files", "templates")
else:
loader_path = Path(__file__).parent.joinpath('templates').absolute()#.__str__()
# jinja template loading
loader_path = Path(__file__).parent.joinpath('templates').absolute() #.__str__()
# NOTE: jinja template loading
loader = FileSystemLoader(loader_path)
env = Environment(loader=loader)
env.globals['STATIC_PREFIX'] = loader_path.joinpath("static", "css")
return env
def check_if_app() -> bool:
"""
Checks if the program is running from pyinstaller compiled
Args:
ctx (dict, optional): Settings passed down from gui. Defaults to None.
Returns:
bool: True if running from pyinstaller. Else False.
"""
"""
if getattr(sys, 'frozen', False):
return True
else:
return False
def convert_well_to_row_column(input_str:str) -> Tuple[int, int]:
def convert_well_to_row_column(input_str: str) -> Tuple[int, int]:
"""
Converts typical alphanumeric (i.e. "A2") to row, column
@@ -494,22 +509,24 @@ def convert_well_to_row_column(input_str:str) -> Tuple[int, int]:
Returns:
Tuple[int, int]: row, column
"""
row_keys = {v:k for k,v in row_map.items()}
"""
row_keys = {v: k for k, v in row_map.items()}
try:
row = int(row_keys[input_str[0].upper()])
column = int(input_str[1:])
column = int(input_str[1:])
except IndexError:
return None, None
return row, column
def setup_lookup(func):
"""
Checks to make sure all args are allowed
Args:
func (_type_): _description_
"""
func (_type_): wrapped function
"""
def wrapper(*args, **kwargs):
sanitized_kwargs = {}
for k, v in locals()['kwargs'].items():
@@ -521,20 +538,23 @@ def setup_lookup(func):
elif v is not None:
sanitized_kwargs[k] = v
return func(*args, **sanitized_kwargs)
return wrapper
class Result(BaseModel):
class Result(BaseModel):
owner: str = Field(default="", validate_default=True)
code: int = Field(default=0)
msg: str
status: Literal["NoIcon", "Question", "Information", "Warning", "Critical"] = Field(default="NoIcon")
@field_validator('status', mode='before')
@classmethod
def to_title(cls, value:str):
return value.title()
def to_title(cls, value: str):
if value.lower().replace(" ", "") == "noicon":
return "NoIcon"
else:
return value.title()
def __repr__(self) -> str:
return f"Result({self.owner})"
@@ -546,15 +566,15 @@ class Result(BaseModel):
def report(self):
from frontend.widgets.misc import AlertPop
return AlertPop(message=self.msg, status=self.status, owner=self.owner)
class Report(BaseModel):
class Report(BaseModel):
results: List[Result] = Field(default=[])
def __repr__(self):
return f"Report(result_count:{len(self.results)})"
def add_result(self, result:Result|Report|None):
def add_result(self, result: Result | Report | None):
match result:
case Result():
logger.debug(f"Adding {result} to results.")
@@ -568,36 +588,41 @@ class Report(BaseModel):
logger.debug(f"Adding {res} from to results.")
self.results.append(res)
case _:
pass
def rreplace(s, old, new):
return (s[::-1].replace(old[::-1],new[::-1], 1))[::-1]
logger.error(f"Unknown variable type: {type(result)}")
def html_to_pdf(html, output_file:Path|str):
def rreplace(s, old, new):
return (s[::-1].replace(old[::-1], new[::-1], 1))[::-1]
def html_to_pdf(html, output_file: Path | str):
if isinstance(output_file, str):
output_file = Path(output_file)
# document = QTextDocument()
document = QWebEngineView()
document.setHtml(html)
document.setHtml(html)
printer = QPrinter(QPrinter.PrinterMode.HighResolution)
printer.setOutputFormat(QPrinter.OutputFormat.PdfFormat)
printer.setOutputFileName(output_file.absolute().__str__())
printer.setPageSize(QPageSize(QPageSize.PageSizeId.A4))
document.print(printer)
def remove_key_from_list_of_dicts(input:list, key:str):
def remove_key_from_list_of_dicts(input: list, key: str):
for item in input:
del item[key]
return input
def workbook_2_csv(worksheet: Worksheet, filename:Path):
def workbook_2_csv(worksheet: Worksheet, filename: Path):
with open(filename, 'w', newline="") as f:
c = csv.writer(f)
for r in worksheet.rows:
c.writerow([cell.value for cell in r])
ctx = get_config(None)
def is_power_user() -> bool:
try:
check = getpass.getuser() in ctx.power_users
@@ -605,13 +630,14 @@ def is_power_user() -> bool:
check = False
return check
def check_authorization(func):
"""
Decorator to check if user is authorized to access function
Args:
func (_type_): Function to be used.
"""
"""
def wrapper(*args, **kwargs):
logger.debug(f"Checking authorization")
if is_power_user():