This commit is contained in:
zhaojing1987 2023-11-20 09:29:25 +08:00
parent 383bbc91d9
commit 58dec39638
24 changed files with 713 additions and 255 deletions

View file

@ -5,6 +5,8 @@ from src.schemas.appInstall import appInstall
from src.schemas.appResponse import AppResponse
from src.schemas.errorResponse import ErrorResponse
from src.services.app_manager import AppManger
from src.services.common_check import install_validate
from threading import Thread
router = APIRouter()
@ -84,8 +86,16 @@ def apps_install(
appInstall: appInstall,
endpointId: int = Query(None, description="Endpoint ID to install app on,if not set, install on the local endpoint"),
):
return AppManger().install_app(appInstall, endpointId)
# install validate
install_validate(appInstall,endpointId)
# install app
Thread(target=AppManger().install_app, args=(appInstall, endpointId)).start()
# return success
return ErrorResponse(
status_code=200,
message="Success",
details="The app is installing and can be viewed through 'My Apps.'",
)
@router.post(
"/apps/{app_id}/start",

View file

@ -7,7 +7,6 @@ from src.schemas.errorResponse import ErrorResponse
from src.schemas.proxyHosts import ProxyHost
from src.services.app_manager import AppManger
router = APIRouter()
@router.get(

View file

@ -1,7 +1,6 @@
from fastapi import APIRouter, Query,Path
from src.schemas.appSettings import AppSettings
from src.schemas.errorResponse import ErrorResponse
from typing import List
from src.services.settings_manager import SettingsManager

View file

@ -0,0 +1,3 @@
Metadata-Version: 2.1
Name: apphub
Version: 0.2

View file

@ -0,0 +1,10 @@
README.md
setup.py
src/apphub.egg-info/PKG-INFO
src/apphub.egg-info/SOURCES.txt
src/apphub.egg-info/dependency_links.txt
src/apphub.egg-info/entry_points.txt
src/apphub.egg-info/requires.txt
src/apphub.egg-info/top_level.txt
src/cli/__init__.py
src/cli/apphub_cli.py

View file

@ -0,0 +1 @@

View file

@ -0,0 +1,2 @@
[console_scripts]
apphub = cli.apphub_cli:cli

View file

@ -0,0 +1 @@
click

View file

@ -0,0 +1 @@
cli

View file

@ -16,14 +16,24 @@ def cli():
@cli.command()
def genkey():
"""Generate a new API key"""
key = APIKeyManager().generate_key()
click.echo(f"{key}")
try:
key = APIKeyManager().generate_key()
click.echo(f"{key}")
except CustomException as e:
raise click.ClickException(e.details)
except Exception as e:
raise click.ClickException(str(e))
@cli.command()
def getkey():
"""Get the API key"""
key = APIKeyManager().get_key()
click.echo(f"{key}")
try:
key = APIKeyManager().get_key()
click.echo(f"{key}")
except CustomException as e:
raise click.ClickException(e.details)
except Exception as e:
raise click.ClickException(str(e))
@cli.command()
@click.option('--section',required=True, help='The section name')
@ -31,7 +41,12 @@ def getkey():
@click.option('--value', required=True,help='The value of the key')
def setconfig(section, key, value):
"""Set a config value"""
SettingsManager().write_section(section, key, value)
try:
SettingsManager().write_section(section, key, value)
except CustomException as e:
raise click.ClickException(e.details)
except Exception as e:
raise click.ClickException(str(e))
@cli.command()
@click.option('--section',required=True, help='The section name')
@ -47,11 +62,9 @@ def getconfig(section, key):
value = SettingsManager().read_key(section, key)
click.echo(f"{value}")
except CustomException as e:
click.echo(f"{e.details}")
return
raise click.ClickException(e.details)
except Exception as e:
click.echo(f"{e}")
return
raise click.ClickException(str(e))
if __name__ == "__main__":
cli()

View file

@ -15,14 +15,8 @@ base_url = http://websoft9-deployment:9000/api
user_name = admin
user_pwd = j4FYLqfisbv4vkYY
[docker_library]
path = /websoft9/library/apps
[app_media]
path = /websoft9/media/json/
[api_key]
key = d0a4996ad7819ae91a80c05c0d21800b610b5bf9fd53745db16e2f2ad9ae193c
key = df5b3dbcb00091462769fe0bac0c7008c8788ffa72f86c32f4a2c6af90daa285
[domain]
wildcard_domain = test.websoft9.cn

View file

@ -0,0 +1,9 @@
[APP_DB_MYSQL_SETTINGS]
port = 3306
user = root
[docker_library]
path = /websoft9/library/apps
[app_media]
path = /websoft9/media/json/

View file

@ -13,31 +13,40 @@ from src.schemas.errorResponse import ErrorResponse
from fastapi.responses import HTMLResponse
from fastapi.security.api_key import APIKeyHeader
# set uvicorn logger to stdout
uvicorn_logger = logging.getLogger("uvicorn")
# 创建一个日志处理器,将日志发送到 stdout
stdout_handler = logging.StreamHandler(sys.stdout)
# 将日志处理器添加到 Uvicorn 的 logger
uvicorn_logger.addHandler(stdout_handler)
uvicorn_logger.setLevel(logging.INFO)
API_KEY_NAME = "x-api-key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
async def verify_key(request: Request, api_key_header: str = Security(api_key_header)):
"""
Verify API Key
"""
# skip docs
if request.url.path == "/api/docs":
return None
# validate api key is provided
if api_key_header is None:
raise CustomException(
status_code=400,
message="Invalid Request",
details="No API Key provided"
)
# get api key from config
API_KEY = ConfigManager().get_value("api_key","key")
# validate api key is set
if API_KEY is None:
raise CustomException(
status_code=500,
message="Invalid API Key",
details="API Key is not set"
)
# validate api key is correct
if api_key_header != API_KEY:
logger.error(f"Invalid API Key: {api_key_header}")
raise CustomException(
@ -45,9 +54,7 @@ async def verify_key(request: Request, api_key_header: str = Security(api_key_he
message="Invalid Request",
details="Invalid API Key"
)
return api_key_header
app = FastAPI(
title="AppHub API",
@ -67,7 +74,7 @@ async def custom_swagger_ui_html():
<!DOCTYPE html>
<html>
<head>
<title>Websoft9 API</title>
<title>AppHub API</title>
<link rel="stylesheet" type="text/css" href="/api/static/swagger-ui.css">
<script src="/api/static/swagger-ui-bundle.js"></script>
</head>
@ -86,7 +93,6 @@ async def custom_swagger_ui_html():
"""
# remove 422 responses
@app.on_event("startup")
async def remove_422_responses():
openapi_schema = app.openapi()
for path, path_item in openapi_schema["paths"].items():
@ -94,6 +100,8 @@ async def remove_422_responses():
operation["responses"].pop("422", None)
app.openapi_schema = openapi_schema
app.add_event_handler("startup", remove_422_responses)
#custom error handler
@app.exception_handler(CustomException)
async def custom_exception_handler(request, exc: CustomException):
@ -113,5 +121,5 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
)
app.include_router(api_app.router,tags=["apps"])
app.include_router(api_settings.router,tags=["settings"])
app.include_router(api_proxy.router,tags=["proxys"])
app.include_router(api_proxy.router,tags=["proxys"])
app.include_router(api_settings.router,tags=["settings"])

View file

@ -19,3 +19,4 @@ class AppResponse(BaseModel):
gitConfig: dict[str, Any] = Field({}, description="Git configuration")
containers: List[dict] = Field([], description="Containers")
volumes: List[dict] = Field([], description="Volumes")
error:Optional[str] = Field(None,description="Error message",example="Internal Server Error")

View file

@ -15,15 +15,19 @@ class PortainerSetting(BaseModel):
user_name: str = Field(..., title="The user name for portainer")
user_pwd: str = Field(..., title="The user password for portainer")
class DockerLibrarySetting(BaseModel):
path: str = Field(..., title="The path of docker library")
class ApiKeySetting(BaseModel):
key: str = Field(..., title="The api key")
class AppMediaSetting(BaseModel):
path: str = Field(..., title="The path of app media")
class Domain(BaseModel):
wildcard_domain: str = Field(None, title="The domain name")
class Cockpit(BaseModel):
port: int = Field(..., title="The port of cockpit")
class AppSettings(BaseModel):
nginx_proxy_manager: NginxProxyManagerSetting
gitea: GiteaSetting
portainer: PortainerSetting
docker_library: DockerLibrarySetting
app_media: AppMediaSetting
api_key: ApiKeySetting
domain: Domain
cockpit: Cockpit

View file

@ -11,14 +11,18 @@ class APIKeyManager:
Methods:
generate_key: Generate a new API key.
delete_key: Delete the API key.
get_key: Get the API key.
"""
def generate_key(self):
"""
Generate a new API key.
"""
try:
# Generate a random string
base = secrets.token_urlsafe(32)
# Hash the string
key = hashlib.sha256(base.encode()).hexdigest()
# Save the key
ConfigManager().set_value('api_key', 'key', key)
return key
except Exception as e:

View file

@ -3,12 +3,15 @@ import ipaddress
import json
import os
import shutil
import random
from datetime import datetime
from src.core.config import ConfigManager
from src.core.envHelper import EnvHelper
from src.core.exception import CustomException
from src.schemas.appInstall import appInstall
from src.schemas.appResponse import AppResponse
from src.services.common_check import check_appId, check_appName_and_appVersion, check_domain_names, check_endpointId
from src.schemas.errorResponse import ErrorResponse
from src.services.common_check import check_endpointId, install_validate
from src.services.git_manager import GitManager
from src.services.gitea_manager import GiteaManager
from src.services.portainer_manager import PortainerManager
@ -16,7 +19,7 @@ from src.core.logger import logger
from src.services.proxy_manager import ProxyManager
from src.utils.file_manager import FileHelper
from src.utils.password_generator import PasswordGenerator
from src.services.app_status import appInstalling, appInstallingError,start_app_installation,remove_app_installation,modify_app_information,remove_app_from_errors
class AppManger:
def get_catalog_apps(self,locale:str):
@ -28,11 +31,11 @@ class AppManger:
"""
try:
# Get the app media path
base_path = ConfigManager().get_value("app_media", "path")
base_path = ConfigManager("system.ini").get_value("app_media", "path")
app_media_path = base_path + 'catalog_' + locale + '.json'
# check the app media path is exists
if not os.path.exists(app_media_path):
logger.error(f"Get catalog apps error: {app_media_path} is not exists")
logger.error(f"Get app'catalog error: {app_media_path} is not exists")
raise CustomException()
# Get the app catalog list
@ -40,7 +43,7 @@ class AppManger:
data = json.load(f)
return data
except (CustomException,Exception) as e:
logger.error(f"Get catalog apps error:{e}")
logger.error(f"Get app'catalog error:{e}")
raise CustomException()
def get_available_apps(self,locale:str):
@ -52,7 +55,7 @@ class AppManger:
"""
try:
# Get the app media path
base_path = ConfigManager().get_value("app_media", "path")
base_path = ConfigManager("system.ini").get_value("app_media", "path")
app_media_path = base_path + 'product_' + locale + '.json'
# check the app media path is exists
if not os.path.exists(app_media_path):
@ -81,7 +84,10 @@ class AppManger:
portainerManager = PortainerManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
if endpointId:
check_endpointId(endpointId, portainerManager)
else:
endpointId = portainerManager.get_local_endpoint_id()
try:
# Set the apps info for response
@ -119,7 +125,7 @@ class AppManger:
if "websoft9" in not_stacks:
not_stacks.remove("websoft9")
# Set the not stacks info
# Set the not_stacks info to apps_info
for not_stack in not_stacks:
not_stack_response = AppResponse(
app_id = not_stack,
@ -127,6 +133,34 @@ class AppManger:
)
apps_info.append(not_stack_response)
# Get the installing apps(if app is in installing and in stasks or not_stacks,remove it)
for app_uuid,app in appInstalling.items():
app_response = AppResponse(
app_id = app.get("app_id", None),
status = app.get("status", None),
app_name = app.get("app_name", None),
app_official = app.get("app_official", None),
error = app.get("error", None),
)
if app_response.app_id in not_stacks:
# If app_id is in not_stacks, remove the corresponding AppResponse from apps_info
apps_info = [app_info for app_info in apps_info if app_info.app_id != app_response.app_id]
if any(app_info.app_id == app_response.app_id for app_info in apps_info):
#从apps_info中删除app_id对应的AppResponse
apps_info = [app_info for app_info in apps_info if app_info.app_id != app_response.app_id]
apps_info.append(app_response)
# Get the installing error apps
for app_uuid,app in list(appInstallingError.items()):
app_response = AppResponse(
app_id = app.get("app_id", None),
status = app.get("status", None),
app_name = app.get("app_name", None),
app_official = app.get("app_official", None),
error = app.get("error", None),
)
apps_info.append(app_response)
return apps_info
except (CustomException,Exception) as e:
logger.error(f"Get apps error:{e}")
@ -143,7 +177,10 @@ class AppManger:
portainerManager = PortainerManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
if endpointId:
check_endpointId(endpointId, portainerManager)
else:
endpointId = portainerManager.get_local_endpoint_id()
# validate the app_id is exists in portainer
is_stack_exists = portainerManager.check_stack_exists(app_id,endpointId)
@ -277,42 +314,55 @@ class AppManger:
appInstall (appInstall): The app install info.
endpointId (int, optional): The endpoint id. Defaults to None.
"""
# Get the library path
library_path = ConfigManager().get_value("docker_library", "path")
# Get the portainer and gitea manager
portainerManager = PortainerManager()
giteaManager = GiteaManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
# validate the app_name and app_version
# Get the info from appInstall
app_name = appInstall.app_name
app_version = appInstall.edition.version
check_appName_and_appVersion(app_name,app_version,library_path)
# validate the app_id
app_id = appInstall.app_id
check_appId(app_id,endpointId,giteaManager,portainerManager)
# validate the domain_names
proxy_enabled = appInstall.proxy_enabled
domain_names = appInstall.domain_names
if proxy_enabled:
check_domain_names(domain_names)
# Check the endpointId is exists.
if endpointId is None:
# Get the local endpointId
endpointId = portainerManager.get_local_endpoint_id()
# add app to appInstalling
app_uuid = start_app_installation(appInstall.app_id, appInstall.app_name)
# Install app - Step 1 : create repo in gitea
repo_url = giteaManager.create_repo(app_id)
try:
repo_url = giteaManager.create_repo(app_id)
except CustomException as e:
# modify app status: error
modify_app_information(app_uuid,e.details)
raise
except Exception as e:
# modify app status: error
modify_app_information(app_uuid,"Create repo error")
logger.error(f"Create repo error:{e}")
raise CustomException()
# Install app - Step 2 : initialize local git repo and push to gitea
try:
# The source directory.
library_path = ConfigManager("system.ini").get_value("docker_library", "path")
local_path = f"{library_path}/{app_name}"
# Create a temporary directory.
app_tmp_dir = "/tmp"
app_tmp_dir_path = f"{app_tmp_dir}/{app_name}"
# Get system time
now = datetime.now()
# Convert the time to a string
timestamp_str = now.strftime("%Y%m%d%H%M%S%f")
# Generate a random number
rand_num = random.randint(1000, 9999)
# 将时间戳和随机数添加到 app_name 后面
app_tmp_dir_path = f"{app_tmp_dir}/{app_name}_{timestamp_str}_{rand_num}"
# If the temporary directory does not exist, create it.
if not os.path.exists(app_tmp_dir):
@ -339,9 +389,18 @@ class AppManger:
# Commit and push to remote repo
self._init_local_repo_and_push_to_remote(app_tmp_dir_path,repo_url)
except (CustomException,Exception) as e:
except CustomException as e:
# Rollback: remove repo in gitea
giteaManager.remove_repo(app_id)
# modify app status: error
modify_app_information(app_uuid,e.details)
raise
except Exception as e:
# Rollback: remove repo in gitea
giteaManager.remove_repo(app_id)
# modify app status: error
modify_app_information(app_uuid,"Initialize repo error")
logger.error(f"Initialize repo error:{e}")
raise CustomException()
# Install app - Step 3 : create stack in portainer
@ -355,9 +414,18 @@ class AppManger:
# Get the stack_id
stack_id = stack_info.get("Id")
except (CustomException,Exception) as e:
except CustomException as e:
# Rollback: remove repo in gitea
giteaManager.remove_repo(app_id)
# modify app status: error
modify_app_information(app_uuid,e.details)
raise
except Exception as e:
# Rollback: remove repo in gitea
giteaManager.remove_repo(app_id)
# modify app status: error
modify_app_information(app_uuid,"Create stack error")
logger.error(f"Create stack error:{e}")
raise CustomException()
# Install app - Step 4 : create proxy in nginx proxy manager
@ -378,17 +446,43 @@ class AppManger:
else:
# Create proxy in nginx proxy manager
ProxyManager().create_proxy_by_app(domain_names,app_id,forward_port,forward_scheme=forward_scheme)
except (CustomException,Exception) as e:
except CustomException as e:
# Rollback-1: remove repo in gitea
giteaManager.remove_repo(app_id)
# Rollback-2: remove stack in portainer
portainerManager.remove_stack_and_volumes(stack_id,endpointId)
# modify app status: error
modify_app_information(app_uuid,e.details)
raise
except Exception as e:
# Rollback-1: remove repo in gitea
giteaManager.remove_repo(app_id)
# Rollback-2: remove stack in portainer
portainerManager.remove_stack_and_volumes(stack_id,endpointId)
# modify app status: error
modify_app_information(app_uuid,"Create proxy error")
logger.error(f"Create proxy error:{e}")
raise CustomException()
# remove app from installing
remove_app_installation(app_uuid)
# Get the app info
# try:
# result = self.get_app_by_id(app_id,endpointId)
# except CustomException as e:
# modify_app_information(app_uuid,e.details)
# raise
# except Exception as e:
# modify_app_information(app_uuid,"Get app info error")
# logger.error(f"Get app info error:{e}")
# raise CustomException()
# Remove the tmp dir
shutil.rmtree(app_tmp_dir_path)
return self.get_app_by_id(app_id,endpointId)
logger.access(f"Successfully installed app: [{app_id}] and created domains:{domain_names}")
# return result
def redeploy_app(self,app_id:str,pull_image:bool,endpointId:int = None):
"""
@ -402,7 +496,10 @@ class AppManger:
portainerManager = PortainerManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
if endpointId:
check_endpointId(endpointId, portainerManager)
else:
endpointId = portainerManager.get_local_endpoint_id()
# validate the app_id is exists in portainer
is_stack_exists = portainerManager.check_stack_exists(app_id,endpointId)
@ -425,6 +522,7 @@ class AppManger:
user_pwd = ConfigManager().get_value("gitea","user_pwd")
# redeploy stack
portainerManager.redeploy_stack(stack_id,endpointId,pull_image,user_name,user_pwd)
logger.access(f"Successfully redeployed app: [{app_id}]")
def uninstall_app(self,app_id:str,purge_data:bool,endpointId:int = None):
"""
@ -438,7 +536,10 @@ class AppManger:
portainerManager = PortainerManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
if endpointId:
check_endpointId(endpointId, portainerManager)
else:
endpointId = portainerManager.get_local_endpoint_id()
# validate the app_id is exists in portainer
is_stack_exists = portainerManager.check_stack_exists(app_id,endpointId)
@ -496,9 +597,11 @@ class AppManger:
)
# remove stack and volumes
portainerManager.remove_stack_and_volumes(stack_id,endpointId)
logger.access(f"Successfully uninstalled app: [{app_id}] and removed all data")
else:
# down stack
portainerManager.down_stack(stack_id,endpointId)
logger.access(f"Successfully uninstalled app: [{app_id}] and keep data")
def remove_app(self,app_id:str,endpointId:int = None):
"""
@ -511,7 +614,10 @@ class AppManger:
portainerManager = PortainerManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
if endpointId:
check_endpointId(endpointId, portainerManager)
else:
endpointId = portainerManager.get_local_endpoint_id()
# validate the app_id is exists in portainer
is_stack_exists = portainerManager.check_stack_exists(app_id,endpointId)
@ -552,6 +658,8 @@ class AppManger:
# remove stack and volumes
portainerManager.remove_stack_and_volumes(stack_id,endpointId)
logger.access(f"Successfully removed app: [{app_id}]")
def start_app(self,app_id:str,endpointId:int = None):
"""
Start app
@ -563,7 +671,10 @@ class AppManger:
portainerManager = PortainerManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
if endpointId:
check_endpointId(endpointId, portainerManager)
else:
endpointId = portainerManager.get_local_endpoint_id()
# validate the app_id is exists in portainer
is_stack_exists = portainerManager.check_stack_exists(app_id,endpointId)
@ -591,6 +702,7 @@ class AppManger:
)
# start stack
portainerManager.start_stack(app_id,endpointId)
logger.access(f"Successfully started app: [{app_id}]")
def stop_app(self,app_id:str,endpointId:int = None):
"""
@ -603,7 +715,10 @@ class AppManger:
portainerManager = PortainerManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
if endpointId:
check_endpointId(endpointId, portainerManager)
else:
endpointId = portainerManager.get_local_endpoint_id()
# validate the app_id is exists in portainer
is_stack_exists = portainerManager.check_stack_exists(app_id,endpointId)
@ -631,6 +746,7 @@ class AppManger:
)
# stop stack
portainerManager.stop_stack(app_id,endpointId)
logger.access(f"Successfully stopped app: [{app_id}]")
def restart_app(self,app_id:str,endpointId:int = None):
"""
@ -643,7 +759,10 @@ class AppManger:
portainerManager = PortainerManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
if endpointId:
check_endpointId(endpointId, portainerManager)
else:
endpointId = portainerManager.get_local_endpoint_id()
# validate the app_id is exists in portainer
is_stack_exists = portainerManager.check_stack_exists(app_id,endpointId)
@ -671,6 +790,7 @@ class AppManger:
)
# restart stack
portainerManager.restart_stack(app_id,endpointId)
logger.access(f"Successfully restarted app: [{app_id}]")
def get_proxys_by_app(self,app_id:str,endpointId:int = None):
"""
@ -684,7 +804,10 @@ class AppManger:
proxyManager = ProxyManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
if endpointId:
check_endpointId(endpointId, portainerManager)
else:
endpointId = portainerManager.get_local_endpoint_id()
# Check the app_id is exists
stack_info = portainerManager.get_stack_by_name(app_id,endpointId)
@ -710,7 +833,10 @@ class AppManger:
portainerManager = PortainerManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
if endpointId:
check_endpointId(endpointId, portainerManager)
else:
endpointId = portainerManager.get_local_endpoint_id()
# Check the app_id is exists
stack_info = portainerManager.get_stack_by_name(app_id,endpointId)
@ -729,8 +855,8 @@ class AppManger:
details=f"{app_id} is inactive, can not create proxy,you can redeploy it"
)
# Check the domain_names
check_domain_names(domain_names)
# Check the domain_names is exists
# check_domain_names(domain_names)
# Get the forward port
stack_env = self.get_app_by_id(app_id,endpointId).env
@ -742,6 +868,7 @@ class AppManger:
# Get the forward scheme form env file: http or https
proxy_host = proxyManager.create_proxy_by_app(domain_names,app_id,forward_port)
if proxy_host:
logger.access(f"Successfully created domains:{domain_names} for app: [{app_id}]")
return proxy_host
else:
logger.error(f"Create app:{app_id} proxy error")
@ -765,7 +892,10 @@ class AppManger:
portainerManager = PortainerManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
if endpointId:
check_endpointId(endpointId, portainerManager)
else:
endpointId = portainerManager.get_local_endpoint_id()
# Check the app_id is exists
stack_info = portainerManager.get_stack_by_name(app_id,endpointId)
@ -776,8 +906,8 @@ class AppManger:
details=f"{app_id} Not Found"
)
# Get the domain_names by app_id from nginx proxy manager
domain_names = proxyManager.get_proxy_host_by_app(app_id)
if not domain_names:
host = proxyManager.get_proxy_host_by_app(app_id)
if not host:
raise CustomException(
status_code=400,
message="Invalid Request",
@ -786,6 +916,7 @@ class AppManger:
# Remove proxy
proxyManager.remove_proxy_host_by_app(app_id)
logger.access(f"Successfully removed all domains for app: [{app_id}]")
def remove_proxy_by_id(self,proxy_id:int):
"""
@ -804,6 +935,7 @@ class AppManger:
)
# Remove proxy
ProxyManager().remove_proxy_host_by_id(proxy_id)
logger.access(f"Successfully removed domains:{host['domain_names']} for app: [{host['forward_host']}]")
def update_proxy_by_app(self,proxy_id:str,domain_names:list[str],endpointId:int = None):
"""
@ -818,7 +950,10 @@ class AppManger:
portainerManager = PortainerManager()
# Check the endpointId is exists.
endpointId = check_endpointId(endpointId, portainerManager)
if endpointId:
check_endpointId(endpointId, portainerManager)
else:
endpointId = portainerManager.get_local_endpoint_id()
# Check the proxy id is exists
host = proxyManager.get_proxy_host_by_id(proxy_id)
@ -832,7 +967,9 @@ class AppManger:
# check_domain_names(domain_names)
# Update proxy
return proxyManager.update_proxy_by_app(proxy_id,domain_names)
result = proxyManager.update_proxy_by_app(proxy_id,domain_names)
logger.access(f"Successfully updated domains:{domain_names} for app: [{host['forward_host']}]")
return result
def _init_local_repo_and_push_to_remote(self,local_path:str,repo_url:str):
"""

View file

@ -0,0 +1,42 @@
import uuid
appInstalling = {} # app installing
appInstallingError = {} # app install error
# Add app to appInstalling
def start_app_installation(app_id, app_name):
app_uuid = str(uuid.uuid4())
app = {
"app_id": app_id,
"app_name": app_name,
"app_official": True,
"status": 3, # installing
}
appInstalling[app_uuid] = app
return app_uuid
# Add app to appInstallingError
def modify_app_information(app_uuid, error):
# If the app is in appInstalling, remove it
if app_uuid in appInstalling:
app = appInstalling.pop(app_uuid)
app["status"] = 4 # error
app["error"] = error
appInstallingError[app_uuid] = app
# If the app is not in appInstalling but in appInstallingError, modify it
elif app_uuid in appInstallingError:
app = appInstallingError[app_uuid]
app["status"] = 4 # error
app["error"] = error
# Remove app from appInstalling
def remove_app_installation(app_uuid):
if app_uuid in appInstalling:
appInstalling.pop(app_uuid)
# Remove app from appInstallingError
def remove_app_from_errors(app_uuid):
if app_uuid in appInstallingError:
appInstallingError.pop(app_uuid)

View file

@ -1,31 +1,36 @@
import os
import json
from src.core.config import ConfigManager
from src.core.logger import logger
from src.core.exception import CustomException
from src.schemas.appInstall import appInstall
from src.services.gitea_manager import GiteaManager
from src.services.portainer_manager import PortainerManager
from src.services.proxy_manager import ProxyManager
from src.services.app_status import appInstalling,appInstallingError
def check_appName_and_appVersion(app_name:str, app_version:str,library_path:str):
def check_appName_and_appVersion(app_name:str, app_version:str):
"""
Check the app_name and app_version is exists in docker library
Args:
app_name (str): App Name
app_version (str): App Version
Raises:
CustomException: If the app_name or app_version is not exists in docker library
"""
if not os.path.exists(f"{library_path}/{app_name}"):
logger.error(f"When install app:{app_name}, the app is not exists in docker library")
raise CustomException(
status_code=400,
message="Invalid Request",
details=f"app_name:{app_name} not supported",
)
else:
try:
# Get docker library path
library_path = ConfigManager("system.ini").get_value("docker_library", "path")
if not os.path.exists(f"{library_path}/{app_name}"):
logger.error(f"When install app:{app_name}, the app is not exists in docker library")
raise CustomException(
status_code=400,
message="Invalid Request",
details=f"app_name:{app_name} not supported",
)
else:
with open(f"{library_path}/{app_name}/variables.json", "r") as f:
variables = json.load(f)
community_editions = [d for d in variables["edition"] if d["dist"] == "community"]
@ -38,6 +43,11 @@ def check_appName_and_appVersion(app_name:str, app_version:str,library_path:str)
message="Invalid Request",
details=f"app_version:{app_version} not supported",
)
except CustomException as e:
raise e
except Exception as e:
logger.error(f"When install app:{app_name}, validate app_name and app_version error:{e}")
raise CustomException()
def check_appId(app_id:str,endpointId:int,giteaManager:GiteaManager,portainerManager:PortainerManager):
"""
@ -50,14 +60,33 @@ def check_appId(app_id:str,endpointId:int,giteaManager:GiteaManager,portainerMan
Raises:
CustomException: If the app_id is exists in gitea or portainer
"""
# validate the app_id is installing
for app_uuid,app in appInstalling.items():
if app_id == app.get("app_id", None):
raise CustomException(
status_code=400,
message="Invalid Request",
details=f"{app_id} is installing"
)
# validate the app_id is installing error
for app_uuid,app in appInstallingError.items():
if app_id == app.get("app_id", None):
raise CustomException(
status_code=400,
message="Invalid Request",
details=f"The app with the same name has already failed to install. Please check in 'My Apps'."
)
# validate the app_id is exists in gitea
is_repo_exists = giteaManager.check_repo_exists(app_id)
if is_repo_exists:
logger.error(f"When install app,the app_id:{{app_id}} is exists in gitea")
logger.error(f"When install app,the app_id:{app_id} is exists in gitea")
raise CustomException(
status_code=400,
message="Invalid Request",
details=f"App_id:{app_id} is exists in gitea"
details=f"App_id:{app_id} is exists(in gitea)"
)
# validate the app_id is exists in portainer
@ -67,9 +96,9 @@ def check_appId(app_id:str,endpointId:int,giteaManager:GiteaManager,portainerMan
raise CustomException(
status_code=400,
message="Invalid Request",
details=f"app_id:{app_id} is exists in portainer"
details=f"app_id:{app_id} is exists(in portainer)"
)
def check_domain_names(domain_names:list[str]):
"""
Check the domain_names is exists in proxy
@ -82,27 +111,63 @@ def check_domain_names(domain_names:list[str]):
"""
ProxyManager().check_proxy_host_exists(domain_names)
def check_endpointId(endpointId, portainerManager):
def check_endpointId(endpointId:int, portainerManager):
"""
Check the endpointId is exists
Args:
endpointId ([type]): [description]
portainerManager ([type]): [description]
endpointId (int): Endpoint Id
portainerManager (PortainerManager): Portainer Manager
Raises:
CustomException: If the endpointId is not exists
"""
if endpointId is None:
# Get the local endpointId
endpointId = portainerManager.get_local_endpoint_id()
else :
# validate the endpointId is exists
# validate the endpointId is exists
if endpointId:
is_endpointId_exists = portainerManager.check_endpoint_exists(endpointId)
if not is_endpointId_exists:
logger.error(f"EndpointId:{endpointId} Not Found")
raise CustomException(
status_code=400,
message="Invalid Request",
details="EndpointId Not Found"
)
return endpointId
def install_validate(appInstall:appInstall,endpointId:int):
"""
before install app, check the appInstall is valid
Args:
appInstall (appInstall): App Install
Raises:
CustomException: If the appInstall is not valid
"""
try:
portainerManager = PortainerManager()
giteaManager = GiteaManager()
# Get the app_name and app_version
app_name = appInstall.app_name
app_version = appInstall.edition.version
proxy_enabled = appInstall.proxy_enabled
domain_names = appInstall.domain_names
app_id = appInstall.app_id
# Check the app_name and app_version is exists in docker library
check_appName_and_appVersion(app_name, app_version)
# Check the app_id is exists in gitea and portainer
check_appId(app_id, endpointId, giteaManager, portainerManager)
# Check the domain_names is exists in proxy
if proxy_enabled:
check_domain_names(domain_names)
# Check the endpointId is exists
check_endpointId(endpointId, portainerManager)
except CustomException as e:
raise e
except Exception as e:
logger.error(f"When install app, validate appInstall error:{e}")
raise CustomException()

View file

@ -10,6 +10,13 @@ from src.external.gitea_api import GiteaAPI
class GiteaManager:
"""
Gitea Manager
Methods:
check_repo_exists: Check repo is exist.
create_repo: Create repository.
get_file_content_from_repo: Get file content from repository.
update_file_in_repo: Update file in repository.
remove_repo: Remove repository.
"""
def __init__(self):
"""
@ -65,17 +72,23 @@ class GiteaManager:
response = self.gitea.create_repo(repo_name)
if response.status_code == 201:
repo_json = response.json()
# 将repo_json字符串中:localhost/w9git 替换为:websoft9-git:3000
url = repo_json["clone_url"].replace("localhost/w9git","websoft9-git:3000")
# http://localhost/w9git/websoft9/test.git
# http://websoft9-git:3000/websoft9/wp.git
return url
else:
logger.error(f"Create repo:{repo_name} error:{response.status_code}:{response.text}")
raise CustomException()
def get_file_content_from_repo(self, repo_name: str, file_path: str):
"""
Get file content from repository
Args:
repo_name (str): Repository name
file_path (str): File path
Returns:
dict: File content
"""
response = self.gitea.get_file_content_from_repo(repo_name, file_path)
if response.status_code == 200:
response_json = response.json() # The gitea Api: if the repo is empty, the response is: []
@ -93,12 +106,27 @@ class GiteaManager:
raise CustomException()
def update_file_in_repo(self, repo_name: str, file_path: str, content: str,sha: str):
"""
Update file in repository
Args:
repo_name (str): Repository name
file_path (str): File path
content (str): File content
sha (str): File sha
"""
response = self.gitea.update_file_content_in_repo(repo_name, file_path, content, sha)
if response.status_code != 201:
logger.error(f"Update file:{file_path} content in repo:{repo_name} error:{response.status_code}:{response.text}")
raise CustomException()
def remove_repo(self, repo_name: str):
"""
Remove repository
Args:
repo_name (str): Repository name
"""
response = self.gitea.remove_repo(repo_name)
if response.status_code != 204:
logger.error(f"Remove repo:{repo_name} error:{response.status_code}:{response.text}")

View file

@ -1,8 +1,4 @@
import json
import time
import jwt
import keyring
from src.core.config import ConfigManager
from src.core.exception import CustomException
from src.external.portainer_api import PortainerAPI
from src.core.logger import logger
@ -16,7 +12,6 @@ class PortainerManager:
portainer (PortainerAPI): Portainer API
Methods:
_set_portainer_token(): Set Portainer token
get_local_endpoint_id(): Get local endpoint id
check_endpoint_exists(endpoint_id): Check endpoint exists
check_stack_exists(stack_name, endpoint_id): Check stack exists
@ -32,59 +27,10 @@ class PortainerManager:
def __init__(self):
try:
self.portainer = PortainerAPI()
# self._set_portainer_token()
except Exception as e:
logger.error(f"Init Portainer API Error:{e}")
raise CustomException()
def _set_portainer_token(self):
"""
Set Portainer token
"""
service_name = "portainer"
token_name = "user_token"
# Try to get token from keyring
try:
jwt_token = keyring.get_password(service_name, token_name)
except Exception as e:
jwt_token = None
# if the token is got from keyring,vaildate the exp time
if jwt_token is not None:
try:
decoded_jwt = jwt.decode(jwt_token, options={"verify_signature": False})
exp_timestamp = decoded_jwt['exp']
# if the token is not expired, return it
if int(exp_timestamp) - int(time.time()) > 3600:
self.portainer.set_jwt_token(jwt_token)
return
except Exception as e:
logger.error(f"Decode Portainer's Token Error:{e}")
raise CustomException()
# if the token is expired or not got from keyring, get a new one
try:
userName = ConfigManager().get_value("portainer", "user_name")
userPwd = ConfigManager().get_value("portainer", "user_pwd")
except Exception as e:
logger.error(f"Get Portainer's UserName and UserPwd Error:{e}")
raise CustomException()
token_response = self.portainer.get_jwt_token(userName, userPwd)
if token_response.status_code == 200:
jwt_token = token_response.json()["jwt"]
self.portainer.set_jwt_token(jwt_token)
# set new token to keyring
try:
keyring.set_password(service_name, token_name, jwt_token)
except Exception as e:
logger.error(f"Set Portainer's Token To Keyring Error:{e}")
raise CustomException()
else:
logger.error(f"Error Calling Portainer API: {token_response.status_code}:{token_response.text}")
raise CustomException()
def get_local_endpoint_id(self):
"""
Get local endpoint id: the endpoint id of the local docker engine
@ -93,26 +39,41 @@ class PortainerManager:
Returns:
str: local endpoint id
"""
# get all endpoints
response = self.portainer.get_endpoints()
if response.status_code == 200:
endpoints = response.json()
local_endpoint = None
for endpoint in endpoints:
# find the local endpoint
if endpoint["URL"] == "unix:///var/run/docker.sock":
if local_endpoint is None:
if local_endpoint is None: # if there is only one local endpoint, return it
local_endpoint = endpoint
elif endpoint["Id"] < local_endpoint["Id"]:
elif endpoint["Id"] < local_endpoint["Id"]: # if there are multiple local endpoints, return the one with the smallest id
local_endpoint = endpoint
if local_endpoint is not None:
if local_endpoint is not None:
return local_endpoint["Id"]
else:
logger.error(f"Can't find local endpoint")
raise CustomException()
logger.error(f"Get local endpoint id error: Local endpoint is not exist")
raise CustomException(
status_code=400,
message="Invalid Request",
details="Local endpoint is not exist"
)
else:
logger.error(f"Get local endpoint id error: {response.status_code}:{response.text}")
raise CustomException()
def check_endpoint_exists(self, endpoint_id: int):
"""
Check endpoint exists
Args:
endpoint_id (int): endpoint id
Returns:
bool: endpoint exists or not
"""
response = self.portainer.get_endpoint_by_id(endpoint_id)
if response.status_code == 200:
return True
@ -123,6 +84,17 @@ class PortainerManager:
raise CustomException()
def check_stack_exists(self, stack_name: str, endpoint_id: int):
"""
Check stack exists
Args:
stack_name (str): stack name
endpoint_id (int): endpoint id
Returns:
bool: stack exists or not
"""
# get all stacks
response = self.portainer.get_stacks(endpoint_id)
if response.status_code == 200:
stacks = response.json()

View file

@ -9,7 +9,20 @@ from src.external.nginx_proxy_manager_api import NginxProxyManagerAPI
class ProxyManager:
"""
Proxy Manager
This class is used to manage proxy hosts
Attributes:
nginx (NginxProxyManagerAPI): The Nginx Proxy Manager API instance
Methods:
check_proxy_host_exists: Check proxy host is exist
create_proxy_by_app: Create a proxy host
update_proxy_by_app: Update a proxy host
get_proxy_host_by_app: Get proxy host by app
remove_proxy_host_by_app: Remove proxy host by app
remove_proxy_host_by_id: Remove proxy host by id
get_proxy_hosts: Get proxy hosts
get_proxy_host_by_id: Get proxy host by id
"""
def __init__(self):
"""
@ -74,7 +87,29 @@ class ProxyManager:
raise CustomException()
else:
raise CustomException()
def _handler_nginx_error(self,response):
"""
Handler Nginx Proxy Manager API Error
Args:
response (Response): Response
"""
# If status_code is 500, raise CustomException
if response.status_code == 500:
logger.error(f"Nginx Proxy Manager API Error:{response.status_code}:{response.text}")
raise CustomException()
else:
# Get error message from response
response_dict = json.loads(response.text)
error_dict = response_dict.get('error', {})
details = error_dict.get('message','Unknown Error')
raise CustomException(
status_code=400,
message=f"Invalid Request",
details=details
)
def check_proxy_host_exists(self,domain_names: list[str]):
"""
Check proxy host is exist
@ -86,23 +121,41 @@ class ProxyManager:
bool: True if proxy host is exist, False if proxy host is not exist, raise exception if error
"""
response = self.nginx.get_proxy_hosts()
if response.status_code == 200:
proxy_hosts = response.json()
matching_domains = []
for proxy_host in proxy_hosts:
matching_domains += [domain for domain in domain_names if domain in proxy_host.get("domain_names", [])]
try:
if response.status_code == 200:
proxy_hosts = response.json()
matching_domains = []
for proxy_host in proxy_hosts:
matching_domains += [domain for domain in domain_names if domain in proxy_host.get("domain_names", [])]
if matching_domains:
raise CustomException(
status_code=400,
message=f"Invalid Request",
details=f"{matching_domains} already used"
)
else:
logger.error(f"Check proxy host:{domain_names} exists error:{response.status_code}:{response.text}")
if matching_domains:
raise CustomException(
status_code=400,
message=f"Invalid Request",
details=f"{matching_domains} already used"
)
else:
self._handler_nginx_error(response)
except CustomException as e:
raise e
except Exception as e:
logger.error(f"Check proxy host:{domain_names} exists error:{e}")
raise CustomException()
def create_proxy_by_app(self,domain_names: list[str],forward_host: str,forward_port: int,advanced_config: str = "",forward_scheme: str = "http"):
"""
Create a proxy host
Args:
domain_names (list[str]): Domain names
forward_host (str): Forward host
forward_port (int): Forward port
advanced_config (str, optional): Advanced config. Defaults to "".
forward_scheme (str, optional): Forward scheme. Defaults to "http".
Returns:
dict: Proxy host
"""
response = self.nginx.create_proxy_host(
domain_names=domain_names,
forward_scheme=forward_scheme,
@ -110,97 +163,130 @@ class ProxyManager:
forward_port=forward_port,
advanced_config=advanced_config,
)
# if response.status_code == 201:
# return response.json()
# elif response.status_code == 500:
# logger.error(f"Create proxy for app:{forward_host} error:{response.status_code}:{response.text}")
# raise CustomException()
# else:
# logger.error(f"Create proxy for app:{forward_host} error:{response.status_code}:{response.text}")
# raise CustomException(
# status_code=400,
# message=f"Invalid Request",
# details=f"{json.loads(response.text).get('error',{}).get('message','Unknown Error')}"
# )
if response.status_code != 201:
logger.error(f"Create proxy for app:{forward_host} error:{response.status_code}:{response.text}")
raise CustomException()
self._handler_nginx_error(response)
else:
return response.json()
def update_proxy_by_app(self,proxy_id:int,domain_names: list[str]):
"""
Update a proxy host
Args:
proxy_id (int): Proxy id
domain_names (list[str]): Domain names
Returns:
dict: Proxy host
"""
# Get proxy host by id
req_json = self.get_proxy_host_by_id(proxy_id)
if req_json is None:
raise CustomException(
status_code=400,
message=f"Invalid Request",
details=f"Proxy host:{proxy_id} not found"
)
# update domain_names
req_json["domain_names"] = domain_names
keys_to_delete = ["id","created_on","modified_on","owner_user_id","enabled","certificate","owner","access_list","use_default_location","ipv6"]
for key in keys_to_delete:
req_json.pop(key, None)
try:
if req_json is None:
raise CustomException(
status_code=400,
message=f"Invalid Request",
details=f"Proxy host:{proxy_id} not found"
)
# update domain_names
req_json["domain_names"] = domain_names
# delete useless keys from req_json(because the req_json is from get_proxy_host_by_id and update_proxy_host need less keys)
keys_to_delete = ["id","created_on","modified_on","owner_user_id","enabled","certificate","owner","access_list","use_default_location","ipv6"]
for key in keys_to_delete:
req_json.pop(key, None)
response = self.nginx.update_proxy_host(proxy_id=proxy_id, json=req_json)
if response.status_code == 200:
return response.json()
elif response.status_code == 500:
logger.error(f"Update proxy for app:{req_json['forward_host']} error:{response.status_code}:{response.text}")
response = self.nginx.update_proxy_host(proxy_id=proxy_id, json=req_json)
if response.status_code == 200:
return response.json()
else:
self._handler_nginx_error(response)
except CustomException as e:
raise e
except Exception as e:
logger.error(f"Update proxy host:{proxy_id} error:{e}")
raise CustomException()
else:
logger.error(f"Update proxy for app:{req_json['forward_host']} error:{response.status_code}:{response.text}")
response_dict = json.loads(response.text)
error_dict = response_dict.get('error', {})
details = error_dict.get('message')
raise CustomException(
status_code=400,
message=f"Invalid Request",
details=details
)
def get_proxy_host_by_app(self,app_id:str):
"""
Get proxy host by app
Args:
app_id (str): App id
Returns:
list[dict]: Proxy hosts
"""
response = self.nginx.get_proxy_hosts()
if response.status_code == 200:
proxys_host = response.json()
proxy_result = []
for proxy_host in proxys_host:
if proxy_host.get("forward_host") == app_id:
# proxy_data = {
# "proxy_id": proxy_host.get("id"),
# "domain_names": proxy_host.get("domain_names")
# }
proxy_result.append(proxy_host)
return proxy_result
else:
logger.error(f"Get proxy host by app:{app_id} error:{response.status_code}:{response.text}")
try:
if response.status_code == 200:
proxys_host = response.json()
proxy_result = []
for proxy_host in proxys_host:
if proxy_host.get("forward_host") == app_id:
proxy_result.append(proxy_host)
return proxy_result
else:
self._handler_nginx_error(response)
except CustomException as e:
raise e
except Exception as e:
logger.error(f"Get proxy host by app:{app_id} error:{e}")
raise CustomException()
def remove_proxy_host_by_app(self,app_id:str):
proxy_hosts = self.get_proxy_host_by_app(app_id)
if proxy_hosts:
for proxy_host in proxy_hosts:
response = self.nginx.delete_proxy_host(proxy_host.get("id"))
if response.status_code != 200:
logger.error(f"Remove proxy host:{proxy_host.get('id')} for app:{app_id} error:{response.status_code}:{response.text}")
raise CustomException()
"""
Remove proxy host by app
def remove_proxy_host_by_id(self,proxy_id:int):
response = self.nginx.delete_proxy_host(proxy_id)
if response.status_code != 200:
logger.error(f"Remove proxy host:{proxy_id} error:{response.status_code}:{response.text}")
Args:
app_id (str): App id
"""
proxy_hosts = self.get_proxy_host_by_app(app_id)
try:
if proxy_hosts:
for proxy_host in proxy_hosts:
response = self.nginx.delete_proxy_host(proxy_host.get("id"))
if response.status_code != 200:
self._handler_nginx_error(response)
except CustomException as e:
raise e
except Exception as e:
logger.error(f"Remove proxy host by app:{app_id} error:{e}")
raise CustomException()
def remove_proxy_host_by_id(self,proxy_id:int):
"""
Remove proxy host by id
Args:
proxy_id (int): Proxy id
"""
response = self.nginx.delete_proxy_host(proxy_id)
if response.status_code != 200:
self._handler_nginx_error(response)
def get_proxy_hosts(self):
"""
Get proxy hosts
Returns:
list[dict]: Proxy hosts
"""
response = self.nginx.get_proxy_hosts()
if response.status_code == 200:
return response.json()
else:
logger.error(f"Get proxy hosts error:{response.status_code}:{response.text}")
raise CustomException()
self._handler_nginx_error(response)
def get_proxy_host_by_id(self,proxy_id:int):
"""
Get proxy host by id
Args:
proxy_id (int): Proxy id
Returns:
dict: Proxy host
"""
proxy_hosts = self.get_proxy_hosts()
try:
for proxy_host in proxy_hosts:
@ -209,4 +295,5 @@ class ProxyManager:
return None
except Exception as e:
logger.error(f"Get proxy host by id:{proxy_id} error:{e}")
raise CustomException()
raise CustomException()

View file

@ -7,17 +7,40 @@ from src.core.logger import logger
from src.schemas.appSettings import AppSettings
class SettingsManager:
"""
Settings Manager
This class is used to read and write settings from the config file
Attributes:
config_file_path (str): The absolute path of the config file
config (ConfigParser): The config parser object
Methods:
read_all: Read all the settings from the config file
write_all: Write all the settings to the config file
read_section: Read a section from the config file
read_key: Read a key from a section in the config file
write_section: Write a key value pair to a section in the config file
"""
def __init__(self):
# Get the absolute path of the current file
script_dir = os.path.dirname(os.path.realpath(__file__))
# Get the absolute path of the config directory
config_dir = os.path.join(script_dir, "../config")
# Set the absolute path of the config file
self.config_file_path = os.path.join(config_dir, "config.ini")
self.config_file_path = os.path.abspath(self.config_file_path)
self.config = configparser.ConfigParser()
def read_all(self) -> Dict[str, Dict[str, str]]:
"""
Read all the settings from the config file
"""
try:
# Read the config file
self.config.read(self.config_file_path)
data = {s:dict(self.config.items(s)) for s in self.config.sections()}
return AppSettings(**data)
@ -26,16 +49,36 @@ class SettingsManager:
raise CustomException()
def write_all(self, data: AppSettings):
"""
Write all the settings to the config file
Args:
data (AppSettings): The settings to be written to the config file
"""
# Read the config file
for section, kv in data.model_dump().items():
# Add section if not exist
if section not in self.config.sections():
self.config.add_section(section)
# Update the key value pair
for key, value in kv.items():
self.config.set(section, key, value)
# Write the config file
with open(self.filename, 'w') as configfile:
self.config.write(configfile)
def read_section(self, section: str) -> Dict[str, str]:
"""
Read a section from the config file
Args:
section (str): The section to be read from the config file
Returns:
Dict[str, str]: The key value pairs of the section
"""
try:
# Read the config file
self.config.read(self.config_file_path)
if section not in self.config.sections():
raise CustomException(
@ -47,18 +90,31 @@ class SettingsManager:
except CustomException as e:
raise e
except Exception as e:
logger.error(e)
logger.error("Error in read_section:"+str(e))
raise CustomException()
def read_key(self, section: str, key:str) -> str:
"""
Read a key from a section in the config file
Args:
section (str): The section to be read from the config file
key (str): The key to be read from the section in the config file
Returns:
str: The value of the key
"""
try:
# Read the config file
self.config.read(self.config_file_path)
# Check if section exists
if section not in self.config.sections():
raise CustomException(
status_code=400,
message="Invalid Request",
details=f"Section:{section} does not exist"
)
# Check if key exists
if key not in self.config[section]:
raise CustomException(
status_code=400,
@ -73,20 +129,32 @@ class SettingsManager:
raise CustomException()
def write_section(self, section: str, key:str,value:str):
"""
Write a key value pair to a section in the config file
Args:
section (str): The section to be read from the config file
key (str): The key to be written to the section in the config file
value (str): The value to be written to the section in the config file
"""
try:
# Read the config file
self.config.read(self.config_file_path)
# Check if section exists
if section not in self.config.sections():
raise CustomException(
status_code=400,
message="Invalid Request",
details=f"Section:{section} does not exist"
)
# Check if key exists
if key not in self.config[section]:
raise CustomException(
status_code=400,
message="Invalid Request",
details=f"Key:{key} does not exist"
)
# Update the key value pair
self.config.set(section, key, value)
with open(self.config_file_path, 'w') as configfile:
self.config.write(configfile)

View file

@ -10,7 +10,7 @@ class PasswordGenerator:
lowercase_letters = string.ascii_lowercase # all lowercase letters
uppercase_letters = string.ascii_uppercase # all uppercase letters
digits = string.digits # all digits
special_symbols = "`$%()[]{},.*+-:;<>?_~/|\"" # all special symbols
special_symbols = "$%()[],.*+-:;<>?_~/|" # all special symbols
# get 4 random characters from each category
password = [