From ce31a591afbbb76123151fdbe262b2ddaa49ad7c Mon Sep 17 00:00:00 2001 From: wzj <244142824@qq.com> Date: Sat, 14 Jun 2025 12:32:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 48 ++++++++-------- config.py | 30 ++++++++-- database.py | 155 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 204 insertions(+), 29 deletions(-) create mode 100644 database.py diff --git a/app.py b/app.py index 1c73848..56b0eb7 100644 --- a/app.py +++ b/app.py @@ -1,3 +1,4 @@ +# app.py import os import subprocess from datetime import datetime, timedelta @@ -13,11 +14,28 @@ import zipfile import shutil import re from pypinyin import pinyin, Style -import uuid +from flask_migrate import Migrate + +# 从配置文件中导入配置 +from config import Config app = Flask(__name__) -app.secret_key = 'your-secret-key-here' -from pypinyin import pinyin, Style +app.config.from_object(Config) + +# 确保证书存储目录存在 +os.makedirs(Config.CERT_STORE, exist_ok=True) + +# 初始化数据库 +from database import initialize_database +initialize_database() + +# 初始化数据库迁移 +#migrate = Migrate(app, db) + +# Flask-Login 配置 +login_manager = LoginManager() +login_manager.init_app(app) +login_manager.login_view = 'login' def to_pinyin(text): """将中文转换为拼音""" @@ -33,24 +51,6 @@ def to_pinyin(text): def jinja2_to_pinyin(text): return to_pinyin(text) -# 数据库配置 -db_config = { - 'host': '192.168.31.11', - 'database': 'cert_manager', - 'user': 'root', - 'password': 'Home123#$.' -} - -# Flask-Login 配置 -login_manager = LoginManager() -login_manager.init_app(app) -login_manager.login_view = 'login' - -# 确保证书存储目录存在 -CERT_STORE = os.path.join(os.path.dirname(__file__), 'cert_store') -os.makedirs(CERT_STORE, exist_ok=True) - - class User(UserMixin): pass @@ -58,7 +58,7 @@ class User(UserMixin): @login_manager.user_loader def load_user(user_id): try: - conn = mysql.connector.connect(**db_config) + conn = mysql.connector.connect(**Config.DB_CONFIG) cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) user_data = cursor.fetchone() @@ -82,7 +82,7 @@ def load_user(user_id): # 辅助函数 def get_db_connection(): try: - conn = mysql.connector.connect(**db_config) + conn = mysql.connector.connect(**Config.DB_CONFIG) return conn except Error as e: print(f"Database connection error: {e}") @@ -1372,4 +1372,4 @@ def download_file(filename): if __name__ == '__main__': - app.run(debug=True, ssl_context='adhoc', host='0.0.0.0', port='9875') \ No newline at end of file + app.run(debug=Config.DEBUG, ssl_context='adhoc', host=Config.APP_HOST, port=Config.APP_PORT) \ No newline at end of file diff --git a/config.py b/config.py index fc8fd60..1c5e436 100644 --- a/config.py +++ b/config.py @@ -1,8 +1,28 @@ +# config.py import os + class Config: - SECRET_KEY = os.environ.get('SECRET_KEY') or 'your-secret-key-here' - SQLALCHEMY_DATABASE_URI = 'sqlite:///cert_manager.db' - SQLALCHEMY_TRACK_MODIFICATIONS = False - CERTS_ROOT = os.path.join(os.path.dirname(__file__), 'certs') - CAPTCHA_ENABLED = True \ No newline at end of file + # Flask配置 + SECRET_KEY = os.getenv('SECRET_KEY', 'your-secret-key-here') + + # 数据库配置 + DB_CONFIG = { + 'host': '192.168.31.11', + 'database': 'cert_manager', + 'user': 'root', + 'password': 'Home123#$.' + } + + # 证书存储路径 + CERT_STORE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cert_store') + + # 管理员初始凭据 + ADMIN_USERNAME = os.getenv('ADMIN_USERNAME', 'admin') + ADMIN_PASSWORD = os.getenv('ADMIN_PASSWORD', '123456') + ADMIN_EMAIL = os.getenv('ADMIN_EMAIL', 'admin@example.com') + + # 应用运行配置 + APP_HOST = '0.0.0.0' + APP_PORT = 9875 + DEBUG = True \ No newline at end of file diff --git a/database.py b/database.py new file mode 100644 index 0000000..d9c49ff --- /dev/null +++ b/database.py @@ -0,0 +1,155 @@ +# database.py +import mysql.connector +from mysql.connector import Error +from config import Config + + +def create_database(): + """创建数据库(如果不存在)""" + try: + # 连接到MySQL服务器(不带数据库名) + conn = mysql.connector.connect( + host=Config.DB_CONFIG['host'], + user=Config.DB_CONFIG['user'], + password=Config.DB_CONFIG['password'] + ) + + cursor = conn.cursor() + + # 创建数据库 + cursor.execute(f"CREATE DATABASE IF NOT EXISTS {Config.DB_CONFIG['database']}") + print(f"数据库 {Config.DB_CONFIG['database']} 已创建或已存在") + + # 切换到新数据库 + cursor.execute(f"USE {Config.DB_CONFIG['database']}") + + # 创建表 + sql_scripts = [ + """ + CREATE TABLE IF NOT EXISTS users ( + id INT AUTO_INCREMENT PRIMARY KEY, + username VARCHAR(50) UNIQUE NOT NULL, + password_hash VARCHAR(128) NOT NULL, + email VARCHAR(100), + is_admin BOOLEAN DEFAULT FALSE, + is_active tinyint(1) DEFAULT '1', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """, + """ + CREATE TABLE IF NOT EXISTS certificate_authorities ( + id INT AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(100) NOT NULL, + common_name VARCHAR(100) NOT NULL, + organization VARCHAR(100), + organizational_unit VARCHAR(100), + country VARCHAR(2), + state VARCHAR(100), + locality VARCHAR(100), + key_size INT DEFAULT 2048, + days_valid INT DEFAULT 3650, + cert_path VARCHAR(255), + key_path VARCHAR(255), + created_by INT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (created_by) REFERENCES users(id) + ) + """, + """ + CREATE TABLE IF NOT EXISTS certificates ( + id INT AUTO_INCREMENT PRIMARY KEY, + common_name VARCHAR(100) NOT NULL, + san_dns TEXT, + san_ip TEXT, + organization VARCHAR(100), + organizational_unit VARCHAR(100), + country VARCHAR(2), + state VARCHAR(100), + locality VARCHAR(100), + key_size INT DEFAULT 2048, + days_valid INT DEFAULT 365, + cert_path VARCHAR(255), + key_path VARCHAR(255), + csr_path VARCHAR(255), + ca_id INT, + status ENUM('active', 'revoked', 'expired') DEFAULT 'active', + created_by INT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + expires_at TIMESTAMP NULL, + revoked_at TIMESTAMP NULL, + revocation_reason VARCHAR(255), + FOREIGN KEY (ca_id) REFERENCES certificate_authorities(id), + FOREIGN KEY (created_by) REFERENCES users(id) + ) + """, + """ + CREATE TABLE IF NOT EXISTS certificate_revocation_list ( + id INT AUTO_INCREMENT PRIMARY KEY, + ca_id INT NOT NULL, + crl_path VARCHAR(255), + last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + next_update TIMESTAMP NULL, + FOREIGN KEY (ca_id) REFERENCES certificate_authorities(id) + ) + """, + """ + CREATE TABLE IF NOT EXISTS captcha ( + id INT AUTO_INCREMENT PRIMARY KEY, + code VARCHAR(10) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """ + ] + + for script in sql_scripts: + cursor.execute(script) + + conn.commit() + print("所有表已创建或已存在") + + except Error as e: + print(f"数据库初始化错误: {e}") + raise + finally: + if conn.is_connected(): + cursor.close() + conn.close() + + +def create_admin_user(): + """创建初始管理员用户""" + from werkzeug.security import generate_password_hash + + try: + conn = mysql.connector.connect(**Config.DB_CONFIG) + cursor = conn.cursor(dictionary=True) + + # 检查管理员是否已存在 + cursor.execute("SELECT id FROM users WHERE username = %s", (Config.ADMIN_USERNAME,)) + if cursor.fetchone(): + print("管理员用户已存在") + return + + # 创建管理员 + password_hash = generate_password_hash(Config.ADMIN_PASSWORD) + cursor.execute(""" + INSERT INTO users (username, password_hash, email, is_admin, is_active) + VALUES (%s, %s, %s, %s, %s) + """, (Config.ADMIN_USERNAME, password_hash, Config.ADMIN_EMAIL, True, True)) + + conn.commit() + print("管理员用户创建成功") + + except Error as e: + print(f"创建管理员用户错误: {e}") + raise + finally: + if conn.is_connected(): + cursor.close() + conn.close() + + +def initialize_database(): + """初始化数据库""" + create_database() + create_admin_user() \ No newline at end of file