数据库优化

This commit is contained in:
wzj 2025-06-14 12:32:35 +08:00
parent 357d7814ee
commit ce31a591af
3 changed files with 204 additions and 29 deletions

48
app.py
View File

@ -1,3 +1,4 @@
# app.py
import os
import subprocess
from datetime import datetime, timedelta
@ -13,11 +14,28 @@ import zipfile
import shutil
import re
from pypinyin import pinyin, Style
import uuid
from flask_migrate import Migrate
# 从配置文件中导入配置
from config import Config
app = Flask(__name__)
app.secret_key = 'your-secret-key-here'
from pypinyin import pinyin, Style
app.config.from_object(Config)
# 确保证书存储目录存在
os.makedirs(Config.CERT_STORE, exist_ok=True)
# 初始化数据库
from database import initialize_database
initialize_database()
# 初始化数据库迁移
#migrate = Migrate(app, db)
# Flask-Login 配置
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
def to_pinyin(text):
"""将中文转换为拼音"""
@ -33,24 +51,6 @@ def to_pinyin(text):
def jinja2_to_pinyin(text):
return to_pinyin(text)
# 数据库配置
db_config = {
'host': '192.168.31.11',
'database': 'cert_manager',
'user': 'root',
'password': 'Home123#$.'
}
# Flask-Login 配置
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
# 确保证书存储目录存在
CERT_STORE = os.path.join(os.path.dirname(__file__), 'cert_store')
os.makedirs(CERT_STORE, exist_ok=True)
class User(UserMixin):
pass
@ -58,7 +58,7 @@ class User(UserMixin):
@login_manager.user_loader
def load_user(user_id):
try:
conn = mysql.connector.connect(**db_config)
conn = mysql.connector.connect(**Config.DB_CONFIG)
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user_data = cursor.fetchone()
@ -82,7 +82,7 @@ def load_user(user_id):
# 辅助函数
def get_db_connection():
try:
conn = mysql.connector.connect(**db_config)
conn = mysql.connector.connect(**Config.DB_CONFIG)
return conn
except Error as e:
print(f"Database connection error: {e}")
@ -1372,4 +1372,4 @@ def download_file(filename):
if __name__ == '__main__':
app.run(debug=True, ssl_context='adhoc', host='0.0.0.0', port='9875')
app.run(debug=Config.DEBUG, ssl_context='adhoc', host=Config.APP_HOST, port=Config.APP_PORT)

View File

@ -1,8 +1,28 @@
# config.py
import os
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or 'your-secret-key-here'
SQLALCHEMY_DATABASE_URI = 'sqlite:///cert_manager.db'
SQLALCHEMY_TRACK_MODIFICATIONS = False
CERTS_ROOT = os.path.join(os.path.dirname(__file__), 'certs')
CAPTCHA_ENABLED = True
# Flask配置
SECRET_KEY = os.getenv('SECRET_KEY', 'your-secret-key-here')
# 数据库配置
DB_CONFIG = {
'host': '192.168.31.11',
'database': 'cert_manager',
'user': 'root',
'password': 'Home123#$.'
}
# 证书存储路径
CERT_STORE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cert_store')
# 管理员初始凭据
ADMIN_USERNAME = os.getenv('ADMIN_USERNAME', 'admin')
ADMIN_PASSWORD = os.getenv('ADMIN_PASSWORD', '123456')
ADMIN_EMAIL = os.getenv('ADMIN_EMAIL', 'admin@example.com')
# 应用运行配置
APP_HOST = '0.0.0.0'
APP_PORT = 9875
DEBUG = True

155
database.py Normal file
View File

