Increased portability.
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
directory_path: null
|
directory_path: null
|
||||||
database_path: null
|
database_path: null
|
||||||
database_schema: sqlite
|
database_schema: null
|
||||||
database_user: null
|
database_user: null
|
||||||
database_password: null
|
database_password: null
|
||||||
database_name: null
|
database_name: null
|
||||||
@@ -230,11 +230,11 @@ class Settings(BaseSettings, extra="allow"):
|
|||||||
FileNotFoundError: Error if database not found.
|
FileNotFoundError: Error if database not found.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
database_schema: str
|
database_schema: str | None = None
|
||||||
directory_path: Path | None = None
|
directory_path: Path | None = None
|
||||||
database_user: str | None = None
|
database_user: str | None = None
|
||||||
database_password: str | None = None
|
database_password: str | None = None
|
||||||
database_name: str
|
database_name: str | None = None
|
||||||
database_path: Path | str | None = None
|
database_path: Path | str | None = None
|
||||||
backup_path: Path | str | None = None
|
backup_path: Path | str | None = None
|
||||||
# super_users: list|None = None
|
# super_users: list|None = None
|
||||||
@@ -246,6 +246,21 @@ class Settings(BaseSettings, extra="allow"):
|
|||||||
|
|
||||||
model_config = SettingsConfigDict(env_file_encoding='utf-8')
|
model_config = SettingsConfigDict(env_file_encoding='utf-8')
|
||||||
|
|
||||||
|
@field_validator('database_schema', mode="before")
|
||||||
|
@classmethod
|
||||||
|
def set_schema(cls, value):
|
||||||
|
if value is None:
|
||||||
|
# print("No value for dir path")
|
||||||
|
if check_if_app():
|
||||||
|
alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
|
||||||
|
else:
|
||||||
|
alembic_path = project_path.joinpath("alembic.ini")
|
||||||
|
# print(f"Getting alembic path: {alembic_path}")
|
||||||
|
value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='schema')
|
||||||
|
if value is None:
|
||||||
|
value = "sqlite"
|
||||||
|
return value
|
||||||
|
|
||||||
@field_validator('backup_path', mode="before")
|
@field_validator('backup_path', mode="before")
|
||||||
@classmethod
|
@classmethod
|
||||||
def set_backup_path(cls, value, values):
|
def set_backup_path(cls, value, values):
|
||||||
@@ -255,35 +270,43 @@ class Settings(BaseSettings, extra="allow"):
|
|||||||
case None:
|
case None:
|
||||||
value = values.data['directory_path'].joinpath("Database backups")
|
value = values.data['directory_path'].joinpath("Database backups")
|
||||||
if not value.exists():
|
if not value.exists():
|
||||||
|
try:
|
||||||
value.mkdir(parents=True)
|
value.mkdir(parents=True)
|
||||||
|
except OSError:
|
||||||
|
value = Path(askdirectory(title="Directory for backups."))
|
||||||
|
# value.mkdir(parents=True)
|
||||||
# metadata.backup_path = value
|
# metadata.backup_path = value
|
||||||
return value
|
return value
|
||||||
|
|
||||||
@field_validator('directory_path', mode="before")
|
@field_validator('directory_path', mode="before")
|
||||||
@classmethod
|
@classmethod
|
||||||
def ensure_directory_exists(cls, value):
|
def ensure_directory_exists(cls, value, values):
|
||||||
if value is None:
|
if value is None:
|
||||||
|
match values.data['database_schema']:
|
||||||
|
case "sqlite":
|
||||||
# print("No value for dir path")
|
# print("No value for dir path")
|
||||||
if check_if_app():
|
if check_if_app():
|
||||||
alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
|
alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
|
||||||
else:
|
else:
|
||||||
alembic_path = project_path.joinpath("alembic.ini")
|
alembic_path = project_path.joinpath("alembic.ini")
|
||||||
# print(f"Getting alembic path: {alembic_path}")
|
# print(f"Getting alembic path: {alembic_path}")
|
||||||
value = cls.get_alembic_db_path(alembic_path=alembic_path).parent
|
value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').parent
|
||||||
# print(f"Using {value}")
|
# print(f"Using {value}")
|
||||||
|
case _:
|
||||||
|
Tk().withdraw() # we don't want a full GUI, so keep the root window from appearing
|
||||||
|
value = Path(askdirectory(
|
||||||
|
title="Select directory for DB storage")) # show an "Open" dialog box and return the path to the selected file
|
||||||
if isinstance(value, str):
|
if isinstance(value, str):
|
||||||
value = Path(value)
|
value = Path(value)
|
||||||
try:
|
try:
|
||||||
check = value.exists()
|
check = value.exists()
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
check = False
|
check = False
|
||||||
if not check:
|
if not check: #and values.data['database_schema'] == "sqlite":
|
||||||
# print(f"No directory found, using Documents/submissions")
|
# print(f"No directory found, using Documents/submissions")
|
||||||
# value = Path.home().joinpath("Documents", "submissions")
|
# value = Path.home().joinpath("Documents", "submissions")
|
||||||
Tk().withdraw() # we don't want a full GUI, so keep the root window from appearing
|
|
||||||
value = Path(askdirectory(title="Select directory for DB storage")) # show an "Open" dialog box and return the path to the selected file
|
|
||||||
value.mkdir(exist_ok=True)
|
value.mkdir(exist_ok=True)
|
||||||
# print(f"Final return of directory_path: {value}")
|
print(f"Final return of directory_path: {value}")
|
||||||
return value
|
return value
|
||||||
|
|
||||||
@field_validator('database_path', mode="before")
|
@field_validator('database_path', mode="before")
|
||||||
@@ -291,14 +314,24 @@ class Settings(BaseSettings, extra="allow"):
|
|||||||
def ensure_database_exists(cls, value, values):
|
def ensure_database_exists(cls, value, values):
|
||||||
# if value == ":memory:":
|
# if value == ":memory:":
|
||||||
# return value
|
# return value
|
||||||
if value is None:
|
# and values.data['database_schema'] == "sqlite":
|
||||||
value = values.data['directory_path']
|
# value = values.data['directory_path']
|
||||||
match values.data['database_schema']:
|
match values.data['database_schema']:
|
||||||
case "sqlite":
|
case "sqlite":
|
||||||
value = Path(f"{Path(value).absolute().__str__()}/{values.data['database_name']}.db")
|
if value is None:
|
||||||
|
value = values.data['directory_path']
|
||||||
|
if isinstance(value, str):
|
||||||
|
value = Path(value)
|
||||||
# db_name = f"{values.data['database_name']}.db"
|
# db_name = f"{values.data['database_name']}.db"
|
||||||
case _:
|
case _:
|
||||||
value = f"{value}/{values.data['database_name']}"
|
if value is None:
|
||||||
|
if check_if_app():
|
||||||
|
alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
|
||||||
|
else:
|
||||||
|
alembic_path = project_path.joinpath("alembic.ini")
|
||||||
|
# print(f"Getting alembic path: {alembic_path}")
|
||||||
|
value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').parent
|
||||||
|
# value = f"{value}/{values.data['database_name']}"
|
||||||
# db_name = values.data['database_name']
|
# db_name = values.data['database_name']
|
||||||
# match value:
|
# match value:
|
||||||
# case str():
|
# case str():
|
||||||
@@ -311,21 +344,64 @@ class Settings(BaseSettings, extra="allow"):
|
|||||||
# raise FileNotFoundError(f"Couldn't find database at {value}")
|
# raise FileNotFoundError(f"Couldn't find database at {value}")
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
@field_validator('database_name', mode='before')
|
||||||
|
@classmethod
|
||||||
|
def get_database_name(cls, value):
|
||||||
|
if value is None:
|
||||||
|
if check_if_app():
|
||||||
|
alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
|
||||||
|
else:
|
||||||
|
alembic_path = project_path.joinpath("alembic.ini")
|
||||||
|
# print(f"Getting alembic path: {alembic_path}")
|
||||||
|
value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='path').stem
|
||||||
|
return value
|
||||||
|
|
||||||
|
@field_validator("database_user", mode='before')
|
||||||
|
@classmethod
|
||||||
|
def get_user(cls, value):
|
||||||
|
if value is None:
|
||||||
|
if check_if_app():
|
||||||
|
alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
|
||||||
|
else:
|
||||||
|
alembic_path = project_path.joinpath("alembic.ini")
|
||||||
|
# print(f"Getting alembic path: {alembic_path}")
|
||||||
|
value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='user')
|
||||||
|
print(f"Got {value} for user")
|
||||||
|
return value
|
||||||
|
|
||||||
|
@field_validator("database_password", mode='before')
|
||||||
|
@classmethod
|
||||||
|
def get_pass(cls, value):
|
||||||
|
if value is None:
|
||||||
|
if check_if_app():
|
||||||
|
alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
|
||||||
|
else:
|
||||||
|
alembic_path = project_path.joinpath("alembic.ini")
|
||||||
|
# print(f"Getting alembic path: {alembic_path}")
|
||||||
|
value = cls.get_alembic_db_path(alembic_path=alembic_path, mode='pass')
|
||||||
|
print(f"Got {value} for pass")
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
@field_validator('database_session', mode="before")
|
@field_validator('database_session', mode="before")
|
||||||
@classmethod
|
@classmethod
|
||||||
def create_database_session(cls, value, values):
|
def create_database_session(cls, value, values):
|
||||||
|
|
||||||
if value is not None:
|
if value is not None:
|
||||||
return value
|
return value
|
||||||
else:
|
else:
|
||||||
match values.data['database_schema']:
|
match values.data['database_schema']:
|
||||||
case "sqlite":
|
case "sqlite":
|
||||||
value = f"/{values.data['database_path']}"
|
value = f"/{values.data['database_path']}"
|
||||||
# db_name = f"{values.data['database_name']}.db"
|
db_name = f"{values.data['database_name']}.db"
|
||||||
case _:
|
case _:
|
||||||
value = f"@{values.data['database_path']}"
|
print(pprint.pprint(values.data))
|
||||||
|
tmp = jinja_template_loading().from_string("{% if values['database_user'] %}{{ values['database_user'] }}{% if values['database_password'] %}:{{ values['database_password'] }}{% endif %}{% endif %}@{{ values['database_path'] }}")
|
||||||
|
value = tmp.render(values=values.data)
|
||||||
|
db_name = values.data['database_name']
|
||||||
template = jinja_template_loading().from_string(
|
template = jinja_template_loading().from_string(
|
||||||
"{{ values['database_schema'] }}://{% if values['database_user'] %}{{ values['database_user'] }}{% if values['database_password'] %}:{{ values['database_password'] }}{% endif %}{% endif %}{{ value }}")
|
"{{ values['database_schema'] }}://{{ value }}/{{ db_name }}")
|
||||||
database_path = template.render(values=values.data, value=value)
|
database_path = template.render(values=values.data, value=value, db_name=db_name)
|
||||||
# print(f"Using {database_path} for database path")
|
# print(f"Using {database_path} for database path")
|
||||||
# database_path = values.data['database_path']
|
# database_path = values.data['database_path']
|
||||||
# if database_path is None:
|
# if database_path is None:
|
||||||
@@ -360,24 +436,11 @@ class Settings(BaseSettings, extra="allow"):
|
|||||||
if value is None:
|
if value is None:
|
||||||
return package
|
return package
|
||||||
|
|
||||||
@field_validator('database_name', mode='before')
|
|
||||||
@classmethod
|
|
||||||
def get_database_name(cls, value, values):
|
|
||||||
if value is None:
|
|
||||||
if check_if_app():
|
|
||||||
alembic_path = Path(sys._MEIPASS).joinpath("files", "alembic.ini")
|
|
||||||
else:
|
|
||||||
alembic_path = project_path.joinpath("alembic.ini")
|
|
||||||
# print(f"Getting alembic path: {alembic_path}")
|
|
||||||
value = cls.get_alembic_db_path(alembic_path=alembic_path).stem
|
|
||||||
return value
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
|
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self.set_from_db(db_path=kwargs['database_path'])
|
self.set_from_db(db_path=kwargs['database_path'])
|
||||||
|
|
||||||
|
|
||||||
def set_from_db(self, db_path: Path):
|
def set_from_db(self, db_path: Path):
|
||||||
if 'pytest' in sys.modules:
|
if 'pytest' in sys.modules:
|
||||||
output = dict(power_users=['lwark', 'styson', 'ruwang'])
|
output = dict(power_users=['lwark', 'styson', 'ruwang'])
|
||||||
@@ -400,11 +463,32 @@ class Settings(BaseSettings, extra="allow"):
|
|||||||
self.__setattr__(k, v)
|
self.__setattr__(k, v)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_alembic_db_path(cls, alembic_path) -> Path:
|
def get_alembic_db_path(cls, alembic_path, mode=Literal['path', 'schema', 'user', 'pass']) -> Path | str:
|
||||||
c = ConfigParser()
|
c = ConfigParser()
|
||||||
c.read(alembic_path)
|
c.read(alembic_path)
|
||||||
path = c['alembic']['sqlalchemy.url'].replace("sqlite:///", "")
|
url = c['alembic']['sqlalchemy.url']
|
||||||
|
match mode:
|
||||||
|
case 'path':
|
||||||
|
# path = url.replace("sqlite:///", "")
|
||||||
|
path = re.sub(r"^.*//", "", url)
|
||||||
|
path = re.sub(r"^.*@", "", path)
|
||||||
return Path(path)
|
return Path(path)
|
||||||
|
case "schema":
|
||||||
|
return url[:url.index(":")]
|
||||||
|
case "user":
|
||||||
|
url = re.sub(r"^.*//", "", url)
|
||||||
|
try:
|
||||||
|
return url[:url.index("@")].split(":")[0]
|
||||||
|
except (IndexError, ValueError) as e:
|
||||||
|
print(f"Error on user: {e}")
|
||||||
|
return None
|
||||||
|
case "pass":
|
||||||
|
url = re.sub(r"^.*//", "", url)
|
||||||
|
try:
|
||||||
|
return url[:url.index("@")].split(":")[1]
|
||||||
|
except (IndexError, ValueError) as e:
|
||||||
|
print(f"Error on user: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
def save(self, settings_path: Path):
|
def save(self, settings_path: Path):
|
||||||
if not settings_path.exists():
|
if not settings_path.exists():
|
||||||
@@ -421,6 +505,8 @@ class Settings(BaseSettings, extra="allow"):
|
|||||||
elif v.is_file():
|
elif v.is_file():
|
||||||
print("file")
|
print("file")
|
||||||
v = v.parent.absolute().__str__()
|
v = v.parent.absolute().__str__()
|
||||||
|
else:
|
||||||
|
v = v.__str__()
|
||||||
case _:
|
case _:
|
||||||
pass
|
pass
|
||||||
print(f"Key: {k}, Value: {v}")
|
print(f"Key: {k}, Value: {v}")
|
||||||
|
|||||||
Reference in New Issue
Block a user