Before code cleanup.

This commit is contained in:
lwark
2024-07-30 11:02:48 -05:00
parent f94b37def0
commit 2a07265cbc
3 changed files with 13 additions and 81 deletions

View File

@@ -235,9 +235,6 @@ class Settings(BaseSettings, extra="allow"):
database_name: str | None = None
database_path: Path | str | None = None
backup_path: Path | str | None = None
# super_users: list|None = None
# power_users: list|None = None
# rerun_regex: str
submission_types: dict | None = None
database_session: Session | None = None
package: Any | None = None
@@ -272,8 +269,6 @@ class Settings(BaseSettings, extra="allow"):
value.mkdir(parents=True)
except OSError:
value = Path(askdirectory(title="Directory for backups."))
# value.mkdir(parents=True)
# metadata.backup_path = value
return value
@field_validator('directory_path', mode="before")
@@ -302,7 +297,6 @@ class Settings(BaseSettings, extra="allow"):
check = False
if not check: # and values.data['database_schema'] == "sqlite":
# print(f"No directory found, using Documents/submissions")
# value = Path.home().joinpath("Documents", "submissions")
value.mkdir(exist_ok=True)
# print(f"Final return of directory_path: {value}")
return value
@@ -310,17 +304,12 @@ class Settings(BaseSettings, extra="allow"):
@field_validator('database_path', mode="before")
@classmethod
def ensure_database_exists(cls, value, values):
# if value == ":memory:":
# return value
# and values.data['database_schema'] == "sqlite":
# value = values.data['directory_path']
match values.data['database_schema']:
case "sqlite":
if value is None:
value = values.data['directory_path']
if isinstance(value, str):
value = Path(value)
# db_name = f"{values.data['database_name']}.db"
case _:
if value is None:
if check_if_app():
@@ -329,17 +318,6 @@ class Settings(BaseSettings, extra="allow"):
alembic_path = project_path.joinpath("alembic.ini")
# print(f"Getting alembic path: {alembic_path}")
value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').parent
# value = f"{value}/{values.data['database_name']}"
# db_name = values.data['database_name']
# match value:
# case str():
# value = Path(value)
# case None:
# value = values.data['directory_path'].joinpath("submissions.db")
# if value.exists():
# return value
# else:
# raise FileNotFoundError(f"Couldn't find database at {value}")
return value
@field_validator('database_name', mode='before')
@@ -383,7 +361,6 @@ class Settings(BaseSettings, extra="allow"):
@field_validator('database_session', mode="before")
@classmethod
def create_database_session(cls, value, values):
if value is not None:
return value
else:
@@ -401,28 +378,6 @@ class Settings(BaseSettings, extra="allow"):
"{{ values['database_schema'] }}://{{ value }}/{{ db_name }}")
database_path = template.render(values=values.data, value=value, db_name=db_name)
# print(f"Using {database_path} for database path")
# database_path = values.data['database_path']
# if database_path is None:
# # NOTE: check in user's .submissions directory for submissions.db
# if Path.home().joinpath(".submissions", "submissions.db").exists():
# database_path = Path.home().joinpath(".submissions", "submissions.db")
# # NOTE: finally, look in the local dir
# else:
# database_path = project_path.joinpath("submissions.db")
# else:
# if database_path == ":memory:":
# pass
# # NOTE: check if user defined path is directory
# elif database_path.is_dir():
# database_path = database_path.joinpath("submissions.db")
# # NOTE: check if user defined path is a file
# elif database_path.is_file():
# database_path = database_path
# else:
# raise FileNotFoundError("No database file found. Exiting program.")
# print(f"Using {database_path} for database file.")
# engine = create_engine(f"sqlite:///{database_path}") #, echo=True, future=True)
# engine = create_engine("postgresql+psycopg2://postgres:RE,4321q@localhost:5432/submissions")
engine = create_engine(database_path)
session = Session(engine)
return session
@@ -444,8 +399,6 @@ class Settings(BaseSettings, extra="allow"):
output = dict(power_users=['lwark', 'styson', 'ruwang'])
else:
print(f"Hello from database settings getter.")
# session = Session(create_engine(f"sqlite:///{db_path}"))
# print(self.__dict__)
session = self.database_session
metadata = MetaData()
@@ -466,7 +419,6 @@ class Settings(BaseSettings, extra="allow"):
output[item[1]] = json.loads(item[2])
except (JSONDecodeError, TypeError):
output[item[1]] = item[2]
# config_items = {item[1]: json.loads(item[2]) for item in config_items}
for k, v in output.items():
if not hasattr(self, k):
self.__setattr__(k, v)
@@ -479,7 +431,6 @@ class Settings(BaseSettings, extra="allow"):
url = c['alembic']['sqlalchemy.url']
match mode:
case 'path':
# path = url.replace("sqlite:///", "")
path = re.sub(r"^.*//", "", url)
path = re.sub(r"^.*@", "", path)
return Path(path)
@@ -523,7 +474,6 @@ class Settings(BaseSettings, extra="allow"):
dicto[k] = v
with open(settings_path, 'w') as f:
yaml.dump(dicto, f)
# return settings
def get_config(settings_path: Path | str | None = None) -> Settings:
@@ -540,16 +490,14 @@ def get_config(settings_path: Path | str | None = None) -> Settings:
# logger.debug(f"Creating settings...")
if isinstance(settings_path, str):
settings_path = Path(settings_path)
# custom pyyaml constructor to join fields
# NOTE: custom pyyaml constructor to join fields
def join(loader, node):
seq = loader.construct_sequence(node)
return ''.join([str(i) for i in seq])
# register the tag handler
yaml.add_constructor('!join', join)
# make directories
# NOTE: make directories
try:
CONFIGDIR.mkdir(parents=True)
except FileExistsError:
@@ -574,10 +522,8 @@ def get_config(settings_path: Path | str | None = None) -> Settings:
settings_path = project_path.joinpath('src', 'config.yml')
with open(settings_path, "r") as dset:
default_settings = yaml.load(dset, Loader=yaml.Loader)
# NOTE: Tell program we need to copy the config.yml to the user directory
# NOTE: copy settings to config directory
# settings = Settings(**copy_settings(settings_path=CONFIGDIR.joinpath("config.yml"), settings=default_settings))
settings = Settings(**default_settings)
settings.save(settings_path=CONFIGDIR.joinpath("config.yml"))
# print(f"Default settings: {pprint.pprint(settings.__dict__)}")
@@ -593,7 +539,6 @@ def get_config(settings_path: Path | str | None = None) -> Settings:
logger.error("No config.yml file found. Writing to directory.")
with open(settings_path, "r") as dset:
default_settings = yaml.load(dset, Loader=yaml.Loader)
# return Settings(**copy_settings(settings_path=settings_path, settings=default_settings))
settings = Settings(**default_settings)
settings.save(settings_path=settings_path)
# logger.debug(f"Using {settings_path} for config file.")
@@ -623,7 +568,7 @@ class GroupWriteRotatingFileHandler(handlers.RotatingFileHandler):
"""
Override base class method to make the new log file group writable.
"""
# Rotate the file first.
# NOTE: Rotate the file first.
handlers.RotatingFileHandler.doRollover(self)
# NOTE: Add group write to the current permissions.
currMode = os.stat(self.baseFilename).st_mode
@@ -692,6 +637,11 @@ def setup_logger(verbosity: int = 3):
Returns:
logger: logger object
"""
def handle_exception(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
logger = logging.getLogger("submissions")
logger.setLevel(logging.DEBUG)
# NOTE: create file handler which logs even debug messages
@@ -700,10 +650,6 @@ def setup_logger(verbosity: int = 3):
except FileExistsError:
logger.warning(f"Logging directory {LOGDIR} already exists.")
# NOTE: logging to file turned off due to repeated permission errors
# fh = GroupWriteRotatingFileHandler(LOGDIR.joinpath('submissions.log'), mode='a', maxBytes=100000, backupCount=3, encoding=None, delay=False)
# file logging will always be debug
# fh.setLevel(logging.DEBUG)
# fh.name = "File"
# NOTE: create console handler with a higher log level
# NOTE: create custom logger with STERR -> log
ch = logging.StreamHandler(stream=sys.stdout)
@@ -718,19 +664,10 @@ def setup_logger(verbosity: int = 3):
ch.name = "Stream"
# NOTE: create formatter and add it to the handlers
formatter = CustomFormatter()
# fh.setFormatter(formatter)
ch.setFormatter(formatter)
# NOTE: add the handlers to the logger
# logger.addHandler(fh)
logger.addHandler(ch)
# NOTE: Output exception and traceback to logger
def handle_exception(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
sys.excepthook = handle_exception
return logger
@@ -746,11 +683,6 @@ def copy_settings(settings_path: Path, settings: dict) -> dict:
Returns:
dict: output dictionary for use in first run
"""
# NOTE: if the current user is not a superuser remove the superusers entry
# if not getpass.getuser() in settings['super_users']:
# del settings['super_users']
# if not getpass.getuser() in settings['power_users']:
# del settings['power_users']
if not settings_path.exists():
with open(settings_path, 'w') as f:
yaml.dump(settings, f)