@ -0,0 +1,155 @@
# database.py
import mysql.connector
from mysql.connector import Error
from config import Config
def create_database():
"""创建数据库(如果不存在)"""
try:
# 连接到MySQL服务器不带数据库名
conn = mysql.connector.connect(
host=Config.DB_CONFIG['host'],
user=Config.DB_CONFIG['user'],
password=Config.DB_CONFIG['password']
)
cursor = conn.cursor()
# 创建数据库
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {Config.DB_CONFIG['database']}")
print(f"数据库 {Config.DB_CONFIG['database']} 已创建或已存在")
# 切换到新数据库
cursor.execute(f"USE {Config.DB_CONFIG['database']}")
# 创建表
sql_scripts = [
"""
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
password_hash VARCHAR(128) NOT NULL,
email VARCHAR(100),
is_admin BOOLEAN DEFAULT FALSE,
is_active tinyint(1) DEFAULT '1',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"""
CREATE TABLE IF NOT EXISTS certificate_authorities (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
common_name VARCHAR(100) NOT NULL,
organization VARCHAR(100),
organizational_unit VARCHAR(100),
country VARCHAR(2),
state VARCHAR(100),
locality VARCHAR(100),
key_size INT DEFAULT 2048,
days_valid INT DEFAULT 3650,
cert_path VARCHAR(255),
key_path VARCHAR(255),
created_by INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (created_by) REFERENCES users(id)
)
""",
"""
CREATE TABLE IF NOT EXISTS certificates (
id INT AUTO_INCREMENT PRIMARY KEY,
common_name VARCHAR(100) NOT NULL,
san_dns TEXT,
san_ip TEXT,
organization VARCHAR(100),
organizational_unit VARCHAR(100),
country VARCHAR(2),
state VARCHAR(100),
locality VARCHAR(100),
key_size INT DEFAULT 2048,
days_valid INT DEFAULT 365,
cert_path VARCHAR(255),
key_path VARCHAR(255),
csr_path VARCHAR(255),
ca_id INT,
status ENUM('active', 'revoked', 'expired') DEFAULT 'active',
created_by INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NULL,
revoked_at TIMESTAMP NULL,
revocation_reason VARCHAR(255),
FOREIGN KEY (ca_id) REFERENCES certificate_authorities(id),
FOREIGN KEY (created_by) REFERENCES users(id)
)
""",
"""
CREATE TABLE IF NOT EXISTS certificate_revocation_list (
id INT AUTO_INCREMENT PRIMARY KEY,
ca_id INT NOT NULL,
crl_path VARCHAR(255),
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
next_update TIMESTAMP NULL,
FOREIGN KEY (ca_id) REFERENCES certificate_authorities(id)
)
""",
"""
CREATE TABLE IF NOT EXISTS captcha (
id INT AUTO_INCREMENT PRIMARY KEY,
code VARCHAR(10) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
]
for script in sql_scripts:
cursor.execute(script)
conn.commit()
print("所有表已创建或已存在")
except Error as e:
print(f"数据库初始化错误: {e}")
raise
finally:
if conn.is_connected():
cursor.close()
conn.close()
def create_admin_user():
"""创建初始管理员用户"""
from werkzeug.security import generate_password_hash
try:
conn = mysql.connector.connect(**Config.DB_CONFIG)
cursor = conn.cursor(dictionary=True)
# 检查管理员是否已存在
cursor.execute("SELECT id FROM users WHERE username = %s", (Config.ADMIN_USERNAME,))
if cursor.fetchone():
print("管理员用户已存在")
return
# 创建管理员
password_hash = generate_password_hash(Config.ADMIN_PASSWORD)
cursor.execute("""
INSERT INTO users (username, password_hash, email, is_admin, is_active)
VALUES (%s, %s, %s, %s, %s)
""", (Config.ADMIN_USERNAME, password_hash, Config.ADMIN_EMAIL, True, True))
conn.commit()
print("管理员用户创建成功")
except Error as e:
print(f"创建管理员用户错误: {e}")
raise
finally:
if conn.is_connected():
cursor.close()
conn.close()
def initialize_database():
"""初始化数据库"""
create_database()
create_admin_user()