From f215972b9dd45f951356fbb9e751820320824c0f Mon Sep 17 00:00:00 2001 From: lwark Date: Thu, 27 Mar 2025 08:50:12 -0500 Subject: [PATCH] New inclusive tools.Settings class. --- CHANGELOG.md | 1 + src/submissions/tools/__init__.py | 1106 +++++++++++++++++++---------- 2 files changed, 742 insertions(+), 365 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7dae9c4..ef4b07b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # 202503.05 +- Shuttered tools.get_config and moved to new inclusive Settings class. - Added concentrations chart tab. - Saving report xlsx/pdf now inserts report class name in file name. diff --git a/src/submissions/tools/__init__.py b/src/submissions/tools/__init__.py index 26972f6..22e7df1 100644 --- a/src/submissions/tools/__init__.py +++ b/src/submissions/tools/__init__.py @@ -177,7 +177,7 @@ def check_not_nan(cell_contents) -> bool: return False -def convert_nans_to_nones(input_str:str) -> str | None: +def convert_nans_to_nones(input_str: str) -> str | None: """ Get rid of various "nan", "NAN", "NaN", etc/ @@ -261,364 +261,385 @@ def timer(func): # Settings -class Settings(BaseSettings, extra="allow"): - """ - Pydantic model to hold settings - - Raises: - FileNotFoundError: Error if database not found. - - """ - database_schema: str | None = None - directory_path: Path | None = None - database_user: str | None = None - database_password: str | None = None - database_name: str | None = None - database_path: Path | str | None = None - backup_path: Path | str | None = None - submission_types: dict | None = None - database_session: Session | None = None - package: Any | None = None - logging_enabled: bool = Field(default=False) - - model_config = SettingsConfigDict(env_file_encoding='utf-8') - - @field_validator('database_schema', mode="before") - @classmethod - def set_schema(cls, value): - if value is None: - if check_if_app(): - alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") - else: - alembic_path = project_path.joinpath("alembic.ini") - value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='schema') - if value is None: - value = "sqlite" - return value - - @field_validator('backup_path', mode="before") - @classmethod - def set_backup_path(cls, value, values): - match value: - case str(): - value = Path(value) - case None: - value = values.data['directory_path'].joinpath("Database backups") - if not value.exists(): - try: - value.mkdir(parents=True) - except OSError: - value = Path(askdirectory(title="Directory for backups.")) - return value - - @field_validator('directory_path', mode="before") - @classmethod - def ensure_directory_exists(cls, value, values): - if value is None: - match values.data['database_schema']: - case "sqlite": - if check_if_app(): - alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") - else: - alembic_path = project_path.joinpath("alembic.ini") - value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').parent - case _: - Tk().withdraw() # we don't want a full GUI, so keep the root window from appearing - value = Path(askdirectory( - title="Select directory for DB storage")) # show an "Open" dialog box and return the path to the selected file - if isinstance(value, str): - value = Path(value) - try: - check = value.exists() - except AttributeError: - check = False - if not check: - value.mkdir(exist_ok=True) - return value - - @field_validator('database_path', mode="before") - @classmethod - def ensure_database_exists(cls, value, values): - match values.data['database_schema']: - case "sqlite": - if value is None: - value = values.data['directory_path'] - if isinstance(value, str): - value = Path(value) - case _: - if value is None: - if check_if_app(): - alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") - else: - alembic_path = project_path.joinpath("alembic.ini") - value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').parent - return value - - @field_validator('database_name', mode='before') - @classmethod - def get_database_name(cls, value): - if value is None: - if check_if_app(): - alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") - else: - alembic_path = project_path.joinpath("alembic.ini") - value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').stem - return value - - @field_validator("database_user", mode='before') - @classmethod - def get_user(cls, value): - if value is None: - if check_if_app(): - alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") - else: - alembic_path = project_path.joinpath("alembic.ini") - value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='user') - return value - - @field_validator("database_password", mode='before') - @classmethod - def get_pass(cls, value): - if value is None: - if check_if_app(): - alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") - else: - alembic_path = project_path.joinpath("alembic.ini") - value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='pass') - return value - - @field_validator('database_session', mode="before") - @classmethod - def create_database_session(cls, value, values): - if value is not None: - return value - else: - match values.data['database_schema']: - case "sqlite": - value = f"/{values.data['database_path']}" - db_name = f"{values.data['database_name']}.db" - template = jinja_template_loading().from_string( - "{{ values['database_schema'] }}://{{ value }}/{{ db_name }}") - case "mssql+pyodbc": - value = values.data['database_path'] - db_name = values.data['database_name'] - template = jinja_template_loading().from_string( - "{{ values['database_schema'] }}://{{ value }}/{{ db_name }}?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes&Trusted_Connection=yes" - ) - case _: - tmp = jinja_template_loading().from_string( - "{% if values['database_user'] %}{{ values['database_user'] }}{% if values['database_password'] %}:{{ values['database_password'] }}{% endif %}{% endif %}@{{ values['database_path'] }}") - value = tmp.render(values=values.data) - db_name = values.data['database_name'] - database_path = template.render(values=values.data, value=value, db_name=db_name) - print(f"Using {database_path} for database path") - engine = create_engine(database_path) - session = Session(engine) - return session - - @field_validator('package', mode="before") - @classmethod - def import_package(cls, value): - import __init__ as package - if value is None: - return package - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.set_from_db() - self.set_scripts() - - def set_from_db(self): - if 'pytest' in sys.modules: - output = dict(power_users=['lwark', 'styson', 'ruwang'], - startup_scripts=dict(hello=None), - teardown_scripts=dict(goodbye=None) - ) - else: - session = self.database_session - metadata = MetaData() - try: - metadata.reflect(bind=session.get_bind()) - except AttributeError as e: - print(f"Error getting tables: {e}") - return - if "_configitem" not in metadata.tables.keys(): - print(f"Couldn't find _configitems in {metadata.tables.keys()}.") - return - config_items = session.execute(text("SELECT * FROM _configitem")).all() - output = {} - for item in config_items: - try: - output[item[1]] = json.loads(item[2]) - except (JSONDecodeError, TypeError): - output[item[1]] = item[2] - for k, v in output.items(): - if not hasattr(self, k): - self.__setattr__(k, v) - - def set_scripts(self): - """ - Imports all functions from "scripts" folder, adding them to ctx scripts - """ - if check_if_app(): - p = Path(sys._MEIPASS).joinpath("files", "scripts") - else: - p = Path(__file__).parents[2].joinpath("scripts").absolute() - if p.__str__() not in sys.path: - sys.path.append(p.__str__()) - # NOTE: Get all .py files that don't have __ in them. - modules = p.glob("[!__]*.py") - for module in modules: - mod = importlib.import_module(module.stem) - for function in getmembers(mod, isfunction): - name = function[0] - func = function[1] - # NOTE: assign function based on its name being in config: startup/teardown - # NOTE: scripts must be registered using {name: Null} in the database - if name in self.startup_scripts.keys(): - self.startup_scripts[name] = func - if name in self.teardown_scripts.keys(): - self.teardown_scripts[name] = func - - @timer - def run_startup(self): - """ - Runs startup scripts. - """ - for script in self.startup_scripts.values(): - try: - logger.info(f"Running startup script: {script.__name__}") - thread = Thread(target=script, args=(ctx,)) - thread.start() - except AttributeError: - logger.error(f"Couldn't run startup script: {script}") - - @timer - def run_teardown(self): - """ - Runs teardown scripts. - """ - for script in self.teardown_scripts.values(): - try: - logger.info(f"Running teardown script: {script.__name__}") - thread = Thread(target=script, args=(ctx,)) - thread.start() - except AttributeError: - logger.error(f"Couldn't run teardown script: {script}") - - @classmethod - def get_alembic_db_path(cls, alembic_path, mode=Literal['path', 'schema', 'user', 'pass']) -> Path | str: - c = ConfigParser() - c.read(alembic_path) - url = c['alembic']['sqlalchemy.url'] - match mode: - case 'path': - path = re.sub(r"^.*//", "", url) - path = re.sub(r"^.*@", "", path) - return Path(path) - case "schema": - return url[:url.index(":")] - case "user": - url = re.sub(r"^.*//", "", url) - try: - return url[:url.index("@")].split(":")[0] - except (IndexError, ValueError) as e: - return None - case "pass": - url = re.sub(r"^.*//", "", url) - try: - return url[:url.index("@")].split(":")[1] - except (IndexError, ValueError) as e: - return None - - def save(self, settings_path: Path): - if not settings_path.exists(): - dicto = {} - for k, v in self.__dict__.items(): - if k in ['package', 'database_session', 'submission_types']: - continue - match v: - case Path(): - if v.is_dir(): - v = v.absolute().__str__() - elif v.is_file(): - v = v.parent.absolute().__str__() - else: - v = v.__str__() - case _: - pass - dicto[k] = v - with open(settings_path, 'w') as f: - yaml.dump(dicto, f) - - -def get_config(settings_path: Path | str | None = None) -> Settings: - """ - Get configuration settings from path or default if blank. - - Args: - settings_path (Path | str | None, optional): Path to config.yml Defaults to None. - override (dict | None, optional): dictionary of settings to be used instead of file. Defaults to None. - - Returns: - Settings: Pydantic settings object - """ - if isinstance(settings_path, str): - settings_path = Path(settings_path) - - # NOTE: custom pyyaml constructor to join fields - def join(loader, node): - seq = loader.construct_sequence(node) - return ''.join([str(i) for i in seq]) - # NOTE: register the tag handler - yaml.add_constructor('!join', join) - # NOTE: make directories - try: - CONFIGDIR.mkdir(parents=True) - except FileExistsError: - logger.warning(f"Config directory {CONFIGDIR} already exists.") - try: - LOGDIR.mkdir(parents=True) - except FileExistsError: - logger.warning(f"Logging directory {LOGDIR} already exists.") - # NOTE: if user hasn't defined config path in cli args - if settings_path is None: - # NOTE: Check user .config/submissions directory - if CONFIGDIR.joinpath("config.yml").exists(): - settings_path = CONFIGDIR.joinpath("config.yml") - # NOTE: Check user .submissions directory - elif Path.home().joinpath(".submissions", "config.yml").exists(): - settings_path = Path.home().joinpath(".submissions", "config.yml") - # NOTE: finally look in the local config - else: - if check_if_app(): - settings_path = Path(sys._MEIPASS).joinpath("files", "config.yml") - else: - settings_path = project_path.joinpath('src', 'config.yml') - with open(settings_path, "r") as dset: - default_settings = yaml.load(dset, Loader=yaml.Loader) - # NOTE: Tell program we need to copy the config.yml to the user directory - # NOTE: copy settings to config directory - settings = Settings(**default_settings) - settings.save(settings_path=CONFIGDIR.joinpath("config.yml")) - return settings - else: - # NOTE: check if user defined path is directory - if settings_path.is_dir(): - settings_path = settings_path.joinpath("config.yml") - # NOTE: check if user defined path is file - elif settings_path.is_file(): - settings_path = settings_path - else: - logger.error("No config.yml file found. Writing to directory.") - with open(settings_path, "r") as dset: - default_settings = yaml.load(dset, Loader=yaml.Loader) - settings = Settings(**default_settings) - settings.save(settings_path=settings_path) - with open(settings_path, "r") as stream: - settings = yaml.load(stream, Loader=yaml.Loader) - return Settings(**settings) - +# class Settings(BaseSettings, extra="allow"): +# """ +# Pydantic model to hold settings +# +# Raises: +# FileNotFoundError: Error if database not found. +# +# """ +# database_schema: str | None = None +# directory_path: Path | None = None +# database_user: str | None = None +# database_password: str | None = None +# database_name: str | None = None +# database_path: Path | str | None = None +# backup_path: Path | str | None = None +# submission_types: dict | None = None +# database_session: Session | None = None +# package: Any | None = None +# logging_enabled: bool = Field(default=False) +# +# model_config = SettingsConfigDict(env_file_encoding='utf-8') +# +# # model_config = SettingsConfigDict(yaml_file="C:\\Users\lwark\AppData\Local\submissions\config\config.yml", +# # yaml_file_encoding='utf-8') +# +# # @classmethod +# # def settings_customise_sources( +# # cls, +# # settings_cls: type[BaseSettings], +# # init_settings: PydanticBaseSettingsSource, +# # env_settings: PydanticBaseSettingsSource, +# # dotenv_settings: PydanticBaseSettingsSource, +# # file_secret_settings: PydanticBaseSettingsSource, +# # ) -> tuple[PydanticBaseSettingsSource, ...]: +# # return ( +# # YamlConfigSettingsSource(settings_cls), +# # init_settings, +# # env_settings, +# # dotenv_settings, +# # file_secret_settings, +# # ) +# +# @field_validator('database_schema', mode="before") +# @classmethod +# def set_schema(cls, value): +# if value is None: +# if check_if_app(): +# alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") +# else: +# alembic_path = project_path.joinpath("alembic.ini") +# value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='schema') +# if value is None: +# value = "sqlite" +# return value +# +# @field_validator('backup_path', mode="before") +# @classmethod +# def set_backup_path(cls, value, values): +# match value: +# case str(): +# value = Path(value) +# case None: +# value = values.data['directory_path'].joinpath("Database backups") +# if not value.exists(): +# try: +# value.mkdir(parents=True) +# except OSError: +# value = Path(askdirectory(title="Directory for backups.")) +# return value +# +# @field_validator('directory_path', mode="before") +# @classmethod +# def ensure_directory_exists(cls, value, values): +# if value is None: +# match values.data['database_schema']: +# case "sqlite": +# if check_if_app(): +# alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") +# else: +# alembic_path = project_path.joinpath("alembic.ini") +# value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').parent +# case _: +# Tk().withdraw() # we don't want a full GUI, so keep the root window from appearing +# value = Path(askdirectory( +# title="Select directory for DB storage")) # show an "Open" dialog box and return the path to the selected file +# if isinstance(value, str): +# value = Path(value) +# try: +# check = value.exists() +# except AttributeError: +# check = False +# if not check: +# value.mkdir(exist_ok=True) +# return value +# +# @field_validator('database_path', mode="before") +# @classmethod +# def ensure_database_exists(cls, value, values): +# match values.data['database_schema']: +# case "sqlite": +# if value is None: +# value = values.data['directory_path'] +# if isinstance(value, str): +# value = Path(value) +# case _: +# if value is None: +# if check_if_app(): +# alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") +# else: +# alembic_path = project_path.joinpath("alembic.ini") +# value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').parent +# return value +# +# @field_validator('database_name', mode='before') +# @classmethod +# def get_database_name(cls, value): +# if value is None: +# if check_if_app(): +# alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") +# else: +# alembic_path = project_path.joinpath("alembic.ini") +# value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').stem +# return value +# +# @field_validator("database_user", mode='before') +# @classmethod +# def get_user(cls, value): +# if value is None: +# if check_if_app(): +# alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") +# else: +# alembic_path = project_path.joinpath("alembic.ini") +# value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='user') +# return value +# +# @field_validator("database_password", mode='before') +# @classmethod +# def get_pass(cls, value): +# if value is None: +# if check_if_app(): +# alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") +# else: +# alembic_path = project_path.joinpath("alembic.ini") +# value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='pass') +# return value +# +# @field_validator('database_session', mode="before") +# @classmethod +# def create_database_session(cls, value, values): +# if value is not None: +# return value +# else: +# match values.data['database_schema']: +# case "sqlite": +# value = f"/{values.data['database_path']}" +# db_name = f"{values.data['database_name']}.db" +# template = jinja_template_loading().from_string( +# "{{ values['database_schema'] }}://{{ value }}/{{ db_name }}") +# case "mssql+pyodbc": +# value = values.data['database_path'] +# db_name = values.data['database_name'] +# template = jinja_template_loading().from_string( +# "{{ values['database_schema'] }}://{{ value }}/{{ db_name }}?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes&Trusted_Connection=yes" +# ) +# case _: +# tmp = jinja_template_loading().from_string( +# "{% if values['database_user'] %}{{ values['database_user'] }}{% if values['database_password'] %}:{{ values['database_password'] }}{% endif %}{% endif %}@{{ values['database_path'] }}") +# value = tmp.render(values=values.data) +# db_name = values.data['database_name'] +# database_path = template.render(values=values.data, value=value, db_name=db_name) +# print(f"Using {database_path} for database path") +# engine = create_engine(database_path) +# session = Session(engine) +# return session +# +# @field_validator('package', mode="before") +# @classmethod +# def import_package(cls, value): +# import __init__ as package +# if value is None: +# return package +# +# def __init__(self, *args, **kwargs): +# super().__init__(*args, **kwargs) +# +# self.set_from_db() +# self.set_scripts() +# +# def set_from_db(self): +# if 'pytest' in sys.modules: +# output = dict(power_users=['lwark', 'styson', 'ruwang'], +# startup_scripts=dict(hello=None), +# teardown_scripts=dict(goodbye=None) +# ) +# else: +# session = self.database_session +# metadata = MetaData() +# try: +# metadata.reflect(bind=session.get_bind()) +# except AttributeError as e: +# print(f"Error getting tables: {e}") +# return +# if "_configitem" not in metadata.tables.keys(): +# print(f"Couldn't find _configitems in {metadata.tables.keys()}.") +# return +# config_items = session.execute(text("SELECT * FROM _configitem")).all() +# output = {} +# for item in config_items: +# try: +# output[item[1]] = json.loads(item[2]) +# except (JSONDecodeError, TypeError): +# output[item[1]] = item[2] +# for k, v in output.items(): +# if not hasattr(self, k): +# self.__setattr__(k, v) +# +# def set_scripts(self): +# """ +# Imports all functions from "scripts" folder, adding them to ctx scripts +# """ +# if check_if_app(): +# p = Path(sys._MEIPASS).joinpath("files", "scripts") +# else: +# p = Path(__file__).parents[2].joinpath("scripts").absolute() +# if p.__str__() not in sys.path: +# sys.path.append(p.__str__()) +# # NOTE: Get all .py files that don't have __ in them. +# modules = p.glob("[!__]*.py") +# for module in modules: +# mod = importlib.import_module(module.stem) +# for function in getmembers(mod, isfunction): +# name = function[0] +# func = function[1] +# # NOTE: assign function based on its name being in config: startup/teardown +# # NOTE: scripts must be registered using {name: Null} in the database +# if name in self.startup_scripts.keys(): +# self.startup_scripts[name] = func +# if name in self.teardown_scripts.keys(): +# self.teardown_scripts[name] = func +# +# @timer +# def run_startup(self): +# """ +# Runs startup scripts. +# """ +# for script in self.startup_scripts.values(): +# try: +# logger.info(f"Running startup script: {script.__name__}") +# thread = Thread(target=script, args=(ctx,)) +# thread.start() +# except AttributeError: +# logger.error(f"Couldn't run startup script: {script}") +# +# @timer +# def run_teardown(self): +# """ +# Runs teardown scripts. +# """ +# for script in self.teardown_scripts.values(): +# try: +# logger.info(f"Running teardown script: {script.__name__}") +# thread = Thread(target=script, args=(ctx,)) +# thread.start() +# except AttributeError: +# logger.error(f"Couldn't run teardown script: {script}") +# +# @classmethod +# def get_alembic_db_path(cls, alembic_path, mode=Literal['path', 'schema', 'user', 'pass']) -> Path | str: +# c = ConfigParser() +# c.read(alembic_path) +# url = c['alembic']['sqlalchemy.url'] +# match mode: +# case 'path': +# path = re.sub(r"^.*//", "", url) +# path = re.sub(r"^.*@", "", path) +# return Path(path) +# case "schema": +# return url[:url.index(":")] +# case "user": +# url = re.sub(r"^.*//", "", url) +# try: +# return url[:url.index("@")].split(":")[0] +# except (IndexError, ValueError) as e: +# return None +# case "pass": +# url = re.sub(r"^.*//", "", url) +# try: +# return url[:url.index("@")].split(":")[1] +# except (IndexError, ValueError) as e: +# return None +# +# def save(self, settings_path: Path): +# if not settings_path.exists(): +# dicto = {} +# for k, v in self.__dict__.items(): +# if k in ['package', 'database_session', 'submission_types']: +# continue +# match v: +# case Path(): +# if v.is_dir(): +# v = v.absolute().__str__() +# elif v.is_file(): +# v = v.parent.absolute().__str__() +# else: +# v = v.__str__() +# case _: +# pass +# dicto[k] = v +# with open(settings_path, 'w') as f: +# yaml.dump(dicto, f) +# +# +# def get_config(settings_path: Path | str | None = None) -> Settings: +# """ +# Get configuration settings from path or default if blank. +# +# Args: +# settings_path (Path | str | None, optional): Path to config.yml Defaults to None. +# override (dict | None, optional): dictionary of settings to be used instead of file. Defaults to None. +# +# Returns: +# Settings: Pydantic settings object +# """ +# if isinstance(settings_path, str): +# settings_path = Path(settings_path) +# +# # NOTE: custom pyyaml constructor to join fields +# def join(loader, node): +# seq = loader.construct_sequence(node) +# return ''.join([str(i) for i in seq]) +# # NOTE: register the tag handler +# yaml.add_constructor('!join', join) +# # NOTE: make directories +# try: +# CONFIGDIR.mkdir(parents=True) +# except FileExistsError: +# logger.warning(f"Config directory {CONFIGDIR} already exists.") +# try: +# LOGDIR.mkdir(parents=True) +# except FileExistsError: +# logger.warning(f"Logging directory {LOGDIR} already exists.") +# # NOTE: if user hasn't defined config path in cli args +# if settings_path is None: +# # NOTE: Check user .config/submissions directory +# if CONFIGDIR.joinpath("config.yml").exists(): +# settings_path = CONFIGDIR.joinpath("config.yml") +# # NOTE: Check user .submissions directory +# elif Path.home().joinpath(".submissions", "config.yml").exists(): +# settings_path = Path.home().joinpath(".submissions", "config.yml") +# # NOTE: finally look in the local config +# else: +# if check_if_app(): +# settings_path = Path(sys._MEIPASS).joinpath("files", "config.yml") +# else: +# settings_path = project_path.joinpath('src', 'config.yml') +# with open(settings_path, "r") as dset: +# default_settings = yaml.load(dset, Loader=yaml.Loader) +# # NOTE: Tell program we need to copy the config.yml to the user directory +# # NOTE: copy settings to config directory +# settings = Settings(**default_settings) +# settings.save(settings_path=CONFIGDIR.joinpath("config.yml")) +# return settings +# else: +# # NOTE: check if user defined path is directory +# if settings_path.is_dir(): +# settings_path = settings_path.joinpath("config.yml") +# # NOTE: check if user defined path is file +# elif settings_path.is_file(): +# settings_path = settings_path +# else: +# logger.error("No config.yml file found. Writing to directory.") +# with open(settings_path, "r") as dset: +# default_settings = yaml.load(dset, Loader=yaml.Loader) +# settings = Settings(**default_settings) +# settings.save(settings_path=settings_path) +# with open(settings_path, "r") as stream: +# settings = yaml.load(stream, Loader=yaml.Loader) +# return Settings(**settings) +# def check_if_app() -> bool: """ @@ -805,6 +826,7 @@ def setup_lookup(func): elif v is not None: sanitized_kwargs[k] = v return func(*args, **sanitized_kwargs) + return wrapper @@ -1062,6 +1084,7 @@ def check_authorization(func): report.add_result( Result(owner=func.__str__(), code=1, msg=error_msg, status="warning")) return report, kwargs + return wrapper @@ -1087,6 +1110,7 @@ def under_development(func): Result(owner=func.__str__(), code=1, msg=error_msg, status="warning")) return report + return wrapper @@ -1141,12 +1165,13 @@ def report_result(func): else: true_output = None return true_output + return wrapper def is_list_etc(object): match object: - case str(): #: I don't want to iterate strings, so hardcoding that + case str(): #: I don't want to iterate strings, so hardcoding that return False case Report(): return False @@ -1174,6 +1199,7 @@ def create_holidays_for_year(year: int | None = None) -> List[date]: offset = -d.weekday() # weekday == 0 means Monday output = d + timedelta(offset) return output.date() + if not year: year = date.today().year # NOTE: Includes New Year's day for next year. @@ -1218,15 +1244,85 @@ class classproperty(property): def __get__(self, owner_self, owner_cls): return self.fget(owner_cls) + # NOTE: Monkey patching... hooray! builtins.classproperty = classproperty -ctx = get_config(None) -class Settings2(BaseSettings, extra="allow"): +class Settings(BaseSettings, extra="allow"): + """ + Pydantic model to hold settings - model_config = SettingsConfigDict(yaml_file="C:\\Users\lwark\AppData\Local\submissions\config\config.yml", - yaml_file_encoding='utf-8') + Raises: + FileNotFoundError: Error if database not found. + + """ + database_schema: str | None = None + directory_path: Path | None = None + database_user: str | None = None + database_password: str | None = None + database_name: str | None = None + database_path: Path | str | None = None + backup_path: Path | str | None = None + submission_types: dict | None = None + database_session: Session | None = None + package: Any | None = None + logging_enabled: bool = Field(default=False) + + @classproperty + def main_aux_dir(cls): + if platform.system() == "Windows": + os_config_dir = "AppData/local" + # logger.info(f"Got platform Windows, config_dir: {os_config_dir}") + else: + os_config_dir = ".config" + # logger.info(f"Got platform {platform.system()}, config_dir: {os_config_dir}") + return Path.home().joinpath(f"{os_config_dir}/submissions") + + @classproperty + def configdir(cls): + return cls.main_aux_dir.joinpath("config") + + @classproperty + def logdir(cls): + return cls.main_aux_dir.joinpath("logs") + + def __new__(cls, *args, **kwargs): + if "settings_path" in kwargs.keys(): + settings_path = kwargs['settings_path'] + if isinstance(settings_path, str): + settings_path = Path(settings_path) + else: + settings_path = None + if settings_path is None: + # NOTE: Check user .config/submissions directory + # if CONFIGDIR.joinpath("config.yml").exists(): + if cls.configdir.joinpath("config.yml").exists(): + settings_path = cls.configdir.joinpath("config.yml") + # NOTE: Check user .submissions directory + elif Path.home().joinpath(".submissions", "config.yml").exists(): + settings_path = Path.home().joinpath(".submissions", "config.yml") + # NOTE: finally look in the local config + else: + if check_if_app(): + settings_path = Path(sys._MEIPASS).joinpath("files", "config.yml") + else: + settings_path = project_path.joinpath('src', 'config.yml') + # with open(settings_path, "r") as dset: + # default_settings = yaml.load(dset, Loader=yaml.Loader) + else: + # NOTE: check if user defined path is directory + if settings_path.is_dir(): + settings_path = settings_path.joinpath("config.yml") + # NOTE: check if user defined path is file + elif settings_path.is_file(): + settings_path = settings_path + else: + raise FileNotFoundError(f"{settings_path} not found.") + # NOTE: how to load default settings into this? + print(f"Loading settings from {settings_path}") + cls.model_config = SettingsConfigDict(yaml_file=settings_path, yaml_file_encoding='utf-8', extra="allow") + return super().__new__(cls) @classmethod def settings_customise_sources( @@ -1245,6 +1341,286 @@ class Settings2(BaseSettings, extra="allow"): file_secret_settings, ) + @field_validator('database_schema', mode="before") + @classmethod + def set_schema(cls, value): + if value is None: + if check_if_app(): + alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") + else: + alembic_path = project_path.joinpath("alembic.ini") + value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='schema') + if value is None: + value = "sqlite" + return value + + @field_validator('backup_path', mode="before") + @classmethod + def set_backup_path(cls, value, values): + match value: + case str(): + value = Path(value) + case None: + value = values.data['directory_path'].joinpath("Database backups") + if not value.exists(): + try: + value.mkdir(parents=True) + except OSError: + value = Path(askdirectory(title="Directory for backups.")) + return value + + @field_validator('directory_path', mode="before") + @classmethod + def ensure_directory_exists(cls, value, values): + if value is None: + match values.data['database_schema']: + case "sqlite": + if check_if_app(): + alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") + else: + alembic_path = project_path.joinpath("alembic.ini") + value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').parent + case _: + Tk().withdraw() # we don't want a full GUI, so keep the root window from appearing + value = Path(askdirectory( + title="Select directory for DB storage")) # show an "Open" dialog box and return the path to the selected file + if isinstance(value, str): + value = Path(value) + try: + check = value.exists() + except AttributeError: + check = False + if not check: + value.mkdir(exist_ok=True) + return value + + @field_validator('database_path', mode="before") + @classmethod + def ensure_database_exists(cls, value, values): + match values.data['database_schema']: + case "sqlite": + if value is None: + value = values.data['directory_path'] + if isinstance(value, str): + value = Path(value) + case _: + if value is None: + if check_if_app(): + alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") + else: + alembic_path = project_path.joinpath("alembic.ini") + value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').parent + return value + + @field_validator('database_name', mode='before') + @classmethod + def get_database_name(cls, value): + if value is None: + if check_if_app(): + alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") + else: + alembic_path = project_path.joinpath("alembic.ini") + value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').stem + return value + + @field_validator("database_user", mode='before') + @classmethod + def get_user(cls, value): + if value is None: + if check_if_app(): + alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") + else: + alembic_path = project_path.joinpath("alembic.ini") + value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='user') + return value + + @field_validator("database_password", mode='before') + @classmethod + def get_pass(cls, value): + if value is None: + if check_if_app(): + alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini") + else: + alembic_path = project_path.joinpath("alembic.ini") + value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='pass') + return value + + @field_validator('database_session', mode="before") + @classmethod + def create_database_session(cls, value, values): + if value is not None: + return value + else: + match values.data['database_schema']: + case "sqlite": + value = f"/{values.data['database_path']}" + db_name = f"{values.data['database_name']}.db" + template = jinja_template_loading().from_string( + "{{ values['database_schema'] }}://{{ value }}/{{ db_name }}") + case "mssql+pyodbc": + value = values.data['database_path'] + db_name = values.data['database_name'] + template = jinja_template_loading().from_string( + "{{ values['database_schema'] }}://{{ value }}/{{ db_name }}?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes&Trusted_Connection=yes" + ) + case _: + tmp = jinja_template_loading().from_string( + "{% if values['database_user'] %}{{ values['database_user'] }}{% if values['database_password'] %}:{{ values['database_password'] }}{% endif %}{% endif %}@{{ values['database_path'] }}") + value = tmp.render(values=values.data) + db_name = values.data['database_name'] + database_path = template.render(values=values.data, value=value, db_name=db_name) + print(f"Using {database_path} for database path") + engine = create_engine(database_path) + session = Session(engine) + return session + + @field_validator('package', mode="before") + @classmethod + def import_package(cls, value): + import __init__ as package + if value is None: + return package + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + try: + del kwargs['settings_path'] + except KeyError: + pass + self.set_from_db() + self.set_scripts() + self.save() + + def set_from_db(self): + if 'pytest' in sys.modules: + output = dict(power_users=['lwark', 'styson', 'ruwang'], + startup_scripts=dict(hello=None), + teardown_scripts=dict(goodbye=None) + ) + else: + session = self.database_session + metadata = MetaData() + try: + metadata.reflect(bind=session.get_bind()) + except AttributeError as e: + print(f"Error getting tables: {e}") + return + if "_configitem" not in metadata.tables.keys(): + print(f"Couldn't find _configitems in {metadata.tables.keys()}.") + return + config_items = session.execute(text("SELECT * FROM _configitem")).all() + output = {} + for item in config_items: + try: + output[item[1]] = json.loads(item[2]) + except (JSONDecodeError, TypeError): + output[item[1]] = item[2] + for k, v in output.items(): + if not hasattr(self, k): + self.__setattr__(k, v) + + def set_scripts(self): + """ + Imports all functions from "scripts" folder, adding them to ctx scripts + """ + if check_if_app(): + p = Path(sys._MEIPASS).joinpath("files", "scripts") + else: + p = Path(__file__).parents[2].joinpath("scripts").absolute() + if p.__str__() not in sys.path: + sys.path.append(p.__str__()) + # NOTE: Get all .py files that don't have __ in them. + modules = p.glob("[!__]*.py") + for module in modules: + mod = importlib.import_module(module.stem) + for function in getmembers(mod, isfunction): + name = function[0] + func = function[1] + # NOTE: assign function based on its name being in config: startup/teardown + # NOTE: scripts must be registered using {name: Null} in the database + if name in self.startup_scripts.keys(): + self.startup_scripts[name] = func + if name in self.teardown_scripts.keys(): + self.teardown_scripts[name] = func + + @timer + def run_startup(self): + """ + Runs startup scripts. + """ + for script in self.startup_scripts.values(): + try: + logger.info(f"Running startup script: {script.__name__}") + thread = Thread(target=script, args=(ctx,)) + thread.start() + except AttributeError: + logger.error(f"Couldn't run startup script: {script}") + + @timer + def run_teardown(self): + """ + Runs teardown scripts. + """ + for script in self.teardown_scripts.values(): + try: + logger.info(f"Running teardown script: {script.__name__}") + thread = Thread(target=script, args=(ctx,)) + thread.start() + except AttributeError: + logger.error(f"Couldn't run teardown script: {script}") + + @classmethod + def get_alembic_db_path(cls, alembic_path, mode=Literal['path', 'schema', 'user', 'pass']) -> Path | str: + c = ConfigParser() + c.read(alembic_path) + url = c['alembic']['sqlalchemy.url'] + match mode: + case 'path': + path = re.sub(r"^.*//", "", url) + path = re.sub(r"^.*@", "", path) + return Path(path) + case "schema": + return url[:url.index(":")] + case "user": + url = re.sub(r"^.*//", "", url) + try: + return url[:url.index("@")].split(":")[0] + except (IndexError, ValueError) as e: + return None + case "pass": + url = re.sub(r"^.*//", "", url) + try: + return url[:url.index("@")].split(":")[1] + except (IndexError, ValueError) as e: + return None + + def save(self): + if not self.configdir.joinpath("config.yml").exists(): + try: + self.configdir.mkdir(parents=True) + except FileExistsError: + logger.warning(f"Config directory {self.configdir} already exists.") + try: + self.logdir.mkdir(parents=True) + except FileExistsError: + logger.warning(f"Logging directory {self.configdir} already exists.") + dicto = {} + for k, v in self.__dict__.items(): + if k in ['package', 'database_session', 'submission_types']: + continue + match v: + case Path(): + if v.is_dir(): + v = v.absolute().__str__() + elif v.is_file(): + v = v.parent.absolute().__str__() + else: + v = v.__str__() + case _: + pass + dicto[k] = v + with open(self.configdir.joinpath("config.yml"), 'w') as f: + yaml.dump(dicto, f) - +ctx = Settings()