Pre-frontend refactor.
src/submissions/tools.py (Normal file, 516 lines)
@@ -0,0 +1,516 @@
'''
Contains miscellaneous functions used by both frontend and backend.
'''
from __future__ import annotations
from pathlib import Path
import re
import numpy as np
import logging
import pandas as pd
from jinja2 import Environment, FileSystemLoader
import yaml
import sys, os, stat, platform, getpass
from logging import handlers
from sqlalchemy.orm import Session, declarative_base, DeclarativeMeta, Query
from sqlalchemy import create_engine
from pydantic import field_validator, BaseModel, Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Any, Tuple, Literal, List
import inspect

logger = logging.getLogger(f"submissions.{__name__}")

package_dir = Path(__file__).parents[2].resolve()
logger.debug(f"Package dir: {package_dir}")

# Per-OS location for user-level config and logs.
if platform.system() == "Windows":
    os_config_dir = "AppData/local"
    print(f"Got platform Windows, config_dir: {os_config_dir}")
else:
    os_config_dir = ".config"
    print(f"Got platform other, config_dir: {os_config_dir}")

main_aux_dir = Path.home().joinpath(f"{os_config_dir}/submissions")

CONFIGDIR = main_aux_dir.joinpath("config")
LOGDIR = main_aux_dir.joinpath("logs")

# Map of numeric row indices to row letters.
row_map = {1: "A", 2: "B", 3: "C", 4: "D", 5: "E", 6: "F", 7: "G", 8: "H"}

Base: DeclarativeMeta = declarative_base()
metadata = Base.metadata

def check_not_nan(cell_contents) -> bool:
    """
    Check to ensure excel sheet cell contents are not blank.

    Args:
        cell_contents (Any): The contents of the cell in question.

    Returns:
        bool: True if cell has a value, else False.
    """
    # Check for nan as a string first.
    try:
        if "Unnamed:" in cell_contents or "blank" in cell_contents.lower():
            cell_contents = np.nan
        cell_contents = cell_contents.lower()
    except (TypeError, AttributeError):
        pass
    try:
        if np.isnat(cell_contents):
            cell_contents = np.nan
    except TypeError:
        pass
    if cell_contents == "nat":
        cell_contents = np.nan
    if cell_contents == 'nan':
        cell_contents = np.nan
    if cell_contents is None:
        cell_contents = np.nan
    if str(cell_contents).lower() == "none":
        cell_contents = np.nan
    try:
        if pd.isnull(cell_contents):
            cell_contents = np.nan
    except ValueError:
        pass
    try:
        return not np.isnan(cell_contents)
    except TypeError:
        return True
    except Exception as e:
        logger.debug(f"Check encountered unknown error: {type(e).__name__} - {e}")
        return False

def convert_nans_to_nones(input_str) -> str | None:
    """
    Get rid of various "nan", "NAN", "NaN", etc. values.

    Args:
        input_str (str): input string

    Returns:
        str | None: the input string if it holds a real value, otherwise None.
    """
    # logger.debug(f"Input value of: {input_str}")
    if check_not_nan(input_str):
        return input_str
    return None

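# Illustrative calls (cell values are arbitrary examples):
# check_not_nan("Sample 1")          -> True
# check_not_nan("nan")               -> False
# convert_nans_to_nones("NaN")       -> None
# convert_nans_to_nones("Sample 1")  -> "Sample 1"
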
def check_regex_match(pattern: str, check: str) -> bool:
    """
    Check whether a string matches a regex pattern, tolerating non-string input.

    Args:
        pattern (str): regex pattern to match against.
        check (str): string to be checked.

    Returns:
        bool: True if the pattern matches, else False.
    """
    try:
        return bool(re.match(fr"{pattern}", check))
    except TypeError:
        return False

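# Illustrative only (the sample string is made up):
# check_regex_match(r"^RSL", "RSL-22-0001")  -> True
# check_regex_match(r"^RSL", None)           -> False (the TypeError is swallowed)
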
def massage_common_reagents(reagent_name: str) -> str:
    """
    Normalize common reagent names to a canonical form.

    Args:
        reagent_name (str): raw reagent name.

    Returns:
        str: normalized reagent name.
    """
    logger.debug(f"Attempting to massage {reagent_name}")
    if reagent_name.endswith("water") or "H2O" in reagent_name.upper():
        reagent_name = "molecular_grade_water"
    reagent_name = reagent_name.replace("µ", "u")
    return reagent_name

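# Illustrative calls (reagent names are arbitrary):
# massage_common_reagents("nuclease-free water")  -> "molecular_grade_water"
# massage_common_reagents("Buffer (10µl)")        -> "Buffer (10ul)"
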
class GroupWriteRotatingFileHandler(handlers.RotatingFileHandler):

    def doRollover(self):
        """
        Override base class method to make the new log file group writable.
        """
        # Rotate the file first.
        handlers.RotatingFileHandler.doRollover(self)
        # Add group write to the current permissions.
        currMode = os.stat(self.baseFilename).st_mode
        os.chmod(self.baseFilename, currMode | stat.S_IWGRP)

    def _open(self):
        # Temporarily relax the umask so the opened log file is group writable.
        prevumask = os.umask(0o002)
        rtv = handlers.RotatingFileHandler._open(self)
        os.umask(prevumask)
        return rtv

class StreamToLogger(object):
    """
    Fake file-like stream object that redirects writes to a logger instance.
    """

    def __init__(self, logger, log_level=logging.INFO):
        self.logger = logger
        self.log_level = log_level
        self.linebuf = ''

    def write(self, buf):
        for line in buf.rstrip().splitlines():
            self.logger.log(self.log_level, line.rstrip())

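# A minimal sketch of how StreamToLogger could be wired up; redirecting
# sys.stdout this way is an assumption about how the application uses it:
# sys.stdout = StreamToLogger(logging.getLogger("submissions.stdout"), logging.INFO)
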
class Settings(BaseSettings):
    """
    Pydantic model to hold settings.

    Raises:
        FileNotFoundError: if the configured database file cannot be found.
    """
    directory_path: Path
    database_path: Path | str | None = None
    backup_path: Path
    super_users: list | None = None
    power_users: list | None = None
    rerun_regex: str
    submission_types: dict | None = None
    database_session: Session | None = None
    package: Any | None = None

    model_config = SettingsConfigDict(env_file_encoding='utf-8')

    @field_validator('backup_path')
    @classmethod
    def set_backup_path(cls, value):
        if isinstance(value, str):
            value = Path(value)
        metadata.backup_path = value
        return value

    @field_validator('directory_path', mode="before")
    @classmethod
    def ensure_directory_exists(cls, value):
        if isinstance(value, str):
            value = Path(value)
        # Fall back to the user's home directory if the configured one is missing.
        if not value.exists():
            value = Path().home()
        metadata.directory_path = value
        return value

    @field_validator('database_path', mode="before")
    @classmethod
    def ensure_database_exists(cls, value):
        if value == ":memory:":
            return value
        if isinstance(value, str):
            value = Path(value)
        if value.exists():
            return value
        else:
            raise FileNotFoundError(f"Couldn't find database at {value}")

    @field_validator('database_session', mode="before")
    @classmethod
    def create_database_session(cls, value, values):
        if value is not None:
            return value
        else:
            database_path = values.data['database_path']
            if database_path is None:
                # Check in user's .submissions directory for submissions.db.
                if Path.home().joinpath(".submissions", "submissions.db").exists():
                    database_path = Path.home().joinpath(".submissions", "submissions.db")
                # Finally, look in the local dir.
                else:
                    database_path = package_dir.joinpath("submissions.db")
            else:
                if database_path == ":memory:":
                    pass
                # Check if user defined path is a directory.
                elif database_path.is_dir():
                    database_path = database_path.joinpath("submissions.db")
                # Check if user defined path is a file.
                elif database_path.is_file():
                    database_path = database_path
                else:
                    raise FileNotFoundError("No database file found. Exiting program.")
            logger.debug(f"Using {database_path} for database file.")
            engine = create_engine(f"sqlite:///{database_path}")  # , echo=True, future=True
            session = Session(engine)
            metadata.session = session

            return session

    @field_validator('package', mode="before")
    @classmethod
    def import_package(cls, value):
        import __init__ as package
        if value is None:
            return package
        return value

def get_config(settings_path: Path | str | None = None) -> Settings:
    """
    Get configuration settings from path or default if blank.

    Args:
        settings_path (Path | str | None, optional): Path to config.yml. Defaults to None.

    Returns:
        Settings: Pydantic settings object
    """
    logger.debug("Creating settings...")
    if isinstance(settings_path, str):
        settings_path = Path(settings_path)

    # Custom pyyaml constructor to join fields.
    def join(loader, node):
        seq = loader.construct_sequence(node)
        return ''.join([str(i) for i in seq])

    # Register the tag handler.
    yaml.add_constructor('!join', join)
    # Make directories.
    logger.debug(f"Making directory: {CONFIGDIR.__str__()}")
    try:
        CONFIGDIR.mkdir(parents=True)
    except FileExistsError:
        pass
    logger.debug(f"Making directory: {LOGDIR.__str__()}")
    try:
        LOGDIR.mkdir(parents=True)
    except FileExistsError:
        pass
    # If user hasn't defined a config path in cli args...
    if settings_path is None:
        # Check user .config/submissions directory.
        if CONFIGDIR.joinpath("config.yml").exists():
            settings_path = CONFIGDIR.joinpath("config.yml")
        # Check user .submissions directory.
        elif Path.home().joinpath(".submissions", "config.yml").exists():
            settings_path = Path.home().joinpath(".submissions", "config.yml")
        # Finally, look in the local config.
        else:
            # if getattr(sys, 'frozen', False):
            if check_if_app():
                settings_path = Path(sys._MEIPASS).joinpath("files", "config.yml")
            else:
                settings_path = package_dir.joinpath('config.yml')
            with open(settings_path, "r") as dset:
                default_settings = yaml.load(dset, Loader=yaml.Loader)
            # Copy the default settings to the user's config directory for first run.
            return Settings(**copy_settings(settings_path=CONFIGDIR.joinpath("config.yml"), settings=default_settings))
    else:
        # Check if user defined path is a directory.
        if settings_path.is_dir():
            settings_path = settings_path.joinpath("config.yml")
        # Check if user defined path is a file.
        elif settings_path.is_file():
            settings_path = settings_path
        else:
            logger.error("No config.yml file found. Writing to directory.")
            with open(settings_path, "r") as dset:
                default_settings = yaml.load(dset, Loader=yaml.Loader)
            return Settings(**copy_settings(settings_path=settings_path, settings=default_settings))
    logger.debug(f"Using {settings_path} for config file.")
    with open(settings_path, "r") as stream:
        settings = yaml.load(stream, Loader=yaml.Loader)
    return Settings(**settings)

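# A hedged usage sketch; the explicit path below is an arbitrary example:
# ctx = get_config()                       # search the default config locations
# ctx = get_config("/path/to/config.yml")  # or point at a specific file/directory
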
def setup_logger(verbosity: int = 3):
    """
    Set logger levels using settings.

    Args:
        verbosity (int, optional): Level of verbosity desired, 3 is highest. Defaults to 3.

    Returns:
        logger: logger object
    """
    logger = logging.getLogger("submissions")
    logger.setLevel(logging.DEBUG)
    # Create file handler which logs even debug messages.
    try:
        Path(LOGDIR).mkdir(parents=True)
    except FileExistsError:
        pass
    fh = GroupWriteRotatingFileHandler(LOGDIR.joinpath('submissions.log'), mode='a', maxBytes=100000, backupCount=3, encoding=None, delay=False)
    # File logging will always be debug.
    fh.setLevel(logging.DEBUG)
    fh.name = "File"
    # Create console handler with a higher log level (writes to stdout).
    ch = logging.StreamHandler(stream=sys.stdout)
    # Set logging level based on verbosity.
    match verbosity:
        case 3:
            ch.setLevel(logging.DEBUG)
        case 2:
            ch.setLevel(logging.INFO)
        case 1:
            ch.setLevel(logging.WARNING)
    ch.name = "Stream"
    # Create formatter and add it to the handlers.
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - {%(pathname)s:%(lineno)d} - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)
    # Add the handlers to the logger.
    logger.addHandler(fh)
    logger.addHandler(ch)

    # Output uncaught exceptions and tracebacks to the logger.
    def handle_exception(exc_type, exc_value, exc_traceback):
        if issubclass(exc_type, KeyboardInterrupt):
            sys.__excepthook__(exc_type, exc_value, exc_traceback)
            return
        logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))

    sys.excepthook = handle_exception
    return logger

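# Illustrative call (assumed usage, not shown in this commit):
# logger = setup_logger(verbosity=2)  # console handler at INFO, file handler stays at DEBUG
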
def copy_settings(settings_path: Path, settings: dict) -> dict:
    """
    Copies relevant settings dictionary from the default config.yml to a new directory.

    Args:
        settings_path (Path): path to write the file to
        settings (dict): settings dictionary obtained from default config.yml

    Returns:
        dict: output dictionary for use in first run
    """
    # If the current user is not a superuser/power user, remove those entries.
    if getpass.getuser() not in settings['super_users']:
        del settings['super_users']
    if getpass.getuser() not in settings['power_users']:
        del settings['power_users']
    if not settings_path.exists():
        with open(settings_path, 'w') as f:
            yaml.dump(settings, f)
    return settings

def jinja_template_loading() -> Environment:
    """
    Returns jinja2 template environment.

    Returns:
        Environment: jinja2 environment pointed at this package's templates.
    """
    # Determine if pyinstaller launcher is being used.
    if check_if_app():
        loader_path = Path(sys._MEIPASS).joinpath("files", "templates")
    else:
        loader_path = Path(__file__).parent.joinpath('templates').absolute()
    # Jinja template loading.
    loader = FileSystemLoader(loader_path)
    env = Environment(loader=loader)
    env.globals['STATIC_PREFIX'] = loader_path.joinpath("static", "css")
    return env

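# Hypothetical usage; the template file name is an assumption for illustration:
# env = jinja_template_loading()
# html = env.get_template("example.html").render(title="Example")
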
def check_authorization(func):
    """
    Decorator that only runs the wrapped function if the current user is a power user.
    The wrapped function must be called with a 'ctx' keyword argument holding the Settings object.
    """
    def wrapper(*args, **kwargs):
        logger.debug("Checking authorization")
        if getpass.getuser() in kwargs['ctx'].power_users:
            return func(*args, **kwargs)
        else:
            logger.error(f"User {getpass.getuser()} is not authorized for this function.")
            return dict(code=1, message="This user does not have permission for this function.", status="warning")
    return wrapper

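# Sketch of applying the decorator; 'delete_submission' is hypothetical and
# 'ctx' is assumed to hold the Settings object:
# @check_authorization
# def delete_submission(submission_id, ctx=None):
#     ...
#
# delete_submission(42, ctx=ctx)
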
def check_if_app(ctx: Settings = None) -> bool:
    """
    Checks if the program is running from a pyinstaller bundle.

    Args:
        ctx (Settings, optional): Settings passed down from gui. Defaults to None.

    Returns:
        bool: True if running from pyinstaller. Else False.
    """
    if getattr(sys, 'frozen', False):
        return True
    else:
        return False

def convert_well_to_row_column(input_str: str) -> Tuple[int, int]:
    """
    Converts typical alphanumeric well name (i.e. "A2") to row, column.

    Args:
        input_str (str): Input string. Ex. "A2"

    Returns:
        Tuple[int, int]: row, column (None, None if the well name can't be parsed)
    """
    row_keys = dict(A=1, B=2, C=3, D=4, E=5, F=6, G=7, H=8)
    try:
        row = int(row_keys[input_str[0].upper()])
        column = int(input_str[1:])
    except (IndexError, KeyError, ValueError):
        # Empty strings, unknown row letters and non-numeric columns all land here.
        return None, None
    return row, column

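# The mapping is deterministic, e.g.:
# convert_well_to_row_column("A2")   -> (1, 2)
# convert_well_to_row_column("H12")  -> (8, 12)
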
def query_return(query: Query, limit: int = 0):
    """
    Execute sqlalchemy query.

    Args:
        query (Query): Query object
        limit (int, optional): Maximum number of results to return (0 = all). Defaults to 0.

    Returns:
        Any: Query result: a list of rows, or a single row when limit is 1.
    """
    with query.session.no_autoflush:
        match limit:
            case 0:
                return query.all()
            case 1:
                return query.first()
            case _:
                return query.limit(limit).all()

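# Sketch of the intended call pattern; 'BasicSubmission' is a hypothetical
# mapped model used only for illustration:
# query = ctx.database_session.query(BasicSubmission)
# everything = query_return(query)           # limit=0 -> query.all()
# first_only = query_return(query, limit=1)  # limit=1 -> query.first()
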
def setup_lookup(func):
    """
    Decorator that raises if a dictionary is passed as a keyword argument to a lookup function.
    """
    def wrapper(*args, **kwargs):
        for k, v in kwargs.items():
            if isinstance(v, dict):
                raise ValueError("Cannot use dictionary in query. Make sure you parse it first.")
        return func(*args, **kwargs)
    return wrapper

class Result(BaseModel):

    owner: str = Field(default="", validate_default=True)
    code: int = Field(default=0)
    msg: str
    status: Literal["NoIcon", "Question", "Information", "Warning", "Critical"] = Field(default="NoIcon")

    def __repr__(self) -> str:
        return f"Result({self.owner})"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Record the calling function as the owner of this result.
        self.owner = inspect.stack()[1].function

    def report(self):
        from frontend.custom_widgets.misc import AlertPop
        return AlertPop(message=self.msg, status=self.status, owner=self.owner)

class Report(BaseModel):

    results: List[Result] = Field(default=[])

    # def __init__(self, *args, **kwargs):
    #     if 'msg' in kwargs.keys():
    #         res = Result(msg=kwargs['msg'])
    #         for k, v in kwargs.items():
    #             if k in ['code', 'status']:
    #                 setattr(res, k, v)
    #         self.results.append(res)

    def __repr__(self):
        return f"Report(result_count:{len(self.results)})"

    def add_result(self, result: Result | Report | None):
        match result:
            case Result():
                logger.debug(f"Adding {result} to results.")
                try:
                    self.results.append(result)
                except AttributeError:
                    logger.error("Problem adding result.")
            case Report():
                # Merge another report's results into this one.
                for res in result.results:
                    logger.debug(f"Adding {res} to results.")
                    self.results.append(res)
            case _:
                pass

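# Hedged example of assembling a report (message text is arbitrary):
# report = Report()
# report.add_result(Result(msg="Submission parsed.", status="Information"))
# report.add_result(Result(msg="Missing reagent lot.", status="Warning", code=1))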